John Longo created AIRFLOW-3097: ----------------------------------- Summary: Capability for nested SubDags Key: AIRFLOW-3097 URL: https://issues.apache.org/jira/browse/AIRFLOW-3097 Project: Apache Airflow Issue Type: New Feature Components: subdag Affects Versions: 1.8.0 Reporter: John Longo
Unless I'm doing something incorrectly, it appears that you cannot nest SubDags which would be a very helpful feature. I've created a simple pipeline to demonstrate the failure case below: test_dag.py {code:java} from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator import datetime from datetime import timedelta from test_subdag1 import TestSubDag1 startDate = '2018-09-20' default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email': ['em...@airflow.com'], 'start_date': datetime.datetime(2018, 3, 20, 9, 0), 'email_on_failure': False, 'email_on_retry': False, 'retries': 5, 'retry_delay': timedelta(seconds=30), 'run_as_user': 'airflow' } Test_DAG = DAG('Test_DAG', default_args=default_args, start_date=datetime.datetime(2018, 3, 20, 9, 0), schedule_interval=None, catchup=False) test_subdag1 = SubDagOperator(subdag=TestSubDag1('Test_DAG', 'test_subdag1', startDate), task_id='test_subdag1', dag=Test_DAG) TestDagConsolidateTask = DummyOperator(task_id='TestDag_Consolidate', dag=Test_DAG) test_subdag1 >> TestDagConsolidateTask {code} test_subdag1.py {code:java} from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator from test_subdag2 import TestSubDag2 import datetime from datetime import timedelta def TestSubDag1(parent_dag_name, child_dag_name, startDate): subdag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=None, start_date=startDate) test_subdag2 = SubDagOperator(subdag=TestSubDag2('%s.%s' % (parent_dag_name, child_dag_name), 'test_subdag2', startDate), task_id='test_subdag2', dag=subdag) Subdag1ConsolidateTask = DummyOperator(task_id='Subdag1_Consolidate', dag=subdag) test_subdag2 >> Subdag1ConsolidateTask {code} test_subdag2.py {code:java} // code placeholder from airflow import DAG from airflow.operators.dummy_operator import DummyOperator import datetime from datetime import timedelta def TestSubDag2(parent_dag_name, child_dag_name, startDate): subdag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=None, start_date=startDate) TestTask = DummyOperator(task_id='TestTask', dag=subdag) Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate', dag=subdag) TestTask >> Subdag2ConsolidateTask {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)