quoc-t-le removed a comment on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821535724


   Yeah, at the moment I can't continue to use the subdag because 
prev_execution_date is not correctly passed down to the subdag (or something 
similar) with Airflow 2: https://github.com/apache/airflow/issues/15396
   
   I tried BranchPythonOperator, but it doesn't work as expected.  In theory, 
this should branch to the skip task and then execute t2...
   
   `from airflow import DAG
   from datetime import datetime
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.python_operator import ShortCircuitOperator, 
PythonOperator, BranchPythonOperator
   from airflow.utils.task_group import TaskGroup
   
   # Shared task arguments; handed to the DAG constructor as ``default_args``.
   default_args = {
       "owner": "airflow",
       "retries": 3,
       "depends_on_past": False,
   }
   
   def branch_op(*arg, **kwargs):
       """Return the id of the task to branch to for the given ``category``.

       Reads the ``category`` keyword argument and returns ``'t1.skip'`` for
       ``'t1'`` or ``'t2.skip'`` for ``'t2'``. Any other value (or a missing
       ``category``) yields ``None``. Positional arguments are ignored.
       """
       selected = kwargs.get('category')
       # Check the known categories in the same order as before: t1, then t2.
       for group_name in ('t1', 't2'):
           if selected == group_name:
               return group_name + '.skip'
       return None
   
   # Daily DAG ("short-circuit") with catchup enabled from 2021-04-14 and at
   # most one active run. It wires: start -> task group 't1' -> task group 't2'.
   # NOTE(review): the description string below is split across two physical
   # lines, which looks like a mail line-wrap artifact -- rejoin it before
   # running this snippet.
   with DAG ("short-circuit",
             catchup=True,
             default_args=default_args,
             schedule_interval='@daily',
             description='Aggregates and pulls down data for API endpoints that 
use analytics',
             start_date=datetime.strptime('04/14/2021', '%m/%d/%Y'),
             max_active_runs=1
   ) as dag:
       # Entry point that both task groups hang off.
       t0 = DummyOperator(task_id='start')
       with TaskGroup('t1') as t1:
           # branch_op returns 't1.skip' for category "t1"; presumably that
           # resolves to the 'skip' task of this group via the group-id prefix
           # -- TODO confirm.
           # NOTE(review): provide_context is reportedly unnecessary in
           # Airflow 2 -- confirm against the upgrade notes.
           s1 = BranchPythonOperator(
               task_id='short_circuit',
               python_callable=branch_op,
               provide_context=True,
               op_kwargs={"category": "t1"}
           )
           s2 = DummyOperator(task_id='t1s2')
           s3 = DummyOperator(task_id='t1s3')
           s4 = DummyOperator(task_id='t1s4')
           s5 = DummyOperator(task_id='t1s5')
           s6 = DummyOperator(task_id='t1s6')
           s7 = DummyOperator(task_id='t1s7')
           s8 = DummyOperator(task_id='t1s8')
           s9 = DummyOperator(task_id='t1s9')
           s10 = DummyOperator(task_id='skip')
           # Single linear chain: s1 -> s2 -> ... -> s9 -> s10.
           # NOTE(review): 'skip' (s10) is only wired after s9, not directly
           # after the branch task s1. The BranchPythonOperator docs say the
           # returned task_id should be directly downstream of the branch task
           # -- confirm whether this is why the branch does not behave as
           # expected.
           s1 >> s2 >> s3 >> s4
           s4 >> s5
           s5 >> s6 >> s7
           s7 >> s8 >> s9 >> s10
   
   
       # Second group mirrors the first, with category "t2" passed to branch_op
       # (which returns 't2.skip' for it).
       with TaskGroup('t2') as t2:
           s1 = BranchPythonOperator(
                       task_id='short_circuit',
                       python_callable=branch_op,
                       provide_context=True,
                       op_kwargs={"category": "t2"}
                   )
           # The task ids below keep the 't1s' prefix even though they live in
           # group 't2'; presumably the group prefix keeps them unique -- verify.
           s2 = DummyOperator(task_id='t1s2')
           s3 = DummyOperator(task_id='t1s3')
           s4 = DummyOperator(task_id='t1s4')
           s5 = DummyOperator(task_id='t1s5')
           s6 = DummyOperator(task_id='t1s6')
           s7 = DummyOperator(task_id='t1s7')
           s8 = DummyOperator(task_id='t1s8')
           s9 = DummyOperator(task_id='t1s9')
           s10 = DummyOperator(task_id='skip')
           # Same linear chain as in group 't1': 'skip' is only reachable via s9.
           s1 >> s2 >> s3 >> s4
           s4 >> s5
           s5 >> s6 >> s7
           s7 >> s8 >> s9 >> s10
       # Overall ordering: start, then all of group t1, then all of group t2.
       t0 >> t1 >> t2`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to