This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 16d95dd8879 Fix `none_failed_min_one_success` trigger rule checks (#67873)
16d95dd8879 is described below

commit 16d95dd887964312013ab5fa34b225a5fe58f82c
Author: Aaron Chen <[email protected]>
AuthorDate: Mon Jun 8 11:05:35 2026 -0700

    Fix `none_failed_min_one_success` trigger rule checks (#67873)
---
 airflow-core/newsfragments/67873.bugfix.rst        |  1 +
 .../src/airflow/ti_deps/deps/trigger_rule_dep.py   | 25 +++++++++-
 .../unit/ti_deps/deps/test_trigger_rule_dep.py     | 58 ++++++++++++++++++++--
 3 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/airflow-core/newsfragments/67873.bugfix.rst b/airflow-core/newsfragments/67873.bugfix.rst
new file mode 100644
index 00000000000..5d691367860
--- /dev/null
+++ b/airflow-core/newsfragments/67873.bugfix.rst
@@ -0,0 +1 @@
+Fix ``none_failed_min_one_success`` trigger rule evaluation when no upstream task succeeds, including mapped tasks whose upstream instances are all ``removed``.
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index ba69f513152..9f9f9bd77aa 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -429,6 +429,8 @@ class TriggerRuleDep(BaseTIDep):
                         new_state = TaskInstanceState.UPSTREAM_FAILED
                     elif skipped == upstream:
                         new_state = TaskInstanceState.SKIPPED
+                    elif upstream_done and success == 0:
+                        new_state = TaskInstanceState.UPSTREAM_FAILED
                 elif trigger_rule == TR.NONE_SKIPPED:
                     if skipped:
                         new_state = TaskInstanceState.SKIPPED
@@ -536,7 +538,20 @@ class TriggerRuleDep(BaseTIDep):
                             f"upstream_task_ids={task.upstream_task_ids}"
                         )
                     )
-            elif trigger_rule == TR.NONE_FAILED or trigger_rule == 
TR.NONE_FAILED_MIN_ONE_SUCCESS:
+            elif trigger_rule == TR.NONE_FAILED:
+                num_failures = upstream - success - skipped
+                if ti.map_index > -1:
+                    num_failures -= removed
+                if num_failures > 0:
+                    yield self._failing_status(
+                        reason=(
+                            f"Task's trigger rule '{trigger_rule_str}' 
requires all upstream tasks to have "
+                            f"succeeded or been skipped, but found 
{num_failures} non-success(es). "
+                            f"upstream_states={upstream_states}, "
+                            f"upstream_task_ids={task.upstream_task_ids}"
+                        )
+                    )
+            elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
                 num_failures = upstream - success - skipped
                 if ti.map_index > -1:
                     num_failures -= removed
@@ -549,6 +564,14 @@ class TriggerRuleDep(BaseTIDep):
                             f"upstream_task_ids={task.upstream_task_ids}"
                         )
                     )
+                elif success <= 0:
+                    yield self._failing_status(
+                        reason=(
+                            f"Task's trigger rule '{trigger_rule_str}' 
requires at least one upstream task "
+                            f"success, but none were found. 
upstream_states={upstream_states}, "
+                            f"upstream_task_ids={task.upstream_task_ids}"
+                        )
+                    )
             elif trigger_rule == TR.NONE_SKIPPED:
                 if not upstream_done or (skipped > 0):
                     yield self._failing_status(
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
index f0820eb5b46..bd4a576f9f1 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
@@ -620,12 +620,18 @@ class TestTriggerRuleDep:
         )
         _test_trigger_rule(ti=ti, session=session, 
flag_upstream_failed=flag_upstream_failed)
 
-    @pytest.mark.parametrize(("flag_upstream_failed", "expected_ti_state"), 
[(True, SKIPPED), (False, None)])
+    @pytest.mark.parametrize(
+        ("flag_upstream_failed", "expected_ti_state", "expected_reason"),
+        [
+            (True, SKIPPED, "requires at least one upstream task success"),
+            (False, None, "requires at least one upstream task success"),
+        ],
+    )
     def test_none_failed_min_one_success_tr_skipped(
-        self, session, get_task_instance, flag_upstream_failed, 
expected_ti_state
+        self, session, get_task_instance, flag_upstream_failed, 
expected_ti_state, expected_reason
     ):
         """
-        None failed min one success trigger rule success with all skipped
+        None failed min one success trigger rule with all skipped upstreams
         """
         ti = get_task_instance(
             TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
@@ -642,6 +648,7 @@ class TestTriggerRuleDep:
             session=session,
             flag_upstream_failed=flag_upstream_failed,
             expected_ti_state=expected_ti_state,
+            expected_reason=expected_reason,
         )
 
     @pytest.mark.parametrize(
@@ -1509,6 +1516,51 @@ class TestTriggerRuleDep:
 
         _test_trigger_rule(ti=ti, session=session, 
flag_upstream_failed=flag_upstream_failed)
 
+    @pytest.mark.flaky(reruns=5)
+    @pytest.mark.parametrize(
+        ("flag_upstream_failed", "expected_ti_state"),
+        [(True, UPSTREAM_FAILED), (False, None)],
+    )
+    def 
test_mapped_task_upstream_all_removed_with_none_failed_min_one_success_trigger_rule(
+        self,
+        monkeypatch,
+        session,
+        get_mapped_task_dagrun,
+        flag_upstream_failed,
+        expected_ti_state,
+    ):
+        """
+        Test NONE_FAILED_MIN_ONE_SUCCESS trigger rule with all mapped upstream 
tasks removed.
+        """
+        dr, task, _ = get_mapped_task_dagrun(
+            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+            state=REMOVED,
+        )
+
+        # ti with removed upstream ti
+        ti = dr.get_task_instance(task_id="do_something_else", map_index=3, 
session=session)
+        ti.task = task
+
+        upstream_states = _UpstreamTIStates(
+            success=0,
+            skipped=0,
+            failed=0,
+            removed=5,
+            upstream_failed=0,
+            done=5,
+            skipped_setup=0,
+            success_setup=0,
+        )
+        monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: 
upstream_states)
+
+        _test_trigger_rule(
+            ti=ti,
+            session=session,
+            flag_upstream_failed=flag_upstream_failed,
+            expected_reason="requires at least one upstream task success",
+            expected_ti_state=expected_ti_state,
+        )
+
 
 def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session):
     from airflow.sdk import task, task_group

Reply via email to