This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 234690f5ae0 [v3-1-test] fix: `LatestOnlyOperator` not working if direct upstream of Dynamic Task Map (#62287) (#63417)
234690f5ae0 is described below

commit 234690f5ae00ab266305a5cee0448c5ca027b3c2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 12 11:20:51 2026 +0200

    [v3-1-test] fix: `LatestOnlyOperator` not working if direct upstream of Dynamic Task Map (#62287) (#63417)
    
    * fix: use parent's map_index for XCom lookup in NotPreviouslySkippedDep
    When an unmapped SkipMixin parent (e.g., LatestOnlyOperator) is upstream
    of a dynamically mapped task, the child task must look up XCom using
    map_index=-1 (the parent's XCom storage index) rather than the child's
    own map_index. This fix ensures mapped downstream TIs are correctly
    skipped when appropriate.
    Fixes apache/airflow#62118
    
    * fix: apply ruff formatting and import sorting
    
    - Split long multi-argument function call across lines
    - Fix import order: move XComModel import before providers
    
    * ci: trigger CI re-run (transient docs build cache failure)
    (cherry picked from commit 41d897fa8fc4538167b294ec5497adff37cc374b)
    
    Co-authored-by: Stanislav Chernov <[email protected]>
---
 .../ti_deps/deps/not_previously_skipped_dep.py     |  9 +++-
 .../deps/test_not_previously_skipped_dep.py        | 55 +++++++++++++++++++++-
 2 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py b/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
index 5238ca97d1c..907d9f02713 100644
--- a/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
@@ -58,8 +58,15 @@ class NotPreviouslySkippedDep(BaseTIDep):
                     # This can happen if the parent task has not yet run.
                     continue
 
+                # Use the parent's map context to look up the XCom. An unmapped parent
+                # (e.g. LatestOnlyOperator) writes XCom with map_index=-1, so we must
+                # query with -1 instead of the child's map_index.
+                xcom_map_index = ti.map_index if parent.is_mapped else -1
                 prev_result = ti.xcom_pull(
-                    task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY, session=session, map_indexes=ti.map_index
+                    task_ids=parent.task_id,
+                    key=XCOM_SKIPMIXIN_KEY,
+                    session=session,
+                    map_indexes=xcom_map_index,
                 )
 
                 if prev_result is None:
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
index d22dc6f1a58..35da4942899 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
@@ -21,10 +21,15 @@ import pendulum
 import pytest
 
 from airflow.models import DagRun, TaskInstance
+from airflow.models.xcom import XComModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import BranchPythonOperator
 from airflow.ti_deps.dep_context import DepContext
-from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
+from airflow.ti_deps.deps.not_previously_skipped_dep import (
+    XCOM_SKIPMIXIN_FOLLOWED,
+    XCOM_SKIPMIXIN_KEY,
+    NotPreviouslySkippedDep,
+)
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 
@@ -161,3 +166,51 @@ def test_parent_not_executed(session, dag_maker):
     assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
     assert dep.is_met(ti2, session)
     assert ti2.state == State.NONE
+
+
+def test_unmapped_parent_skip_mapped_downstream(session, dag_maker):
+    """
+    When an unmapped SkipMixin parent writes XCom with map_index=-1,
+    mapped downstream TIs (map_index >= 0) should still be skipped
+    by NotPreviouslySkippedDep.
+
+    Regression test for https://github.com/apache/airflow/issues/62118
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    with dag_maker(
+        "test_unmapped_skip_mapped_dag",
+        schedule=None,
+        start_date=start_date,
+        session=session,
+    ):
+        op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3")
+        op2 = EmptyOperator(task_id="op2")
+        op3 = EmptyOperator(task_id="op3")
+        op1 >> [op2, op3]
+
+    dr = dag_maker.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING)
+    tis = {ti.task_id: ti for ti in dr.task_instances}
+
+    # Simulate the unmapped branch operator having run: set it to SUCCESS
+    # and store XCom with map_index=-1 (as SkipMixin does for unmapped tasks).
+    tis["op1"].state = State.SUCCESS
+    session.merge(tis["op1"])
+    XComModel.set(
+        key=XCOM_SKIPMIXIN_KEY,
+        value={XCOM_SKIPMIXIN_FOLLOWED: ["op3"]},
+        dag_id=dr.dag_id,
+        task_id="op1",
+        run_id=dr.run_id,
+        map_index=-1,
+        session=session,
+    )
+
+    # Simulate a mapped downstream TI by changing map_index to 0.
+    tis["op2"].map_index = 0
+    session.merge(tis["op2"])
+    session.flush()
+
+    dep = NotPreviouslySkippedDep()
+    assert len(list(dep.get_dep_statuses(tis["op2"], session, DepContext()))) == 1
+    assert not dep.is_met(tis["op2"], session)
+    assert tis["op2"].state == State.SKIPPED

Reply via email to