Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina descrive come gestire i DAG nell'ambiente Cloud Composer.
Cloud Composer utilizza un bucket Cloud Storage per archiviare i DAG del tuo ambiente Cloud Composer. Il tuo ambiente sincronizza i DAG da questo bucket con i componenti Airflow come i worker e gli scheduler di Airflow.
Prima di iniziare
- Poiché Apache Airflow non fornisce un forte isolamento dei DAG, ti consigliamo di mantenere ambienti di produzione e test separati per evitare interferenze tra i DAG. Per ulteriori informazioni, consulta Testare i DAG.
- Assicurati che il tuo account disponga di autorizzazioni sufficienti per gestire i DAG.
- Le modifiche al DAG vengono propagate ad Airflow entro 3-5 minuti. Puoi vedere lo stato dell'attività nell'interfaccia web di Airflow.
Accedere al bucket del tuo ambiente
Per accedere al bucket associato al tuo ambiente:
Console
Nella console Google Cloud , vai alla pagina Ambienti.
Nell'elenco degli ambienti, trova una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. Viene visualizzata la pagina Dettagli bucket. Mostra i contenuti della cartella
/dags
nel bucket dell'ambiente.
gcloud
gcloud CLI dispone di comandi separati per aggiungere ed eliminare i DAG nel bucket del tuo ambiente.
Se vuoi interagire con il bucket del tuo ambiente, puoi utilizzare anche Google Cloud CLI. Per ottenere l'indirizzo del bucket del tuo ambiente, esegui questo comando gcloud CLI:
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="get(config.dagGcsPrefix)"
Sostituisci:
ENVIRONMENT_NAME
con il nome dell'ambiente.LOCATION
con la regione in cui si trova l'ambiente.
Esempio:
gcloud beta composer environments describe example-environment \ --location us-central1 \ --format="get(config.dagGcsPrefix)"
API
Crea una richiesta API environments.get
. Nella risorsa Environment, nella risorsa EnvironmentConfig, nella risorsa dagGcsPrefix
si trova l'indirizzo del bucket del tuo ambiente.
Esempio:
GET https://composer.googleapis.com/v1/projects/example-project/ locations/us-central1/environments/example-environment
Python
Utilizza la libreria google-auth per ottenere le credenziali e la libreria requests per chiamare l'API REST.
Aggiungere o aggiornare un DAG
Per aggiungere o aggiornare un DAG, sposta il file Python .py
del DAG nella cartella /dags
del bucket dell'ambiente.
Console
Nella console Google Cloud , vai alla pagina Ambienti.
Nell'elenco degli ambienti, trova una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. Viene visualizzata la pagina Dettagli bucket. Mostra i contenuti della cartella
/dags
nel bucket dell'ambiente.Fai clic su Carica file. Poi seleziona il file Python
.py
per il DAG utilizzando la finestra di dialogo del browser e conferma.
gcloud
gcloud composer environments storage dags import \ --environment ENVIRONMENT_NAME \ --location LOCATION \ --source="LOCAL_FILE_TO_UPLOAD"
Sostituisci:
ENVIRONMENT_NAME
con il nome dell'ambiente.LOCATION
con la regione in cui si trova l'ambiente.LOCAL_FILE_TO_UPLOAD
è il file Python.py
per il DAG.
Esempio:
gcloud composer environments storage dags import \ --environment example-environment \ --location us-central1 \ --source="example_dag.py"
Aggiorna un DAG con esecuzioni di DAG attive
Se aggiorni un DAG con esecuzioni attive:
- Tutte le attività attualmente in esecuzione vengono completate utilizzando il file DAG originale.
- Tutte le attività pianificate ma non attualmente in esecuzione utilizzano il file DAG aggiornato.
- Tutte le attività non più presenti nel file DAG aggiornato vengono contrassegnate come rimosse.
Aggiornare i DAG eseguiti con una pianificazione frequente
Dopo aver caricato un file DAG, Airflow impiega un po' di tempo per caricare il file e aggiornare il DAG. Se il DAG viene eseguito con una pianificazione frequente, ti consigliamo di assicurarti che utilizzi la versione aggiornata del file DAG. Ecco come fare:
Metti in pausa il DAG nell'interfaccia utente di Airflow.
Carica un file DAG aggiornato.
Attendi finché non visualizzi gli aggiornamenti nell'interfaccia utente di Airflow. Ciò significa che il DAG è stato analizzato correttamente dallo scheduler e aggiornato nel database Airflow.
Se l'interfaccia utente di Airflow mostra i DAG aggiornati, ciò non garantisce che i worker di Airflow dispongano della versione aggiornata del file DAG. Ciò accade perché i file DAG vengono sincronizzati in modo indipendente per gli scheduler e i worker.
Potresti voler estendere il tempo di attesa per assicurarti che il file DAG venga sincronizzato con tutti i worker del tuo ambiente. La sincronizzazione avviene diverse volte al minuto. In un ambiente sano, l'attesa di circa 20-30 secondi è sufficiente per la sincronizzazione di tutti i worker.
(Facoltativo) Se vuoi assicurarti che tutti i worker abbiano la nuova versione del file DAG, esamina i log di ogni singolo worker. Ecco come fare:
Apri la scheda Log per il tuo ambiente nella console Google Cloud .
Vai a Log di Composer > Infrastruttura > Sincronizzazione di Cloud Storage e controlla i log di ogni worker nel tuo ambiente. Cerca la voce di log
Syncing dags directory
più recente con un timestamp successivo al caricamento del nuovo file DAG. Se vedi un elementoFinished syncing
che lo segue, significa che i DAG sono stati sincronizzati correttamente su questo worker.
Riattiva il DAG.
Eliminare un DAG nell'ambiente
Per eliminare un DAG, rimuovi il file Python .py
per il DAG dalla cartella /dags
dell'ambiente nel bucket dell'ambiente.
Console
Nella console Google Cloud , vai alla pagina Ambienti.
Nell'elenco degli ambienti, trova una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. Viene visualizzata la pagina Dettagli bucket. Mostra i contenuti della cartella
/dags
nel bucket del tuo ambiente.Seleziona il file DAG, fai clic su Elimina e conferma l'operazione.
gcloud
gcloud composer environments storage dags delete \ --environment ENVIRONMENT_NAME \ --location LOCATION \ DAG_FILE
Sostituisci:
ENVIRONMENT_NAME
con il nome dell'ambiente.LOCATION
con la regione in cui si trova l'ambiente.DAG_FILE
con il file Python.py
per il DAG.
Esempio:
gcloud composer environments storage dags delete \ --environment example-environment \ --location us-central1 \ example_dag.py
Rimuovere un DAG dall'UI di Airflow
Per rimuovere i metadati di un DAG dall'interfaccia web di Airflow:
UI di Airflow
- Vai alla UI di Airflow per il tuo ambiente.
- Per il DAG, fai clic su Elimina DAG.
gcloud
Nelle versioni di Airflow 1 precedenti alla 1.14.0, esegui questo comando nella gcloud CLId:
gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ delete_dag -- DAG_NAME
In Airflow 2, Airflow 1.14.0 e versioni successive, esegui il seguente comando nellgcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags delete -- DAG_NAME
Sostituisci:
ENVIRONMENT_NAME
con il nome dell'ambiente.LOCATION
con la regione in cui si trova l'ambiente.DAG_NAME
è il nome del DAG da eliminare.
Passaggi successivi
- Scrivere DAG
- Testare, sincronizzare ed eseguire il deployment dei DAG da GitHub
- Risoluzione dei problemi dei DAG