Dbt Einblicke

2022-11-11

Erwarten Sie keine Magie von dbt und gehen Sie nicht davon aus, dass es all Ihre Probleme löst. Stattdessen können Sie ein stabiles Framework erwarten, das Ihr Projekt so einfach wie möglich gestaltet. Der einzige Nebeneffekt dieser Vereinfachung, den ich festgestellt habe, ist, dass ich deutlich weniger Probleme habe und die, die auftreten, in der Regel einfach zu debuggen sind. Bedenken Sie, dass jedes Tool nur so intelligent arbeitet, wie wir es zulassen. Das Grundkonzept von dbt ist die Verwendung von Modellen, die in Kürze SQL-Anweisungen sind. Anstatt gespeicherte Prozeduren, dynamisches SQL oder Python-Datenaufbereitung zu verwenden, zwingt uns dbt dazu, uns an seine allgemeine Strategie zu halten. Es lässt uns keine andere Wahl, als es direkt zu verwenden und unsere Pipelines einfach und effizient zu organisieren. Langfristig bringt diese Vereinfachung Vorteile.

Sehen wir uns an, wie die Hauptfunktionen von dbt im Detail funktionieren. Ein einfaches Beispiel:

dbt run --profile dev --target local --model fact_events --full-refresh

10:53:35  Running with dbt=1.0.4
10:53:36  Change detected to override macro used during parsing. Starting full parse.
10:53:43  Concurrency: 1 threads (target='local')
10:53:43 1 of 1 START table model marcin.fact_events.......... [RUN]
10:53:44 1 of 1 OK created table model marcin.fact_events.......... [SELECT 0 in 0.50s]
10:53:44  Finished running 1 table model in 0.75s.

Beim Überprüfen der Logs habe ich festgestellt, dass viele Ausführungsargumente verfügbar sind. Die meisten Argumente verwenden Standardwerte.

10:53:35.805042 [debug] [MainThread]: running dbt with arguments Namespace(record_timing_info=None, debug=None, log_format=None, write_json=None, use_colors=None, printer_width=None, warn_error=
None, version_check=None, partial_parse=None, single_threaded=False, use_experimental_parser=None, static_parser=None, profiles_dir='/home/dell/.dbt', send_anonymous_usage_stats=None, fail_fast=None, 
event_buffer_size=None, project_dir=None, profile='dev', target='local', vars='{}', log_cache_events=False, threads=None, select=['fact_events'], exclude=None, selector_name=None, tate=None, defer=None, full_refresh=False, cls=<class 'dbt.task.run.RunTask'>, which='run', rpc_method='run')

Nach einem erfolgreichen Lauf werfen wir einen Blick in die Logs und prüfen, wie die dbt-Ausführungsreihenfolge aussieht. Im ersten Schritt analysiert dbt Jinja-Makros und kompiliert das Modell.

10:53:40 [debug] [MainThread]: 1602: parser fallback to jinja rendering on marcin.fact_events.sql

Anschließend wird eine Verbindung hergestellt.

10:53:43 [debug] [MainThread]: Acquiring new Postgres connection "master"

Dann wird eine Liste der in der Orchestrierung verwendeten Abhängigkeiten erstellt. In diesem Schritt prüft dbt, ob keine Schleifen vorhanden sind, und positioniert das Modell an der richtigen Stelle in unserer Pipeline. So wird der DAG (Directed Acyclic Graph) in dbt generiert.

with relation as (
  select
    pg_rewrite.ev_class as class,
    pg_rewrite.oid as id
  from pg_rewrite
),

class as (
  select
    oid as id,
    relname as name,
    relnamespace as schema,
    relkind as kind
  from pg_class
),

dependency as (
  select
    pg_depend.objid as id,
    pg_depend.refobjid as ref
  from pg_depend
),

schema as (
  select
    pg_namespace.oid as id,
    pg_namespace.nspname as name
  from pg_namespace
  where 
    pg_namespace.nspname != 'information_schema' and 
    pg_namespace.nspname not like 'pg\_%'
),

referenced as (
  select
    relation.id as id,
    referenced_class.name,
    referenced_class.schema,
    referenced_class.kind
  from relation
  inner join class as referenced_class 
    on relation.class = referenced_class.id
  where referenced_class.kind in ('r', 'v')
),

