[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or
equal to the end date before creating a DAG run and task instances.
For SubDAGs, the next run date is not relevant, i.e. schedule_interval
can be anything other than None or '@once' and should be ignored.
However, the current code calculates the next run date for a SubDAG,
and the condition check mentioned above always fails for a SubDAG
triggered manually.

This change adds a simple check to determine if this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

(cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2bebeaf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2bebeaf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2bebeaf9

Branch: refs/heads/v1-8-stable
Commit: 2bebeaf9554d35710de6eb1b4006157e105ac79b
Parents: 68b1c98
Author: Joe Schmid <jsch...@symphonyrm.com>
Authored: Tue Apr 4 08:27:45 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 08:28:07 2017 +0200

----------------------------------------------------------------------
 airflow/jobs.py   |  7 +++++--
 airflow/models.py |  1 +
 tests/jobs.py     | 28 ++++++++++++++++++++++++++++
 3 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 222d9ba..7db9b9c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob):
 
         # consider max_active_runs but ignore when running subdags
         # "parent.child" as a dag_id is by convention a subdag
-        if self.dag.schedule_interval and "." not in self.dag.dag_id:
+        if self.dag.schedule_interval and not self.dag.is_subdag:
             active_runs = DagRun.find(
                 dag_id=self.dag.dag_id,
                 state=State.RUNNING,
@@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob):
 
         # create dag runs
         dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
-        next_run_date = self.dag.normalize_schedule(dr_start_date)
         end_date = end_date or datetime.now()
+        # next run date for a subdag isn't relevant (schedule_interval for subdags
+        # is ignored) so we use the dag run's start date in the case of a subdag
+        next_run_date = (self.dag.normalize_schedule(dr_start_date)
+                         if not self.dag.is_subdag else dr_start_date)
 
         active_dag_runs = []
         while next_run_date and next_run_date <= end_date:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index bdda701..fdff54e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2682,6 +2682,7 @@ class DAG(BaseDag, LoggingMixin):
         self.sla_miss_callback = sla_miss_callback
         self.orientation = orientation
         self.catchup = catchup
+        self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate
 
         self.partial = False
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index aee0e9c..f9ede68 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase):
             else:
                 self.assertEqual(State.NONE, ti.state)
 
+    def test_backfill_execute_subdag(self):
+        dag = self.dagbag.get_dag('example_subdag_operator')
+        subdag_op_task = dag.get_task('section-1')
+
+        subdag = subdag_op_task.subdag
+        subdag.schedule_interval = '@daily'
+
+        start_date = datetime.datetime.now()
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=subdag,
+                          start_date=start_date,
+                          end_date=start_date,
+                          executor=executor,
+                          donot_pickle=True)
+        job.run()
+
+        history = executor.history
+        subdag_history = history[0]
+
+        # check that all 5 task instances of the subdag 'section-1' were executed
+        self.assertEqual(5, len(subdag_history))
+        for sdh in subdag_history:
+            ti = sdh[3]
+            self.assertIn('section-1-task-', ti.task_id)
+
+        subdag.clear()
+        dag.clear()
+
 
 class LocalTaskJobTest(unittest.TestCase):
     def setUp(self):

Reply via email to