This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7ec3e92580a83235f91652692f6bc16951887e63
Author: Tamara Janina Fingerlin <90063506+tja...@users.noreply.github.com>
AuthorDate: Mon Oct 2 19:23:18 2023 +0200

    Mark tasks with `all_skipped` trigger rule as `skipped` if any task is in `upstream_failed` state (#34392)
    
    ---------
    
    Co-authored-by: Hussein Awala <huss...@awala.fr>
    (cherry picked from commit c3a8828ebaf6256ae1481ddb35945a64dd6cbb3a)
---
 airflow/ti_deps/deps/trigger_rule_dep.py    |  2 +-
 tests/ti_deps/deps/test_trigger_rule_dep.py | 24 ++++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 0359880538..7fbc540adf 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -378,7 +378,7 @@ class TriggerRuleDep(BaseTIDep):
                     if skipped:
                         new_state = TaskInstanceState.SKIPPED
                 elif trigger_rule == TR.ALL_SKIPPED:
-                    if success or failed:
+                    if success or failed or upstream_failed:
                         new_state = TaskInstanceState.SKIPPED
                 elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS:
                     if upstream_done and upstream_setup and skipped_setup >= upstream_setup:
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index 9b7114a400..d7f8d3c42e 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -808,6 +808,30 @@ class TestTriggerRuleDep:
         assert len(dep_statuses) == 1
         assert not dep_statuses[0].passed
 
+    def test_all_skipped_tr_failure_upstream_failed(self, session, get_task_instance):
+        """
+        All-skipped trigger rule failure if an upstream task is in a `upstream_failed` state
+        """
+        ti = get_task_instance(
+            TriggerRule.ALL_SKIPPED,
+            success=0,
+            skipped=0,
+            failed=0,
+            removed=0,
+            upstream_failed=1,
+            done=1,
+            normal_tasks=["FakeTaskID"],
+        )
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=ti,
+                dep_context=DepContext(flag_upstream_failed=False),
+                session=session,
+            )
+        )
+        assert len(dep_statuses) == 1
+        assert not dep_statuses[0].passed
+
     @pytest.mark.parametrize("flag_upstream_failed", [True, False])
     def test_all_skipped_tr_success(self, session, get_task_instance, flag_upstream_failed):
         """

Reply via email to