relationships as (
  select
    referenced.name as referenced_name,
    referenced.schema as referenced_schema_id,
    dependent_class.name as dependent_name,
    dependent_class.schema as dependent_schema_id,
    referenced.kind as kind
  from referenced
  inner join dependency 
    on referenced.id = dependency.id
  inner join class as dependent_class 
    on dependency.ref = dependent_class.id
  where
    (referenced.name != dependent_class.name
      or referenced.schema != dependent_class.schema)
)

select
  referenced_schema.name as referenced_schema,
  relationships.referenced_name as referenced_name,
  dependent_schema.name as dependent_schema,
  relationships.dependent_name as dependent_name
from relationships
inner join schema as dependent_schema 
  on relationships.dependent_schema_id = dependent_schema.id
inner join schema as referenced_schema 
  on relationships.referenced_schema_id = referenced_schema.id
group by 
  referenced_schema, 
  referenced_name, 
  dependent_schema, 
  dependent_name
order by 
  referenced_schema, 
  referenced_name, 
  dependent_schema, 
  dependent_name;

Anschließend prüft dbt die Konfigurationen. Diese können entweder in der Datei dbt_project.yml oder direkt in model.sql festgelegt werden. Dbt führt die konfigurierten pre_hooks aus, die vor der Modellausführung ausgeführt werden sollen, normalerweise jedoch in derselben Transaktion. Beispielsweise können wir pre_hooks verwenden, um Sitzungsvariablen zu setzen:

