uranusjr commented on code in PR #59691:
URL: https://github.com/apache/airflow/pull/59691#discussion_r3257221707


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -2367,6 +2375,54 @@ def _visit_relevant_relatives_for_mapped(mapped_tasks: Iterable[tuple[str, int]]
     return visited
 
 
+def _resolve_placeholder_map_index(
+    *,
+    task: Operator,
+    relative: Operator,
+    map_index: int,
+    run_id: str,
+    session: Session,
+) -> int | None:
+    """
+    Resolve the correct map_index for upstream dependency evaluation.
+
+    This handles the transition from map_index = -1 (pre-expansion placeholder)
+    to map_index = 0 (post-expansion placeholder successor).
+
+    Returns:
+        - 0 if the placeholder has transitioned from -1 to 0
+        - None if no override should be applied
+    """
+    if map_index != -1:
+        return None
+
+    rows = session.execute(
+        select(TaskInstance.task_id, TaskInstance.map_index).where(
+            TaskInstance.dag_id == relative.dag_id,
+            TaskInstance.run_id == run_id,
+            TaskInstance.task_id.in_([task.task_id, relative.task_id]),
+            TaskInstance.map_index.in_([-1, 0]),
+        )
+    ).all()
+
+    task_to_map_indexes: dict[str, list[int]] = defaultdict(list)

Review Comment:
   This should use a set instead of a list, i.e. `dict[str, set[int]]` built with `defaultdict(set)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to