[ https://issues.apache.org/jira/browse/AIRFLOW-3097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Longo updated AIRFLOW-3097:
--------------------------------
    Description: 
Unless I'm doing something incorrectly, it appears that you cannot nest SubDags, which would be a very helpful feature.  I've created a simple pipeline to demonstrate the failure case below.  It produces the following error in Airflow:
{noformat}
Broken DAG: [/home/airflow/airflow/dags/test_dag.py] 'NoneType' object has no attribute 'dag_id'
{noformat}

test_dag.py
{code:python}
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
import datetime
from datetime import timedelta

from test_subdag1 import TestSubDag1

startDate = '2018-09-20'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['em...@airflow.com'],
    'start_date': datetime.datetime(2018, 3, 20, 9, 0),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(seconds=30),
    'run_as_user': 'airflow'
}

Test_DAG = DAG('Test_DAG', default_args=default_args,
               start_date=datetime.datetime(2018, 3, 20, 9, 0),
               schedule_interval=None, catchup=False)

# First-level SubDag; TestSubDag1 itself wraps a second-level SubDag.
test_subdag1 = SubDagOperator(subdag=TestSubDag1('Test_DAG', 'test_subdag1', startDate),
                              task_id='test_subdag1',
                              dag=Test_DAG)

TestDagConsolidateTask = DummyOperator(task_id='TestDag_Consolidate', dag=Test_DAG)

test_subdag1 >> TestDagConsolidateTask
{code}
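
(Editor's note, not part of the original report.) My understanding is that SubDagOperator expects the child DAG's {{dag_id}} to be exactly {{'<parent dag_id>.<task_id>'}}, so a nested layout has to chain the ids one level per SubDag. A minimal sketch of the ids the files below imply:
{code:python}
# Editor's sketch of the dag_id chain the nested layout implies. The
# "<parent dag_id>.<task_id>" convention is my understanding of what
# SubDagOperator checks, not quoted from the Airflow source.
parent_id = 'Test_DAG'
level1_id = '%s.%s' % (parent_id, 'test_subdag1')   # 'Test_DAG.test_subdag1'
level2_id = '%s.%s' % (level1_id, 'test_subdag2')   # 'Test_DAG.test_subdag1.test_subdag2'

assert level1_id == 'Test_DAG.test_subdag1'
assert level2_id == 'Test_DAG.test_subdag1.test_subdag2'
{code}
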
test_subdag1.py
{code:python}
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
import datetime
from datetime import timedelta

from test_subdag2 import TestSubDag2


def TestSubDag1(parent_dag_name, child_dag_name, startDate):
    subdag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=None,
        start_date=startDate)

    # Second-level SubDag nested inside this first-level SubDag.
    test_subdag2 = SubDagOperator(subdag=TestSubDag2('%s.%s' % (parent_dag_name, child_dag_name),
                                                     'test_subdag2', startDate),
                                  task_id='test_subdag2',
                                  dag=subdag)

    Subdag1ConsolidateTask = DummyOperator(task_id='Subdag1_Consolidate', dag=subdag)

    test_subdag2 >> Subdag1ConsolidateTask
{code}
 

test_subdag2.py
{code:python}
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
import datetime
from datetime import timedelta


def TestSubDag2(parent_dag_name, child_dag_name, startDate):
    subdag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=None,
        start_date=startDate)

    TestTask = DummyOperator(task_id='TestTask', dag=subdag)
    Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate', dag=subdag)

    TestTask >> Subdag2ConsolidateTask
{code}
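
(Editor's note, not part of the original report.) One observation grounded in the files above: neither {{TestSubDag1}} nor {{TestSubDag2}} returns the DAG it builds, so the enclosing {{SubDagOperator}} receives {{subdag=None}}, which by itself could produce the "'NoneType' object has no attribute 'dag_id'" message. Below is a hedged sketch of {{test_subdag2.py}} with an explicit {{return subdag}}; whether that is enough to make the nested layout work on Airflow 1.8 is not asserted here.
{code:python}
# Editor's sketch, not from the report: the same factory as test_subdag2.py
# above, but returning the DAG object so the calling SubDagOperator receives
# a DAG rather than None. Imports are the same Airflow 1.x modules used above.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def TestSubDag2(parent_dag_name, child_dag_name, startDate):
    subdag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=None,
        start_date=startDate)  # the report passes a string; a datetime is more typical

    TestTask = DummyOperator(task_id='TestTask', dag=subdag)
    Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate', dag=subdag)

    TestTask >> Subdag2ConsolidateTask

    return subdag  # without this the factory returns None
{code}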


> Capability for nested SubDags
> -----------------------------
>
>                 Key: AIRFLOW-3097
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3097
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: subdag
>    Affects Versions: 1.8.0
>            Reporter: John Longo
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
