添加和更新 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页面介绍如何在 Cloud Composer 环境中管理 DAG。

Cloud Composer 使用 Cloud Storage 存储桶来存储 Cloud Composer 环境的 DAG。您的环境会将此存储桶中的 DAG 同步到 Airflow 组件(例如 Airflow 工作器和调度器)。

准备工作

  • 由于 Apache Airflow 不提供强大的 DAG 隔离功能,建议您分开维护生产环境和测试环境,以防止产生 DAG 干扰。如需了解详情,请参阅测试 DAG
  • 确保您的账号具有足够的权限来管理 DAG。
  • 对 DAG 的更改会在 3-5 分钟内传播到 Airflow。您可以在 Airflow 网页界面中查看任务状态。

访问环境的存储桶

如需访问与您的环境关联的存储桶,请执行以下操作:

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含您的环境名称的行,然后在 DAGs 文件夹列中点击 DAGs 链接。系统会打开存储分区详情页面。它会显示环境存储桶中 /dags 文件夹的内容。

gcloud

gcloud CLI 提供了单独的命令,用于在环境的存储桶中添加删除 DAG。

如果您想与环境的存储桶进行交互,也可以使用 Google Cloud CLI。如需获取环境的存储桶地址,请运行以下 gcloud CLI 命令:

gcloud composer environments describe ENVIRONMENT_NAME \     --location LOCATION \     --format="get(config.dagGcsPrefix)" 

您需要进行如下替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

例如:

gcloud beta composer environments describe example-environment \     --location us-central1 \     --format="get(config.dagGcsPrefix)" 

API

构建 environments.get API 请求。在环境资源中,在 EnvironmentConfig 资源中,dagGcsPrefix 资源是环境存储桶的地址。

示例:

GET https://composer.googleapis.com/v1/projects/example-project/ locations/us-central1/environments/example-environment 

Python

使用 google-auth 库获取凭据,并使用 requests 库来调用 REST API。

import google.auth import google.auth.transport.requests  # Authenticate with Google Cloud. # See: https://cloud.google.com/docs/authentication/getting-started credentials, _ = google.auth.default(     scopes=["https://www.googleapis.com/auth/cloud-platform"] ) authed_session = google.auth.transport.requests.AuthorizedSession(credentials)  # project_id = 'YOUR_PROJECT_ID' # location = 'us-central1' # composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'  environment_url = (     "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"     "/environments/{}" ).format(project_id, location, composer_environment) response = authed_session.request("GET", environment_url) environment_data = response.json()  # Print the bucket name from the response body. print(environment_data["config"]["dagGcsPrefix"])

添加或更新 DAG

如需添加或更新 DAG,请将 DAG 的 Python .py 文件移至环境的存储桶中的 /dags 文件夹。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含您的环境名称的行,然后在 DAGs 文件夹列中点击 DAGs 链接。系统会打开存储分区详情页面。它会显示环境存储桶中 /dags 文件夹的内容。

  3. 点击上传文件。然后,使用浏览器的对话框选择 DAG 的 Python .py 文件并确认。

gcloud

gcloud composer environments storage dags import \     --environment ENVIRONMENT_NAME \     --location LOCATION \     --source="LOCAL_FILE_TO_UPLOAD" 

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • LOCAL_FILE_TO_UPLOAD 是 DAG 的 Python .py 文件。

示例:

gcloud composer environments storage dags import \     --environment example-environment \     --location us-central1 \     --source="example_dag.py" 

更新具有活跃 DAG 运行作业的 DAG

如果您更新具有活跃 DAG 运行作业的 DAG:

  • 所有当前正在执行的任务都会使用原始 DAG 文件继续运行。
  • 已安排但尚未运行的所有任务都会使用更新后的 DAG 文件。
  • 更新后的 DAG 文件中不再存在的所有任务都会被标记为已移除。

更新频繁运行的 DAG

上传 DAG 文件后,Airflow 需要一些时间来加载此文件并更新此 DAG。如果您的 DAG 频繁运行,您可能需要确保 DAG 使用更新后的 DAG 文件版本。为此,请执行以下操作:

  1. 在 Airflow 界面中暂停 DAG。

  2. 上传更新后的 DAG 文件。

  3. 请等待,直到您在 Airflow 界面中看到更新。这意味着调度器已正确解析 DAG,并在 Airflow 数据库中进行了更新。

    即使 Airflow 界面显示更新后的 DAG,也无法保证 Airflow 工作器拥有更新后的 DAG 文件版本。这是因为 DAG 文件不与调度器和工作器一起同步。

  4. 您可能需要延长等待时间,以确保 DAG 文件与环境中的所有工作器同步。系统每分钟执行多次同步。在运行状况良好的环境中,等待大约 20-30 秒足以让所有工作器完成同步。

  5. (可选)如果要完全确保所有工作器都具有新版 DAG 文件,请检查每个工作器的日志。为此,请执行以下操作:

    1. 在 Google Cloud 控制台中打开环境的日志标签页。

    2. 依次前往 Composer 日志 > 基础架构 > Cloud Storage 同步项,然后检查环境中每个工作器的日志。查找时间戳在您上传新 DAG 文件之后的最新 Syncing dags directory 日志项。如果您看到后面跟随 Finished syncing 项,则相应 DAG 在此工作器上成功同步。

  6. 取消暂停 DAG。

在环境中删除 DAG

如需删除 DAG,请从环境的存储桶中移除该 DAG 的 Python .py 文件。/dags

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含您的环境名称的行,然后在 DAGs 文件夹列中点击 DAGs 链接。系统会打开存储分区详情页面。它会显示您环境的存储桶中 /dags 文件夹的内容。

  3. 选择 DAG 文件,点击删除,然后确认操作。

gcloud

gcloud composer environments storage dags delete \     --environment ENVIRONMENT_NAME \     --location LOCATION \     DAG_FILE 

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_FILE 替换为 DAG 的 Python .py 文件。

示例:

gcloud composer environments storage dags delete \     --environment example-environment \     --location us-central1 \     example_dag.py 

从 Airflow 界面中移除 DAG

如需从 Airflow 网页界面中移除 DAG 的元数据,请执行以下操作:

Airflow 界面

  1. 前往 Airflow 界面
  2. 对于 DAG,请点击删除 DAG

gcloud

在低于 1.14.0 的 Airflow 1 版本中,请在 gcloud CLI 中运行以下命令:

  gcloud composer environments run ENVIRONMENT_NAME \     --location LOCATION \     delete_dag -- DAG_NAME 

在 Airflow 2、Airflow 1.14.0 及更高版本中,在 gcloud CLI 中运行以下命令:

  gcloud composer environments run ENVIRONMENT_NAME \     --location LOCATION \     dags delete -- DAG_NAME 

您需要进行如下替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_NAME 是要删除的 DAG 的名称。

后续步骤