[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or equal to the end date before creating a DAG run and its task instances. For SubDAGs, the next run date is not relevant: a SubDAG's schedule_interval can be anything other than None or '@once', and it should be ignored. However, the current code calculates the next run date for a SubDAG, so the condition check mentioned above always fails for a manually triggered SubDAG.
This change adds a simple check to determine if this is a SubDAG and, if so, sets next run date to DAG run's start date. Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug- backfill-execute-for-subdags (cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc) Signed-off-by: Bolke de Bruin <bo...@xs4all.nl> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2bebeaf9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2bebeaf9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2bebeaf9 Branch: refs/heads/v1-8-stable Commit: 2bebeaf9554d35710de6eb1b4006157e105ac79b Parents: 68b1c98 Author: Joe Schmid <jsch...@symphonyrm.com> Authored: Tue Apr 4 08:27:45 2017 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Tue Apr 4 08:28:07 2017 +0200 ---------------------------------------------------------------------- airflow/jobs.py | 7 +++++-- airflow/models.py | 1 + tests/jobs.py | 28 ++++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 222d9ba..7db9b9c 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob): # consider max_active_runs but ignore when running subdags # "parent.child" as a dag_id is by convention a subdag - if self.dag.schedule_interval and "." 
not in self.dag.dag_id: + if self.dag.schedule_interval and not self.dag.is_subdag: active_runs = DagRun.find( dag_id=self.dag.dag_id, state=State.RUNNING, @@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob): # create dag runs dr_start_date = start_date or min([t.start_date for t in self.dag.tasks]) - next_run_date = self.dag.normalize_schedule(dr_start_date) end_date = end_date or datetime.now() + # next run date for a subdag isn't relevant (schedule_interval for subdags + # is ignored) so we use the dag run's start date in the case of a subdag + next_run_date = (self.dag.normalize_schedule(dr_start_date) + if not self.dag.is_subdag else dr_start_date) active_dag_runs = [] while next_run_date and next_run_date <= end_date: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index bdda701..fdff54e 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -2682,6 +2682,7 @@ class DAG(BaseDag, LoggingMixin): self.sla_miss_callback = sla_miss_callback self.orientation = orientation self.catchup = catchup + self.is_subdag = False # DagBag.bag_dag() will set this to True if appropriate self.partial = False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index aee0e9c..f9ede68 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase): else: self.assertEqual(State.NONE, ti.state) + def test_backfill_execute_subdag(self): + dag = self.dagbag.get_dag('example_subdag_operator') + subdag_op_task = dag.get_task('section-1') + + subdag = subdag_op_task.subdag + subdag.schedule_interval = '@daily' + + start_date = datetime.datetime.now() + executor = TestExecutor(do_update=True) + job = BackfillJob(dag=subdag, + 
start_date=start_date, + end_date=start_date, + executor=executor, + donot_pickle=True) + job.run() + + history = executor.history + subdag_history = history[0] + + # check that all 5 task instances of the subdag 'section-1' were executed + self.assertEqual(5, len(subdag_history)) + for sdh in subdag_history: + ti = sdh[3] + self.assertIn('section-1-task-', ti.task_id) + + subdag.clear() + dag.clear() + class LocalTaskJobTest(unittest.TestCase): def setUp(self):