This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 234690f5ae0 [v3-1-test] fix: `LatestOnlyOperator` not working if
direct upstream of Dynamic Task Map (#62287) (#63417)
234690f5ae0 is described below
commit 234690f5ae00ab266305a5cee0448c5ca027b3c2
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 12 11:20:51 2026 +0200
[v3-1-test] fix: `LatestOnlyOperator` not working if direct upstream of
Dynamic Task Map (#62287) (#63417)
* fix: use parent's map_index for XCom lookup in NotPreviouslySkippedDep
When an unmapped SkipMixin parent (e.g., LatestOnlyOperator) is upstream
of a dynamically mapped task, the child task must look up XCom using
map_index=-1 (the parent's XCom storage index) rather than the child's
own map_index. This fix ensures mapped downstream TIs are correctly
skipped when appropriate.
Fixes apache/airflow#62118
* fix: apply ruff formatting and import sorting
- Split long multi-argument function call across lines
- Fix import order: move XComModel import before providers
* ci: trigger CI re-run (transient docs build cache failure)
(cherry picked from commit 41d897fa8fc4538167b294ec5497adff37cc374b)
Co-authored-by: Stanislav Chernov <[email protected]>
---
.../ti_deps/deps/not_previously_skipped_dep.py | 9 +++-
.../deps/test_not_previously_skipped_dep.py | 55 +++++++++++++++++++++-
2 files changed, 62 insertions(+), 2 deletions(-)
diff --git
a/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
b/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
index 5238ca97d1c..907d9f02713 100644
--- a/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
@@ -58,8 +58,15 @@ class NotPreviouslySkippedDep(BaseTIDep):
# This can happen if the parent task has not yet run.
continue
+ # Use the parent's map context to look up the XCom. An
unmapped parent
+ # (e.g. LatestOnlyOperator) writes XCom with map_index=-1, so
we must
+ # query with -1 instead of the child's map_index.
+ xcom_map_index = ti.map_index if parent.is_mapped else -1
prev_result = ti.xcom_pull(
- task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY,
session=session, map_indexes=ti.map_index
+ task_ids=parent.task_id,
+ key=XCOM_SKIPMIXIN_KEY,
+ session=session,
+ map_indexes=xcom_map_index,
)
if prev_result is None:
diff --git
a/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
b/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
index d22dc6f1a58..35da4942899 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
@@ -21,10 +21,15 @@ import pendulum
import pytest
from airflow.models import DagRun, TaskInstance
+from airflow.models.xcom import XComModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import BranchPythonOperator
from airflow.ti_deps.dep_context import DepContext
-from airflow.ti_deps.deps.not_previously_skipped_dep import
NotPreviouslySkippedDep
+from airflow.ti_deps.deps.not_previously_skipped_dep import (
+ XCOM_SKIPMIXIN_FOLLOWED,
+ XCOM_SKIPMIXIN_KEY,
+ NotPreviouslySkippedDep,
+)
from airflow.utils.state import State
from airflow.utils.types import DagRunType
@@ -161,3 +166,51 @@ def test_parent_not_executed(session, dag_maker):
assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
assert dep.is_met(ti2, session)
assert ti2.state == State.NONE
+
+
+def test_unmapped_parent_skip_mapped_downstream(session, dag_maker):
+ """
+ When an unmapped SkipMixin parent writes XCom with map_index=-1,
+ mapped downstream TIs (map_index >= 0) should still be skipped
+ by NotPreviouslySkippedDep.
+
+ Regression test for https://github.com/apache/airflow/issues/62118
+ """
+ start_date = pendulum.datetime(2020, 1, 1)
+ with dag_maker(
+ "test_unmapped_skip_mapped_dag",
+ schedule=None,
+ start_date=start_date,
+ session=session,
+ ):
+ op1 = BranchPythonOperator(task_id="op1", python_callable=lambda:
"op3")
+ op2 = EmptyOperator(task_id="op2")
+ op3 = EmptyOperator(task_id="op3")
+ op1 >> [op2, op3]
+
+ dr = dag_maker.create_dagrun(run_type=DagRunType.MANUAL,
state=State.RUNNING)
+ tis = {ti.task_id: ti for ti in dr.task_instances}
+
+ # Simulate the unmapped branch operator having run: set it to SUCCESS
+ # and store XCom with map_index=-1 (as SkipMixin does for unmapped tasks).
+ tis["op1"].state = State.SUCCESS
+ session.merge(tis["op1"])
+ XComModel.set(
+ key=XCOM_SKIPMIXIN_KEY,
+ value={XCOM_SKIPMIXIN_FOLLOWED: ["op3"]},
+ dag_id=dr.dag_id,
+ task_id="op1",
+ run_id=dr.run_id,
+ map_index=-1,
+ session=session,
+ )
+
+ # Simulate a mapped downstream TI by changing map_index to 0.
+ tis["op2"].map_index = 0
+ session.merge(tis["op2"])
+ session.flush()
+
+ dep = NotPreviouslySkippedDep()
+ assert len(list(dep.get_dep_statuses(tis["op2"], session, DepContext())))
== 1
+ assert not dep.is_met(tis["op2"], session)
+ assert tis["op2"].state == State.SKIPPED