This is an automated email from the ASF dual-hosted git repository.

dabla pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a5ffa6c7949 Fix upstream map index resolution after placeholder expansion (#59691)
a5ffa6c7949 is described below

commit a5ffa6c7949e07b13fd410256b99e77621d53b7b
Author: SameerMesiah97 <[email protected]>
AuthorDate: Thu Jun 11 18:22:20 2026 +0100

    Fix upstream map index resolution after placeholder expansion (#59691)
    
    * Fix upstream map index resolution after placeholder expansion with unit test.
    
    * Rename placeholder map index helper to reflect boolean semantics,
    return bool instead of 0/None, and simplify the associated
    resolution logic and documentation.
    
    ---------
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py    |  56 ++++++++++-
 .../tests/unit/models/test_taskinstance.py         | 109 +++++++++++++++++++++
 2 files changed, 164 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 740596f9d69..aeff2695f1b 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -2421,7 +2421,19 @@ def _get_relevant_map_indexes(
     # and "ti_count == ancestor_ti_count" does not work, since the further
     # expansion may be of length 1.
     if not _is_further_mapped_inside(relative, common_ancestor):
-        return ancestor_map_index
+        # During mapped task group expansion, upstream placeholder task instances
+        # (map_index = -1) may already have been replaced by their first expanded
+        # successor (map_index = 0) while downstream task instances are still
+        # unexpanded and continue resolving dependencies against the placeholder index.
+        resolved_map_index = (
+            0
+            if _should_use_post_expansion_placeholder(
+                task=task, relative=relative, map_index=ancestor_map_index, run_id=run_id, session=session
+            )
+            else ancestor_map_index
+        )
+
+        return resolved_map_index
 
     # Otherwise we need a partial aggregation for values from selected task
     # instances in the ancestor's expansion context.
@@ -2495,6 +2507,48 @@ def find_relevant_relatives(
     return visited
 
 
+def _should_use_post_expansion_placeholder(
+    *,
+    task: Operator,
+    relative: Operator,
+    map_index: int,
+    run_id: str,
+    session: Session,
+) -> bool:
+    """
+    Determine whether upstream dependency resolution should use map_index = 0.
+
+    Returns True when the upstream placeholder task instance
+    (map_index = -1) has already been replaced by its post-expansion
+    successor (map_index = 0).
+    """
+    if map_index != -1:
+        return False
+
+    rows = session.execute(
+        select(TaskInstance.task_id, TaskInstance.map_index).where(
+            TaskInstance.dag_id == relative.dag_id,
+            TaskInstance.run_id == run_id,
+            TaskInstance.task_id.in_([task.task_id, relative.task_id]),
+            TaskInstance.map_index.in_([-1, 0]),
+        )
+    ).all()
+
+    task_to_map_indexes: dict[str, set[int]] = defaultdict(set)
+    for task_id, mi in rows:
+        task_to_map_indexes[task_id].add(mi)
+
+    # We only rewrite when:
+    # 1) the current task is still using the placeholder (-1)
+    # 2) the upstream placeholder (-1) no longer exists
+    # 3) the post-expansion placeholder (0) does exist
+    return (
+        -1 in task_to_map_indexes[task.task_id]
+        and -1 not in task_to_map_indexes[relative.task_id]
+        and 0 in task_to_map_indexes[relative.task_id]
+    )
+
+
 class TaskInstanceNote(Base):
     """For storage of arbitrary notes concerning the task instance."""
 
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index b9d8c85f613..987768da124 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -3523,6 +3523,115 @@ def test_find_relevant_relatives(dag_maker, session, normal_tasks, mapped_tasks,
     assert result == expected
 
 
+def test_downstream_placeholder_handles_upstream_post_expansion(dag_maker, session):
+    """
+    Test dynamic task mapping behavior when an upstream placeholder task
+    (map_index = -1) has been replaced by the first expanded task
+    (map_index = 0).
+
+    This verifies that downstream mapped dependency resolution:
+    - preserves placeholder behavior before upstream expansion
+    - correctly resolves the post-expansion transition
+    - preserves normal expanded task behavior afterwards
+    """
+
+    with dag_maker(session=session) as dag:
+
+        @task
+        def get_mapping_source():
+            return ["one", "two", "three"]
+
+        @task
+        def mapped_task(x):
+            output = f"{x}"
+            return output
+
+        @task_group(prefix_group_id=False)
+        def the_task_group(x):
+            start = MockOperator(task_id="start")
+            upstream = mapped_task(x)
+
+            # Downstream task inside the task group that does not directly
+            # consume the expand input, but is still mapped via the mapped
+            # task group context.
+            downstream = MockOperator(task_id="downstream")
+
+            start >> upstream >> downstream
+
+        mapping_source = get_mapping_source()
+        mapped_tg = the_task_group.expand(x=mapping_source)
+
+        mapping_source >> mapped_tg
+
+    # Create DAG run and execute prerequisites.
+    dr = dag_maker.create_dagrun()
+
+    dag_maker.run_ti("get_mapping_source", map_index=-1, dag_run=dr, session=session)
+
+    upstream_task = dag.get_task("mapped_task")
+    downstream_task = dag.get_task("downstream")
+
+    # Before upstream expansion occurs, mapped dependency resolution
+    # should retain the existing placeholder semantics since no concrete
+    # upstream/downstream map index pairing exists yet.
+    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1, session=session)
+    downstream_ti.refresh_from_task(downstream_task)
+
+    result = downstream_ti.get_relevant_upstream_map_indexes(
+        upstream=upstream_task,
+        ti_count=1,
+        session=session,
+    )
+
+    assert result == -3
+
+    # Force expansion of the upstream mapped task.
+    upstream_task = dag.get_task("mapped_task")
+    _, max_index = TaskMap.expand_mapped_task(
+        upstream_task,
+        dr.run_id,
+        session=session,
+    )
+    upstream_expanded_ti_count = max_index + 1
+
+    downstream_task = dag.get_task("downstream")
+
+    # Grab the downstream placeholder TI.
+    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1, session=session)
+    downstream_ti.refresh_from_task(downstream_task)
+
+    result = downstream_ti.get_relevant_upstream_map_indexes(
+        upstream=upstream_task,
+        ti_count=upstream_expanded_ti_count,
+        session=session,
+    )
+
+    assert result == 0
+
+    # Now do the same for downstream expanded (map_index = 0) to ensure existing behavior is not broken.
+    # Force expansion of the downstream mapped task.
+    _, max_index = TaskMap.expand_mapped_task(
+        downstream_task,
+        dr.run_id,
+        session=session,
+    )
+    downstream_expanded_ti_count = max_index + 1
+
+    # Grab the first expanded downstream task instance (map_index = 0).
+    downstream_ti = dr.get_task_instance(task_id="downstream", map_index=0, session=session)
+    downstream_ti.refresh_from_task(downstream_task)
+
+    result = downstream_ti.get_relevant_upstream_map_indexes(
+        upstream=upstream_task,
+        ti_count=downstream_expanded_ti_count,
+        session=session,
+    )
+
+    # Verify behavior remains unchanged once the downstream task
+    # itself has expanded.
+    assert result == 0
+
+
 def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, session):
     """Test that specifying a non-mapped task as a tuple doesn't raise NotMapped exception."""
     # t1 -> t2 (non-mapped) -> t3

Reply via email to