Krishna Garapati created AIRFLOW-3873:
-----------------------------------------

             Summary: Issue with DAG dependency using ExternalTaskSensor
                 Key: AIRFLOW-3873
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3873
             Project: Apache Airflow
          Issue Type: Bug
          Components: DAG
    Affects Versions: 1.10.1
         Environment: Running on Redhat Linux box on which Airflow is Installed.
            Reporter: Krishna Garapati
             Fix For: 1.10.2


I have two DAGs Created and want to set dependencies between them using 
externalTaskSensor as shown below. I am getting the error as "Broken DAG: 
[/data1/airflow/dags/testdagdependency.py] No module named snakebite.client". 
Please help me on this.

==================================================================

*DAG 1:* 

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
 'owner': 'Krishna Garapati',
 'depends_on_past': False,
 'start_date': datetime(2019, 2, 9),
 'email': ['[email protected]'],
 'email_on_failure': True,
 'email_on_retry': True,
 'retries': 0,
 'retry_delay': timedelta(minutes=5)
 #'queue': 'finance-ingestion',
 # 'run_as_user': 'sptfinactmodel'
 # 'pool': 'backfill',
 # 'priority_weight': 10,
 # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('pythontest',default_args=default_args,schedule_interval='27 2 * * *')

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
 task_id='print_date',
 bash_command='date',
 dag=dag)

t2 = BashOperator(
 task_id='pythontest',
 bash_command='\{{"python 
/preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}',
 dag=dag)

t2.set_upstream(t1)

 

==========================================================

 

*DAG 2 ( Keeping dependency on DAG1)*

 

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
 'owner': 'Krishna Garapati',
 'depends_on_past': False,
 'start_date': datetime(2019, 2, 11),
 'email': ['[email protected]'],
 'email_on_failure': True,
 'email_on_retry': True,
 'retries': 0,
 'retry_delay': timedelta(minutes=5)
 #'queue': 'finance-ingestion',
 # 'run_as_user': 'sptfinactmodel'
 # 'pool': 'backfill',
 # 'priority_weight': 10,
 # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('testdagdependency',default_args=default_args,schedule_interval='27 
15 * * *')

wait_for_pythontest = ExternalTaskSensor(
 task_id='wait_for_pythontest',
 external_dag_id='pythontest',
 external_task_id='pythontest',
 execution_delta=None, # Same day as today
 dag=dag)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
 task_id='print_date',
 bash_command='date',
 dag=dag)

t2 = BashOperator(
 task_id='testdependency',
 bash_command='\{{"python 
/preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}',
 dag=dag)


wait_for_pythontest >> testdagdependency
t2.set_upstream(t1)

 

=====================================================



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to