This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8cbe5d9e09483f87b74339329c3c587d68200dd9 Author: Tamara Janina Fingerlin <90063506+tja...@users.noreply.github.com> AuthorDate: Mon Oct 2 19:23:18 2023 +0200 Mark tasks with `all_skipped` trigger rule as `skipped` if any task is in `upstream_failed` state (#34392) --------- Co-authored-by: Hussein Awala <huss...@awala.fr> (cherry picked from commit c3a8828ebaf6256ae1481ddb35945a64dd6cbb3a) --- airflow/ti_deps/deps/trigger_rule_dep.py | 2 +- tests/ti_deps/deps/test_trigger_rule_dep.py | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index 0359880538..7fbc540adf 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -378,7 +378,7 @@ class TriggerRuleDep(BaseTIDep): if skipped: new_state = TaskInstanceState.SKIPPED elif trigger_rule == TR.ALL_SKIPPED: - if success or failed: + if success or failed or upstream_failed: new_state = TaskInstanceState.SKIPPED elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS: if upstream_done and upstream_setup and skipped_setup >= upstream_setup: diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py index 9b7114a400..d7f8d3c42e 100644 --- a/tests/ti_deps/deps/test_trigger_rule_dep.py +++ b/tests/ti_deps/deps/test_trigger_rule_dep.py @@ -808,6 +808,30 @@ class TestTriggerRuleDep: assert len(dep_statuses) == 1 assert not dep_statuses[0].passed + def test_all_skipped_tr_failure_upstream_failed(self, session, get_task_instance): + """ + All-skipped trigger rule failure if an upstream task is in a `upstream_failed` state + """ + ti = get_task_instance( + TriggerRule.ALL_SKIPPED, + success=0, + skipped=0, + failed=0, + removed=0, + upstream_failed=1, + done=1, + normal_tasks=["FakeTaskID"], + ) + dep_statuses = tuple( + TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + dep_context=DepContext(flag_upstream_failed=False), + session=session, + ) + ) + assert len(dep_statuses) == 1 + assert not dep_statuses[0].passed + @pytest.mark.parametrize("flag_upstream_failed", [True, False]) def test_all_skipped_tr_success(self, session, get_task_instance, flag_upstream_failed): """