Airflow-DAGs planen und auslösen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird erläutert, wie die Planung und das Auslösen von DAGs in Airflow funktionieren, wie Sie einen Zeitplan für einen DAG definieren und wie Sie einen DAG manuell auslösen oder pausieren.

Airflow-DAGs in Cloud Composer

Airflow-DAGs in Cloud Composer werden in einer oder mehreren Cloud Composer-Umgebungen in Ihrem Projekt ausgeführt. Sie laden die Quelldateien Ihrer Airflow-DAGs in einen Cloud Storage-Bucket hoch, der einer Umgebung zugeordnet ist. Die Airflow-Instanz der Umgebung parst diese Dateien und plant DAG-Ausführungen gemäß dem Zeitplan der einzelnen DAGs. Während einer DAG-Ausführung plant und führt Airflow einzelne Aufgaben aus, aus denen ein DAG besteht, in einer vom DAG definierten Reihenfolge.

Weitere Informationen zu den Kernkonzepten von Airflow wie Airflow-DAGs, DAG-Ausführungen, Aufgaben oder Operatoren finden Sie in der Airflow-Dokumentation auf der Seite Core Concepts (Kernkonzepte).

DAG-Planung in Airflow

Airflow bietet die folgenden Konzepte für den Planungsmechanismus:

Logisches Datum

Stellt ein Datum dar, für das ein bestimmter DAG-Lauf ausgeführt wird.

Dies ist nicht das tatsächliche Datum, an dem Airflow einen DAG ausführt, sondern ein Zeitraum, der von einer bestimmten DAG-Ausführung verarbeitet werden muss. Bei einem DAG, der täglich um 12:00 Uhr ausgeführt werden soll, wäre das logische Datum beispielsweise auch 12:00 Uhr an einem bestimmten Tag. Da sie zweimal täglich ausgeführt wird, muss sie die Daten der letzten 12 Stunden verarbeiten. Gleichzeitig wird in der im DAG selbst definierten Logik möglicherweise weder das logische Datum noch das Zeitintervall verwendet. Ein DAG kann beispielsweise dasselbe Skript einmal pro Tag ausführen, ohne den Wert des logischen Datums zu verwenden.

In Airflow-Versionen vor 2.2 wird dieses Datum als Ausführungsdatum bezeichnet.

Ausführungsdatum

Gibt ein Datum an, an dem ein bestimmter DAG-Lauf ausgeführt wird.

Bei einem DAG, der täglich um 12:00 Uhr ausgeführt werden soll, kann die tatsächliche Ausführung des DAG beispielsweise um 12:05 Uhr erfolgen, also einige Zeit nach dem logischen Datum.

Zeitplanintervall

Gibt an, wann und wie oft ein DAG in Bezug auf logische Datumsangaben ausgeführt werden muss.

Bei einem täglichen Zeitplan wird ein DAG beispielsweise einmal pro Tag ausgeführt und die logischen Datumsangaben für die DAG-Ausführungen haben 24-Stunden-Intervalle.

Startdatum

Gibt an, wann Airflow Ihren DAG planen soll.

Aufgaben in Ihrem DAG können individuelle Startzeiten haben oder Sie können ein einziges Startdatum für alle Aufgaben festlegen. Basierend auf dem Mindeststartdatum für Aufgaben in Ihrem DAG und dem Zeitplanintervall plant Airflow DAG-Ausführungen.

Aufholen, Backfill und Wiederholungen

Mechanismen zum Ausführen von DAG-Ausführungen für vergangene Datumsangaben.

Bei Catchup werden DAG-Ausführungen ausgeführt, die noch nicht erfolgt sind, z. B. wenn der DAG für einen längeren Zeitraum pausiert und dann wieder aktiviert wurde. Mit Backfill können Sie DAG-Ausführungen für einen bestimmten Zeitraum ausführen. Mit „Wiederholungen“ wird angegeben, wie viele Versuche Airflow beim Ausführen von Aufgaben aus einem DAG unternehmen muss.

Die Planung funktioniert folgendermaßen:

  1. Nach Ablauf des Startdatums wartet Airflow auf das nächste Vorkommen des Zeitplanintervalls.

  2. Airflow plant die erste DAG-Ausführung am Ende dieses Zeitplanintervalls.

    Wenn ein DAG beispielsweise jede Stunde ausgeführt werden soll und das Startdatum um 12:00 Uhr ist, erfolgt die erste DAG-Ausführung heute um 13:00 Uhr.

Im Abschnitt Airflow-DAG planen in diesem Dokument wird beschrieben, wie Sie die Planung für Ihre DAGs mithilfe dieser Konzepte einrichten. Weitere Informationen zu DAG-Ausführungen und zur Planung finden Sie in der Airflow-Dokumentation unter DAG-Ausführungen.

Möglichkeiten zum Auslösen eines DAG

