[ https://issues.apache.org/jira/browse/AIRFLOW-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sajid Sajid reassigned AIRFLOW-3873: ------------------------------------ Assignee: Sajid Sajid > Issue with DAG dependency using ExternalTaskSensor > -------------------------------------------------- > > Key: AIRFLOW-3873 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3873 > Project: Apache Airflow > Issue Type: Bug > Components: DAG > Affects Versions: 1.10.1 > Environment: Running on Redhat Linux box on which Airflow is > Installed. > Reporter: Krishna Garapati > Assignee: Sajid Sajid > Priority: Critical > > I have two DAGs Created and want to set dependencies between them using > externalTaskSensor as shown below. I am getting the error as "Broken DAG: > [/data1/airflow/dags/testdagdependency.py] No module named snakebite.client". > Please help me on this. > ================================================================== > *DAG 1:* > from airflow import DAG > from airflow.operators.bash_operator import BashOperator > from datetime import datetime, timedelta > default_args = { > 'owner': 'Krishna Garapati', > 'depends_on_past': False, > 'start_date': datetime(2019, 2, 9), > 'email': ['krishna.garap...@transamerica.com'], > 'email_on_failure': True, > 'email_on_retry': True, > 'retries': 0, > 'retry_delay': timedelta(minutes=5) > #'queue': 'finance-ingestion', > # 'run_as_user': 'sptfinactmodel' > # 'pool': 'backfill', > # 'priority_weight': 10, > # 'end_date': datetime(2016, 1, 1), > } > dag = DAG('pythontest',default_args=default_args,schedule_interval='27 2 * * > *') > # t1, t2 and t3 are examples of tasks created by instantiating operators > t1 = BashOperator( > task_id='print_date', > bash_command='date', > dag=dag) > t2 = BashOperator( > task_id='pythontest', > bash_command='\{{"python > /preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}', > dag=dag) > t2.set_upstream(t1) > > ========================================================== > > *DAG 2 ( Keeping dependency on DAG1)* > > from airflow import DAG > from airflow.operators.bash_operator import BashOperator > from datetime import datetime, timedelta > from airflow.operators.sensors import ExternalTaskSensor > default_args = { > 'owner': 'Krishna Garapati', > 'depends_on_past': False, > 'start_date': datetime(2019, 2, 11), > 'email': ['krishna.garap...@transamerica.com'], > 'email_on_failure': True, > 'email_on_retry': True, > 'retries': 0, > 'retry_delay': timedelta(minutes=5) > #'queue': 'finance-ingestion', > # 'run_as_user': 'sptfinactmodel' > # 'pool': 'backfill', > # 'priority_weight': 10, > # 'end_date': datetime(2016, 1, 1), > } > dag = DAG('testdagdependency',default_args=default_args,schedule_interval='27 > 15 * * *') > wait_for_pythontest = ExternalTaskSensor( > task_id='wait_for_pythontest', > external_dag_id='pythontest', > external_task_id='pythontest', > execution_delta=None, # Same day as today > dag=dag) > # t1, t2 and t3 are examples of tasks created by instantiating operators > t1 = BashOperator( > task_id='print_date', > bash_command='date', > dag=dag) > t2 = BashOperator( > task_id='testdependency', > bash_command='\{{"python > /preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}', > dag=dag) > wait_for_pythontest >> testdagdependency > t2.set_upstream(t1) > > ===================================================== -- This message was sent by Atlassian Jira (v8.20.1#820001)