This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f4cc5c50f350801f43fc17152605d45cc169b452 Author: Ephraim Anierobi <[email protected]> AuthorDate: Thu Mar 18 11:38:52 2021 +0100 Fix running child tasks in a subdag after clearing a successful subdag (#14776) After successfully running a SUBDAG, clearing it (including downstream+recursive) doesn't trigger the inner tasks. Instead, the subdag is marked successful and the inner tasks all stay cleared and aren't re-run. This happens because the DagRun states of the subdags are not updated after clearing. This PR solves it by updating the DagRun state of all DAGs, including subdags, when include_subdags is True. (cherry picked from commit 052163516bf91ab7bb53f4ec3c7b5621df515820) --- airflow/models/dag.py | 10 +++++++-- tests/models/test_dag.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 8bb32db..d77cdfc 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1116,13 +1116,15 @@ class DAG(LoggingMixin): session: Session = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, + dag_ids: List[str] = None, ) -> None: - query = session.query(DagRun).filter_by(dag_id=self.dag_id) + dag_ids = dag_ids or [self.dag_id] + query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)) if start_date: query = query.filter(DagRun.execution_date >= start_date) if end_date: query = query.filter(DagRun.execution_date <= end_date) - query.update({DagRun.state: state}) + query.update({DagRun.state: state}, synchronize_session='fetch') @provide_session def clear( @@ -1183,11 +1185,13 @@ class DAG(LoggingMixin): """ TI = TaskInstance tis = session.query(TI) + dag_ids = [] if include_subdags: # Crafting the right filter for dag_id and task_ids combo conditions = [] for dag in self.subdags + [self]: conditions.append((TI.dag_id == dag.dag_id) & TI.task_id.in_(dag.task_ids)) + dag_ids.append(dag.dag_id) tis = 
tis.filter(or_(*conditions)) else: tis = session.query(TI).filter(TI.dag_id == self.dag_id) @@ -1327,11 +1331,13 @@ class DAG(LoggingMixin): dag=self, activate_dag_runs=False, # We will set DagRun state later. ) + self.set_dag_runs_state( session=session, start_date=start_date, end_date=end_date, state=dag_run_state, + dag_ids=dag_ids, ) else: count = 0 diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 60171d8..c923241 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1299,6 +1299,61 @@ class TestDag(unittest.TestCase): assert dagrun.state == dag_run_state @parameterized.expand( + [ + (State.NONE,), + (State.RUNNING,), + ] + ) + def test_clear_set_dagrun_state_for_subdag(self, dag_run_state): + dag_id = 'test_clear_set_dagrun_state_subdag' + self._clean_up(dag_id) + task_id = 't1' + dag = DAG(dag_id, start_date=DEFAULT_DATE, max_active_runs=1) + t_1 = DummyOperator(task_id=task_id, dag=dag) + subdag = DAG(dag_id + '.test', start_date=DEFAULT_DATE, max_active_runs=1) + SubDagOperator(task_id='test', subdag=subdag, dag=dag) + t_2 = DummyOperator(task_id='task', dag=subdag) + + session = settings.Session() + dagrun_1 = dag.create_dagrun( + run_type=DagRunType.BACKFILL_JOB, + state=State.FAILED, + start_date=DEFAULT_DATE, + execution_date=DEFAULT_DATE, + ) + dagrun_2 = subdag.create_dagrun( + run_type=DagRunType.BACKFILL_JOB, + state=State.FAILED, + start_date=DEFAULT_DATE, + execution_date=DEFAULT_DATE, + ) + session.merge(dagrun_1) + session.merge(dagrun_2) + task_instance_1 = TI(t_1, execution_date=DEFAULT_DATE, state=State.RUNNING) + task_instance_2 = TI(t_2, execution_date=DEFAULT_DATE, state=State.RUNNING) + session.merge(task_instance_1) + session.merge(task_instance_2) + session.commit() + + dag.clear( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=1), + dag_run_state=dag_run_state, + include_subdags=True, + include_parentdag=False, + session=session, + ) + + dagrun = ( + session.query( + 
DagRun, + ) + .filter(DagRun.dag_id == subdag.dag_id) + .one() + ) + assert dagrun.state == dag_run_state + + @parameterized.expand( [(state, State.NONE) for state in State.task_states if state != State.RUNNING] + [(State.RUNNING, State.SHUTDOWN)] ) # type: ignore