Airflow bietet die folgenden Möglichkeiten zum Auslösen eines DAG:

  • Trigger nach Zeitplan starten. Airflow löst den DAG anhand des in der DAG-Datei angegebenen Zeitplans automatisch aus.

  • Manuell auslösen. Sie können einen DAG manuell über dieGoogle Cloud -Konsole, die Airflow-UI oder durch einen Airflow-Befehlszeilenbefehl über die Google Cloud CLI auslösen.

  • Trigger als Reaktion auf Ereignisse. Als Standardmethode zum Auslösen eines DAG als Reaktion auf Ereignisse wird die Verwendung eines Sensors verwendet.

Weitere Möglichkeiten zum Auslösen von DAGs:

Hinweise

  • Prüfen Sie, ob Ihr Konto eine Rolle hat, mit der Objekte in den Umgebungs-Buckets verwaltet und DAGs aufgerufen und ausgelöst werden können. Weitere Informationen finden Sie unter Zugriffssteuerung.

Airflow-DAG planen

Sie definieren einen Zeitplan für einen DAG in der DAG-Datei. Bearbeiten Sie die DAG-Definition so:

  1. Suchen Sie die DAG-Datei auf Ihrem Computer und bearbeiten Sie sie. Wenn Sie die DAG-Datei nicht haben, können Sie eine Kopie aus dem Bucket der Umgebung herunterladen. Bei einem neuen DAG können Sie alle Parameter beim Erstellen der DAG-Datei definieren.

  2. Definieren Sie den Zeitplan im Parameter schedule_interval. Sie können einen Cron-Ausdruck wie 0 0 * * * oder eine Voreinstellung wie @daily verwenden. Weitere Informationen finden Sie in der Airflow-Dokumentation unter Cron and Time Intervals.

    Airflow bestimmt logische Datumsangaben für DAG-Ausführungen basierend auf dem von Ihnen festgelegten Zeitplan.

  3. Geben Sie im Parameter start_date das Startdatum an.

    Airflow bestimmt das logische Datum der ersten DAG-Ausführung anhand dieses Parameters.

  4. Optional: Im Parameter catchup können Sie definieren, ob Airflow alle vorherigen Ausführungen dieses DAG vom Startdatum bis zum aktuellen Datum ausführen muss, die noch nicht ausgeführt wurden.

    Bei DAG-Ausführungen, die während des Catch-up ausgeführt werden, liegt das logische Datum in der Vergangenheit und das Ausführungsdatum gibt den Zeitpunkt an, zu dem die DAG-Ausführung tatsächlich ausgeführt wurde.

  5. Optional: Definieren Sie im Parameter retries, wie oft Airflow Aufgaben, die fehlgeschlagen sind, wiederholen muss. Jeder DAG besteht aus einer oder mehreren einzelnen Aufgaben. Standardmäßig werden Aufgaben in Cloud Composer zweimal wiederholt.

  6. Laden Sie die neue Version des DAG in den Bucket der Umgebung hoch.

  7. Warten Sie, bis Airflow den DAG erfolgreich geparst hat. Sie können beispielsweise die Liste der DAGs in Ihrer Umgebung in derGoogle Cloud -Konsole oder in der Airflow-Benutzeroberfläche aufrufen.

Die folgende Beispiel-DAG-Definition wird zweimal täglich um 00:00 Uhr und 12:00 Uhr ausgeführt. Das Startdatum ist auf den 1. Januar 2024 festgelegt, aber Airflow führt den DAG nicht für vergangene Daten aus, nachdem Sie ihn hochgeladen oder pausiert haben, da der Catch-up deaktiviert ist.

Der DAG enthält eine Aufgabe namens insert_query_job, mit der mit dem Operator BigQueryInsertJobOperator eine Zeile in eine Tabelle eingefügt wird. Dieser Operator ist einer der Google Cloud BigQuery-Operatoren, mit denen Sie Datasets und Tabellen verwalten, Abfragen ausführen und Daten validieren können. Wenn eine bestimmte Ausführung dieser Aufgabe fehlschlägt, wird sie von Airflow mit dem Standardwiederholungsintervall viermal wiederholt. Das logische Datum für diese Wiederholungsversuche bleibt gleich.

In der SQL-Abfrage für diese Zeile werden Airflow-Vorlagen verwendet, um das logische Datum und den Namen des DAG in die Zeile zu schreiben.

