This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5b7dbaa8058120b76bb39ab69e03748c04808d1d Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Sat May 28 09:06:07 2022 +0100 Add TaskInstance State 'REMOVED' to finished states and success states (#23797) Now that we support dynamic task mapping, the 'REMOVED' task instance state should be treated as a finished state: for a dynamically mapped task with a removed task instance, the DAG run would otherwise be stuck in the running state. (cherry picked from commit 73446f28e9eb1e4c6f2f32c700147b61ab3da600) --- airflow/models/dagrun.py | 2 +- airflow/utils/state.py | 1 + tests/models/test_dagrun.py | 23 +++++++++++++++++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 3be82b9b6d..d9c4eeb726 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -542,7 +542,7 @@ class DagRun(Base, LoggingMixin): ) leaf_task_ids = {t.task_id for t in dag.leaves} - leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids] + leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids if ti.state != TaskInstanceState.REMOVED] # if all roots finished and at least one failed, the run failed if not unfinished_tis and any(leaf_ti.state in State.failed_states for leaf_ti in leaf_tis): diff --git a/airflow/utils/state.py b/airflow/utils/state.py index 8415dd1666..a79169f861 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -154,6 +154,7 @@ class State: TaskInstanceState.FAILED, TaskInstanceState.SKIPPED, TaskInstanceState.UPSTREAM_FAILED, + TaskInstanceState.REMOVED, ] ) """ diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index d45fd41370..6c3cc1c91c 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -198,6 +198,29 @@ class TestDagRun: dag_run.update_state() assert DagRunState.SUCCESS == dag_run.state + def test_dagrun_not_stuck_in_running_when_all_tasks_instances_are_removed(self, session):
+ """ + Tests that a DAG run succeeds when all tasks are removed + """ + dag = DAG(dag_id='test_dagrun_success_when_all_skipped', start_date=timezone.datetime(2017, 1, 1)) + dag_task1 = ShortCircuitOperator( + task_id='test_short_circuit_false', dag=dag, python_callable=lambda: False + ) + dag_task2 = EmptyOperator(task_id='test_state_skipped1', dag=dag) + dag_task3 = EmptyOperator(task_id='test_state_skipped2', dag=dag) + dag_task1.set_downstream(dag_task2) + dag_task2.set_downstream(dag_task3) + + initial_task_states = { + 'test_short_circuit_false': TaskInstanceState.REMOVED, + 'test_state_skipped1': TaskInstanceState.REMOVED, + 'test_state_skipped2': TaskInstanceState.REMOVED, + } + + dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session) + dag_run.update_state() + assert DagRunState.SUCCESS == dag_run.state + def test_dagrun_success_conditions(self, session): dag = DAG('test_dagrun_success_conditions', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})