This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 73446f28e9 Add TaskInstance State 'REMOVED' to finished states and success states (#23797)
73446f28e9 is described below

commit 73446f28e9eb1e4c6f2f32c700147b61ab3da600
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Sat May 28 09:06:07 2022 +0100

    Add TaskInstance State 'REMOVED' to finished states and success states (#23797)
    
    Now that we support dynamic task mapping, the 'REMOVED' task instance
    state must be treated as a finished state. Otherwise, a DAG run in which
    a dynamic task has a removed task instance would stay stuck in the
    running state, because 'REMOVED' is not among the finished states.
---
 airflow/models/dagrun.py    |  2 +-
 airflow/utils/state.py      |  1 +
 tests/models/test_dagrun.py | 23 +++++++++++++++++++++++
 3 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 58974c39be..eeec4d5b09 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -540,7 +540,7 @@ class DagRun(Base, LoggingMixin):
                 )
 
         leaf_task_ids = {t.task_id for t in dag.leaves}
-        leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
+        leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids if 
ti.state != TaskInstanceState.REMOVED]
 
         # if all roots finished and at least one failed, the run failed
         if not unfinished_tis and any(leaf_ti.state in State.failed_states for 
leaf_ti in leaf_tis):
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 8415dd1666..a79169f861 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -154,6 +154,7 @@ class State:
             TaskInstanceState.FAILED,
             TaskInstanceState.SKIPPED,
             TaskInstanceState.UPSTREAM_FAILED,
+            TaskInstanceState.REMOVED,
         ]
     )
     """
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index f73f5d1c45..14f4b7f34b 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -196,6 +196,29 @@ class TestDagRun:
         dag_run.update_state()
         assert DagRunState.SUCCESS == dag_run.state
 
+    def 
test_dagrun_not_stuck_in_running_when_all_tasks_instances_are_removed(self, 
session):
+        """
+        Tests that a DAG run succeeds when all tasks are removed
+        """
+        dag = DAG(dag_id='test_dagrun_success_when_all_skipped', 
start_date=timezone.datetime(2017, 1, 1))
+        dag_task1 = ShortCircuitOperator(
+            task_id='test_short_circuit_false', dag=dag, 
python_callable=lambda: False
+        )
+        dag_task2 = EmptyOperator(task_id='test_state_skipped1', dag=dag)
+        dag_task3 = EmptyOperator(task_id='test_state_skipped2', dag=dag)
+        dag_task1.set_downstream(dag_task2)
+        dag_task2.set_downstream(dag_task3)
+
+        initial_task_states = {
+            'test_short_circuit_false': TaskInstanceState.REMOVED,
+            'test_state_skipped1': TaskInstanceState.REMOVED,
+            'test_state_skipped2': TaskInstanceState.REMOVED,
+        }
+
+        dag_run = self.create_dag_run(dag=dag, 
task_states=initial_task_states, session=session)
+        dag_run.update_state()
+        assert DagRunState.SUCCESS == dag_run.state
+
     def test_dagrun_success_conditions(self, session):
         dag = DAG('test_dagrun_success_conditions', start_date=DEFAULT_DATE, 
default_args={'owner': 'owner1'})
 

Reply via email to