{{
  config(
    alias='fact_events',
    unique_key="event_id",
    materialized='incremental',
    pre_hook=["SET work_mem='4GB'"],
    post_hook=[after_commit("
      CREATE INDEX IF NOT EXISTS idx_{{ this.name }}_event_id ON {{ this }} (event_id);
    ")]
  )
}}

Anschließend generiert und führt dbt dynamisch eine Sequenz von dbt-generischen SQLs mit Jinja-Injektionen aus. Der im Modell definierte Code wird in diese Sequenz eingespeist. Die generischen SQLs werden damit angereichert und schließlich committet. Dbt verwendet hier eine gängige Strategie mit temporären Objekten und Umbenennungen. Dies erhöht die Verfügbarkeit von Tabellen, reduziert Sperren und ermöglicht bei Bedarf ein einfaches Rollback. Hier ist der Code für full-refresh. Ich denke, Sie haben so etwas Ähnliches schon einmal gesehen.

create temp table --> rename original table to backup --> rename temp table to original --> drop backup table
full-refresh flow
create  table "marcin"."fact_events__dbt_tmp" as (SELECT …);
alter table "marcin"."fact_events" rename to "fact_events__dbt_backup";
alter table "marcin"."fact_events__dbt_tmp" rename to "fact_events;
drop table if exists "marcin"."events__dbt_backup" cascade;
commit;

Im Fall eines inkrementellen Ladevorgangs sieht die Ausführungssequenz anders aus, da wir nur einen Teil der Daten verwenden. Wir führen die Beispiele auf PostgreSQL aus, daher sehen wir unten die Delete/Insert-Strategie. Die inkrementellen Strategien sind jedoch datenbankspezifisch, und bei anderen Datenbanksystemen könnten wir auch die Merge-Strategie verwenden. Inkrementelle Ladevorgänge sind im Data Engineering Alltag, doch hier erhalten wir eine generische Lösung, die das Projekt erneut einheitlich und wartungsfreundlich macht. Darüber hinaus ist die Portierung auf einen anderen Datenbank-Backend meist nur eine Frage kleiner Konfigurationsänderungen, während die Logik des Modells unberührt bleibt.

create temp table --> delete from target table --> insert into target table
create temporary table "fact_events__dbt_tmp152147555458" as (SELECT * FROM"source"."fact_events" WHERE updated_at > (SELECT MAX(updated_at) FROM "marcin"."fact_events"));
delete from "marcin"."fact_events" where (event_id) in (select (event_id) from "fact_events__dbt_tmp152147555458");
insert into "postgres"."marcin"."fact_events" select * from "fact_events__dbt_tmp152147555458")

Im Konfigurationsblock haben wir auch einen after_commit-post_hook definiert, den dbt außerhalb der Transaktion ausführt. Pre_hooks und post_hooks können innerhalb der Transaktion ausgeführt werden. In diesem Fall hatten wir jedoch einen wichtigen Grund, einen außerhalb zu platzieren. Da alle zuvor erstellten Indizes im Backup-Objekt verbleiben, bis die Transaktion abgeschlossen ist, erlaubt die Datenbank uns nicht, ein Duplikat zu erstellen.

Indizes haben innerhalb der Datenbank einzigartige Namen. Es ist unmöglich, zwei Indizes gleichzeitig mit demselben Namen zu haben. Deshalb müssen wir warten, bis das Backup-Objekt (mit den ursprünglichen Indizes) gelöscht wird, und dann mit neuen fortfahren. Das kann nur after_commit geschehen. Wir sehen, dass dbt uns in dieser Hinsicht viel Flexibilität bietet. Eine andere Möglichkeit ist, keinen Indexnamen anzugeben und sich stattdessen auf den automatisch generierten zu verlassen.

Ein weiteres gutes Beispiel ist der vacuum-post_hook, der auf Projektebene in der dbt_project.yml-Datei definiert ist. In diesem Fall müssen wir denselben Code nicht in jedem Modell wiederholen. So funktioniert DRY-Code in dbt.

VACUUM (ANALYZE) "postgres"."marcin"."fact_events";
CREATE INDEX IF NOT EXISTS idx_fact_event_id ON "marcin"."fact_events" (event_id);

Der Befehl dbt run kann eine komplexere Syntax haben. Wir können alle abhängigen Objekte mit oder ohne Tag oder andere Einschränkungen ausführen. Bedingungen können mit einem Komma (AND) oder ohne Komma (OR) kombiniert werden, um den genauen Satz von Bedingungen zu erhalten. Beispiel aus der dbt-Dokumentation:

$ dbt run --select path:marts/finance,tag:nightly,config.materialized:table

Andere Befehle wie dbt testdbt source freshness oder dbt snapshot sind ebenfalls generische SQL-Befehle, die ähnlich wie die oben genannten Befehle generiert und ausgeführt werden. In dbt sehen wir weder Raketenwissenschaft noch Magie. Wir müssen uns keine Gedanken über den Aufbau langsam veränderlicher Dimensionen (SCD Typ 2) oder Quellvalidierungen machen. Diese sind bereits vorhanden. Es ist uns gestattet, die generischen Makros zu manipulieren oder eigene zu erstellen, um außergewöhnliche Modelle anzupassen. Die Hauptidee bleibt jedoch, unser Projekt so einfach wie möglich zu gestalten.

not_null: Um die Datenqualität und die Qualität der Pipeline zu überprüfen. accepted_values: Um geschäftliche Annahmen und unerwartete Werte zu überprüfen.
uniqueness: Um die Granularität zu überprüfen und Duplikate zu finden.
integrity: Um die Integrität der Quelle und unserer Modelle zu überprüfen
dbt generic tests

Der wichtigste Punkt ist, dass alle dbt-Funktionen von einer zentralisierten Engine verwaltet und ausgeführt werden. Dbt bietet uns gemeinsame Kernfunktionen, die die meisten Anforderungen der Ingenieurwissenschaften abdecken und den Entwicklern Platz für kreative Jinja-Makros oder Community-Snippets lassen. Das macht dieses Tool einzigartig. Wir als Team sind in der Lage, dieselben Engineering-Standards und Kodiertechniken zu verwenden. Anfangs dachte ich, dass es unsere Arbeit zu sehr einschränken würde, aber nach einem Jahr der Nutzung stellte sich heraus, dass diese Einschränkungen klug sind. Die Pipelines, mit ihrer Einfachheit, sind nahezu fehlerfrei und viel einfacher zu debuggen.

pipeline stages

Innerhalb eines Projekts verwenden alle Modelle die gleichen Strategien und Makros. Noch einmal, ich empfehle dbt dringend für alle Projekte, die eine standardisierte Architektur und standardisierte Lösungen erfordern. Wenn die Natur unseres Projekts oder das Geschäft uns irgendwie dazu zwingt, für jedes Modell oder jede Pipeline unterschiedliche Techniken zu verwenden, kann dbt das wahrscheinlich handhaben, aber es wurde nicht dafür entwickelt, so zu arbeiten. Es wurde entwickelt, um Datenumwandlungen auf eine sehr bequeme und effiziente Weise zu vereinfachen und zu organisieren. Langfristig bringt diese Vereinfachung Vorteile.

Read More