[ https://issues.apache.org/jira/browse/AIRFLOW-3097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Longo updated AIRFLOW-3097: -------------------------------- Description: Unless I'm doing something incorrectly, it appears that you cannot nest SubDags, which would be a very helpful feature. I've created a simple pipeline to demonstrate the failure case below. It produces the following in Airflow: Broken DAG: [/home/airflow/airflow/dags/test_dag.py] 'NoneType' object has no attribute 'dag_id' test_dag.py {code:java} from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator import datetime from datetime import timedelta from test_subdag1 import TestSubDag1 startDate = '2018-09-20' default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email': ['em...@airflow.com'], 'start_date': datetime.datetime(2018, 3, 20, 9, 0), 'email_on_failure': False, 'email_on_retry': False, 'retries': 5, 'retry_delay': timedelta(seconds=30), 'run_as_user': 'airflow' } Test_DAG = DAG('Test_DAG', default_args=default_args, start_date=datetime.datetime(2018, 3, 20, 9, 0), schedule_interval=None, catchup=False) test_subdag1 = SubDagOperator(subdag=TestSubDag1('Test_DAG', 'test_subdag1', startDate), task_id='test_subdag1', dag=Test_DAG) TestDagConsolidateTask = DummyOperator(task_id='TestDag_Consolidate', dag=Test_DAG) test_subdag1 >> TestDagConsolidateTask {code} test_subdag1.py {code:java} from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator from test_subdag2 import TestSubDag2 import datetime from datetime import timedelta def TestSubDag1(parent_dag_name, child_dag_name, startDate): subdag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=None, start_date=startDate) test_subdag2 = SubDagOperator(subdag=TestSubDag2('%s.%s' % (parent_dag_name, child_dag_name), 'test_subdag2', startDate), task_id='test_subdag2', dag=subdag) Subdag1ConsolidateTask = DummyOperator(task_id='Subdag1_Consolidate', dag=subdag) test_subdag2 >> Subdag1ConsolidateTask 
{code} test_subdag2.py {code:java} // code placeholder from airflow import DAG from airflow.operators.dummy_operator import DummyOperator import datetime from datetime import timedelta def TestSubDag2(parent_dag_name, child_dag_name, startDate): subdag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=None, start_date=startDate) TestTask = DummyOperator(task_id='TestTask', dag=subdag) Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate', dag=subdag) TestTask >> Subdag2ConsolidateTask {code} was: Unless I'm doing something incorrectly, it appears that you cannot nest SubDags, which would be a very helpful feature. I've created a simple pipeline to demonstrate the failure case below: test_dag.py {code:java} from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator import datetime from datetime import timedelta from test_subdag1 import TestSubDag1 startDate = '2018-09-20' default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email': ['em...@airflow.com'], 'start_date': datetime.datetime(2018, 3, 20, 9, 0), 'email_on_failure': False, 'email_on_retry': False, 'retries': 5, 'retry_delay': timedelta(seconds=30), 'run_as_user': 'airflow' } Test_DAG = DAG('Test_DAG', default_args=default_args, start_date=datetime.datetime(2018, 3, 20, 9, 0), schedule_interval=None, catchup=False) test_subdag1 = SubDagOperator(subdag=TestSubDag1('Test_DAG', 'test_subdag1', startDate), task_id='test_subdag1', dag=Test_DAG) TestDagConsolidateTask = DummyOperator(task_id='TestDag_Consolidate', dag=Test_DAG) test_subdag1 >> TestDagConsolidateTask {code} test_subdag1.py {code:java} from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator from test_subdag2 import TestSubDag2 import datetime from datetime import timedelta def TestSubDag1(parent_dag_name, child_dag_name, startDate): subdag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), 
schedule_interval=None, start_date=startDate) test_subdag2 = SubDagOperator(subdag=TestSubDag2('%s.%s' % (parent_dag_name, child_dag_name), 'test_subdag2', startDate), task_id='test_subdag2', dag=subdag) Subdag1ConsolidateTask = DummyOperator(task_id='Subdag1_Consolidate', dag=subdag) test_subdag2 >> Subdag1ConsolidateTask {code} test_subdag2.py {code:java} // code placeholder from airflow import DAG from airflow.operators.dummy_operator import DummyOperator import datetime from datetime import timedelta def TestSubDag2(parent_dag_name, child_dag_name, startDate): subdag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=None, start_date=startDate) TestTask = DummyOperator(task_id='TestTask', dag=subdag) Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate', dag=subdag) TestTask >> Subdag2ConsolidateTask {code} > Capability for nested SubDags > ----------------------------- > > Key: AIRFLOW-3097 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3097 > Project: Apache Airflow > Issue Type: New Feature > Components: subdag > Affects Versions: 1.8.0 > Reporter: John Longo > Priority: Critical > > Unless I'm doing something incorrectly, it appears that you cannot nest > SubDags, which would be a very helpful feature. I've created a simple > pipeline to demonstrate the failure case below. 
It produces the following in > Airflow: Broken DAG: [/home/airflow/airflow/dags/test_dag.py] 'NoneType' > object has no attribute 'dag_id' > test_dag.py > {code:java} > from airflow import DAG > from airflow.operators.subdag_operator import SubDagOperator > import datetime > from datetime import timedelta > from test_subdag1 import TestSubDag1 > startDate = '2018-09-20' > default_args = { > 'owner': 'airflow', > 'depends_on_past': False, > 'email': ['em...@airflow.com'], > 'start_date': datetime.datetime(2018, 3, 20, 9, 0), > 'email_on_failure': False, > 'email_on_retry': False, > 'retries': 5, > 'retry_delay': timedelta(seconds=30), > 'run_as_user': 'airflow' > } > Test_DAG = DAG('Test_DAG', default_args=default_args, > start_date=datetime.datetime(2018, 3, 20, 9, 0), schedule_interval=None, > catchup=False) > test_subdag1 = SubDagOperator(subdag=TestSubDag1('Test_DAG', 'test_subdag1', > startDate), > task_id='test_subdag1', > dag=Test_DAG) > TestDagConsolidateTask = DummyOperator(task_id='TestDag_Consolidate', > dag=Test_DAG) > test_subdag1 >> TestDagConsolidateTask > {code} > test_subdag1.py > {code:java} > from airflow import DAG > from airflow.operators.subdag_operator import SubDagOperator > from airflow.operators.dummy_operator import DummyOperator > from test_subdag2 import TestSubDag2 > import datetime > from datetime import timedelta > def TestSubDag1(parent_dag_name, child_dag_name, startDate): > subdag = DAG( > '%s.%s' % (parent_dag_name, child_dag_name), > schedule_interval=None, > start_date=startDate) > test_subdag2 = SubDagOperator(subdag=TestSubDag2('%s.%s' % (parent_dag_name, > child_dag_name), 'test_subdag2', startDate), > task_id='test_subdag2', > dag=subdag) > Subdag1ConsolidateTask = DummyOperator(task_id='Subdag1_Consolidate', > dag=subdag) > test_subdag2 >> Subdag1ConsolidateTask > {code} > > test_subdag2.py > {code:java} > // code placeholder > from airflow import DAG > from airflow.operators.dummy_operator import DummyOperator > import 
datetime > from datetime import timedelta > def TestSubDag2(parent_dag_name, child_dag_name, startDate): > subdag = DAG( > '%s.%s' % (parent_dag_name, child_dag_name), > schedule_interval=None, > start_date=startDate) > TestTask = DummyOperator(task_id='TestTask', dag=subdag) > Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate', > dag=subdag) > TestTask >> Subdag2ConsolidateTask > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)