This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 16d95dd8879 Fix `none_failed_min_one_success` trigger rule checks (#67873)
16d95dd8879 is described below
commit 16d95dd887964312013ab5fa34b225a5fe58f82c
Author: Aaron Chen <[email protected]>
AuthorDate: Mon Jun 8 11:05:35 2026 -0700
Fix `none_failed_min_one_success` trigger rule checks (#67873)
---
airflow-core/newsfragments/67873.bugfix.rst | 1 +
.../src/airflow/ti_deps/deps/trigger_rule_dep.py | 25 +++++++++-
.../unit/ti_deps/deps/test_trigger_rule_dep.py | 58 ++++++++++++++++++++--
3 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/airflow-core/newsfragments/67873.bugfix.rst b/airflow-core/newsfragments/67873.bugfix.rst
new file mode 100644
index 00000000000..5d691367860
--- /dev/null
+++ b/airflow-core/newsfragments/67873.bugfix.rst
@@ -0,0 +1 @@
+Fix ``none_failed_min_one_success`` trigger rule evaluation when no upstream task succeeds, including mapped tasks whose upstream instances are all ``removed``.
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index ba69f513152..9f9f9bd77aa 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -429,6 +429,8 @@ class TriggerRuleDep(BaseTIDep):
new_state = TaskInstanceState.UPSTREAM_FAILED
elif skipped == upstream:
new_state = TaskInstanceState.SKIPPED
+ elif upstream_done and success == 0:
+ new_state = TaskInstanceState.UPSTREAM_FAILED
elif trigger_rule == TR.NONE_SKIPPED:
if skipped:
new_state = TaskInstanceState.SKIPPED
@@ -536,7 +538,20 @@ class TriggerRuleDep(BaseTIDep):
f"upstream_task_ids={task.upstream_task_ids}"
)
)
- elif trigger_rule == TR.NONE_FAILED or trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
+ elif trigger_rule == TR.NONE_FAILED:
+ num_failures = upstream - success - skipped
+ if ti.map_index > -1:
+ num_failures -= removed
+ if num_failures > 0:
+ yield self._failing_status(
+ reason=(
+ f"Task's trigger rule '{trigger_rule_str}' requires all upstream tasks to have "
+ f"succeeded or been skipped, but found {num_failures} non-success(es). "
+ f"upstream_states={upstream_states}, "
+ f"upstream_task_ids={task.upstream_task_ids}"
+ )
+ )
+ elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
num_failures = upstream - success - skipped
if ti.map_index > -1:
num_failures -= removed
@@ -549,6 +564,14 @@ class TriggerRuleDep(BaseTIDep):
f"upstream_task_ids={task.upstream_task_ids}"
)
)
+ elif success <= 0:
+ yield self._failing_status(
+ reason=(
+ f"Task's trigger rule '{trigger_rule_str}' requires at least one upstream task "
+ f"success, but none were found. upstream_states={upstream_states}, "
+ f"upstream_task_ids={task.upstream_task_ids}"
+ )
+ )
elif trigger_rule == TR.NONE_SKIPPED:
if not upstream_done or (skipped > 0):
yield self._failing_status(
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
index f0820eb5b46..bd4a576f9f1 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
@@ -620,12 +620,18 @@ class TestTriggerRuleDep:
)
_test_trigger_rule(ti=ti, session=session, flag_upstream_failed=flag_upstream_failed)
- @pytest.mark.parametrize(("flag_upstream_failed", "expected_ti_state"), [(True, SKIPPED), (False, None)])
+ @pytest.mark.parametrize(
+ ("flag_upstream_failed", "expected_ti_state", "expected_reason"),
+ [
+ (True, SKIPPED, "requires at least one upstream task success"),
+ (False, None, "requires at least one upstream task success"),
+ ],
+ )
def test_none_failed_min_one_success_tr_skipped(
- self, session, get_task_instance, flag_upstream_failed, expected_ti_state
+ self, session, get_task_instance, flag_upstream_failed, expected_ti_state, expected_reason
):
"""
- None failed min one success trigger rule success with all skipped
+ None failed min one success trigger rule with all skipped upstreams
"""
ti = get_task_instance(
TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
@@ -642,6 +648,7 @@ class TestTriggerRuleDep:
session=session,
flag_upstream_failed=flag_upstream_failed,
expected_ti_state=expected_ti_state,
+ expected_reason=expected_reason,
)
@pytest.mark.parametrize(
@@ -1509,6 +1516,51 @@ class TestTriggerRuleDep:
_test_trigger_rule(ti=ti, session=session, flag_upstream_failed=flag_upstream_failed)
+ @pytest.mark.flaky(reruns=5)
+ @pytest.mark.parametrize(
+ ("flag_upstream_failed", "expected_ti_state"),
+ [(True, UPSTREAM_FAILED), (False, None)],
+ )
+ def test_mapped_task_upstream_all_removed_with_none_failed_min_one_success_trigger_rule(
+ self,
+ monkeypatch,
+ session,
+ get_mapped_task_dagrun,
+ flag_upstream_failed,
+ expected_ti_state,
+ ):
+ """
+ Test NONE_FAILED_MIN_ONE_SUCCESS trigger rule with all mapped upstream tasks removed.
+ """
+ dr, task, _ = get_mapped_task_dagrun(
+ trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+ state=REMOVED,
+ )
+
+ # ti with removed upstream ti
+ ti = dr.get_task_instance(task_id="do_something_else", map_index=3, session=session)
+ ti.task = task
+
+ upstream_states = _UpstreamTIStates(
+ success=0,
+ skipped=0,
+ failed=0,
+ removed=5,
+ upstream_failed=0,
+ done=5,
+ skipped_setup=0,
+ success_setup=0,
+ )
+ monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: upstream_states)
+
+ _test_trigger_rule(
+ ti=ti,
+ session=session,
+ flag_upstream_failed=flag_upstream_failed,
+ expected_reason="requires at least one upstream task success",
+ expected_ti_state=expected_ti_state,
+ )
+
def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session):
from airflow.sdk import task, task_group