import datetime  from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator  with DAG(   "bq_example_scheduling_dag",   start_date=datetime.datetime(2024, 1, 1),   schedule_interval='0 */12 * * *',   catchup=False   ) as dag:    insert_query_job = BigQueryInsertJobOperator(     task_id="insert_query_job",     retries=4,     configuration={         "query": {             # schema: date (string), description (string)             # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"             "query": "INSERT example_dataset.example_table VALUES ('&lcub;&lcub; ts_nodash &rcub;&rcub;', 'DAG run: &lcub;&lcub; dag &rcub;&rcub;' )",             "useLegacySql": False,             "priority": "BATCH",         }     },     location="us-central1"   )    insert_query_job 

Wenn Sie diese DAG testen möchten, können Sie sie manuell auslösen und dann die Logs der Aufgabenausführung ansehen.

Weitere Beispiele für Planungsparameter

Die folgenden Beispiele für Planungsparameter zeigen, wie die Planung mit verschiedenen Parameterkombinationen funktioniert:

  • Wenn start_date den Wert datetime(2024, 4, 4, 16, 25) und schedule_interval den Wert 30 16 * * * hat, erfolgt die erste DAG-Ausführung um 16:30 Uhr am 5. April 2024.

  • Wenn start_date den Wert datetime(2024, 4, 4, 16, 35) und schedule_interval den Wert 30 16 * * * hat, erfolgt die erste DAG-Ausführung um 16:30 Uhr am 6. April 2024. Da das Startdatum nach dem Zeitplanintervall am 4. April 2024 liegt, erfolgt die DAG-Ausführung nicht am 5. April 2024. Stattdessen endet das Zeitplanintervall am 5. April 2024 um 16:35 Uhr. Die nächste DAG-Ausführung wird also am folgenden Tag um 16:30 Uhr geplant.

  • Wenn start_date datetime(2024, 4, 4) und der schedule_interval @daily ist, wird die erste DAG-Ausführung am 5. April 2024 um 00:00 Uhr geplant.

  • Wenn start_date datetime(2024, 4, 4, 16, 30) und der schedule_interval 0 * * * * ist, wird die erste DAG-Ausführung am 4. April 2024 um 18:00 Uhr geplant. Nach dem angegebenen Datum und der angegebenen Uhrzeit plant Airflow eine DAG-Ausführung zur Minute 0 jeder Stunde. Der nächste Zeitpunkt, zu dem dies der Fall ist, ist 17:00 Uhr. Zu diesem Zeitpunkt plant Airflow eine DAG-Ausführung am Ende des Zeitplanintervalls, also um 18:00 Uhr.

DAG manuell auslösen

Wenn Sie einen Airflow-DAG manuell auslösen, führt Airflow den DAG einmal aus, unabhängig vom Zeitplan, der in der DAG-Datei angegeben ist.

Console

So lösen Sie einen DAG über die Google Cloud -Konsole aus:

  1. Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Wählen Sie eine Umgebung aus, um die zugehörigen Details anzuzeigen.

  3. Wechseln Sie auf der Seite Umgebungsdetails zum Tab DAGs.

  4. Klicken Sie auf den Namen eines DAG.

  5. Klicken Sie auf der Seite DAG-Details auf DAG auslösen. Eine neue DAG-Ausführung wird erstellt.

Airflow-UI

So lösen Sie einen DAG über die Airflow-UI aus:

  1. Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Melden Sie sich mit dem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.

  4. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Aktionen für Ihren DAG auf die Schaltfläche DAG auslösen.

gcloud

Führen Sie den Befehl dags trigger der Airflow-Befehlszeile aus:

  gcloud composer environments run ENVIRONMENT_NAME \     --location LOCATION \     dags trigger -- DAG_ID 

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • DAG_ID: der Name des DAG.

Weitere Informationen zum Ausführen von Befehlen der Airflow-Befehlszeile in Cloud Composer-Umgebungen finden Sie unter Befehle der Airflow-Befehlszeile ausführen.

Weitere Informationen zu den verfügbaren Airflow-Befehlszeilenbefehlen finden Sie in der Befehlsreferenz zu gcloud composer environments run.

Logs und Details zu DAG-Ausführungen ansehen

In der Google Cloud -Konsole haben Sie folgende Möglichkeiten:

Außerdem bietet Cloud Composer Zugriff auf die Airflow-UI, die eigene Weboberfläche von Airflow.

DAG pausieren

Console

So pausieren Sie einen DAG über die Google Cloud Console:

  1. Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Wählen Sie eine Umgebung aus, um die zugehörigen Details anzuzeigen.

  3. Wechseln Sie auf der Seite Umgebungsdetails zum Tab DAGs.

  4. Klicken Sie auf den Namen eines DAG.

  5. Klicken Sie auf der Seite DAG-Details auf DAG pausieren.

Airflow-UI

So pausieren Sie einen DAG über die Airflow-UI:

  1. Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.

Zur Seite Umgebungen

  1. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  2. Melden Sie sich mit dem Google-Konto an, das über die entsprechenden Berechtigungen verfügt.

  3. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs auf den Ein/Aus-Schalter neben dem Namen des DAG.

gcloud

Führen Sie den Befehl dags pause der Airflow-Befehlszeile aus:

  gcloud composer environments run ENVIRONMENT_NAME \     --location LOCATION \     dags pause -- DAG_ID 

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • DAG_ID: der Name des DAG.

Weitere Informationen zum Ausführen von Befehlen der Airflow-Befehlszeile in Cloud Composer-Umgebungen finden Sie unter Befehle der Airflow-Befehlszeile ausführen.

Weitere Informationen zu den verfügbaren Airflow-Befehlszeilenbefehlen finden Sie in der Befehlsreferenz zu gcloud composer environments run.

Nächste Schritte