This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 41ebdf3e3b0 Revert #59691 due to broken trigger rules in mapped task
groups (#68418)
41ebdf3e3b0 is described below
commit 41ebdf3e3b0376f55c0302e279c5f17a7dedc545
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Jun 12 08:20:25 2026 +0200
Revert #59691 due to broken trigger rules in mapped task groups (#68418)
---
airflow-core/src/airflow/models/taskinstance.py | 56 +----------
.../tests/unit/models/test_taskinstance.py | 109 ---------------------
2 files changed, 1 insertion(+), 164 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index aeff2695f1b..740596f9d69 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -2421,19 +2421,7 @@ def _get_relevant_map_indexes(
# and "ti_count == ancestor_ti_count" does not work, since the further
# expansion may be of length 1.
if not _is_further_mapped_inside(relative, common_ancestor):
- # During mapped task group expansion, upstream placeholder task
instances
- # (map_index = -1) may already have been replaced by their first
expanded
- # successor (map_index = 0) while downstream task instances are still
- # unexpanded and continue resolving dependencies against the
placeholder index.
- resolved_map_index = (
- 0
- if _should_use_post_expansion_placeholder(
- task=task, relative=relative, map_index=ancestor_map_index,
run_id=run_id, session=session
- )
- else ancestor_map_index
- )
-
- return resolved_map_index
+ return ancestor_map_index
# Otherwise we need a partial aggregation for values from selected task
# instances in the ancestor's expansion context.
@@ -2507,48 +2495,6 @@ def find_relevant_relatives(
return visited
-def _should_use_post_expansion_placeholder(
- *,
- task: Operator,
- relative: Operator,
- map_index: int,
- run_id: str,
- session: Session,
-) -> bool:
- """
- Determine whether upstream dependency resolution should use map_index = 0.
-
- Returns True when the upstream placeholder task instance
- (map_index = -1) has already been replaced by its post-expansion
- successor (map_index = 0).
- """
- if map_index != -1:
- return False
-
- rows = session.execute(
- select(TaskInstance.task_id, TaskInstance.map_index).where(
- TaskInstance.dag_id == relative.dag_id,
- TaskInstance.run_id == run_id,
- TaskInstance.task_id.in_([task.task_id, relative.task_id]),
- TaskInstance.map_index.in_([-1, 0]),
- )
- ).all()
-
- task_to_map_indexes: dict[str, set[int]] = defaultdict(set)
- for task_id, mi in rows:
- task_to_map_indexes[task_id].add(mi)
-
- # We only rewrite when:
- # 1) the current task is still using the placeholder (-1)
- # 2) the upstream placeholder (-1) no longer exists
- # 3) the post-expansion placeholder (0) does exist
- return (
- -1 in task_to_map_indexes[task.task_id]
- and -1 not in task_to_map_indexes[relative.task_id]
- and 0 in task_to_map_indexes[relative.task_id]
- )
-
-
class TaskInstanceNote(Base):
"""For storage of arbitrary notes concerning the task instance."""
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index 987768da124..b9d8c85f613 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -3523,115 +3523,6 @@ def test_find_relevant_relatives(dag_maker, session,
normal_tasks, mapped_tasks,
assert result == expected
-def test_downstream_placeholder_handles_upstream_post_expansion(dag_maker,
session):
- """
- Test dynamic task mapping behavior when an upstream placeholder task
- (map_index = -1) has been replaced by the first expanded task
- (map_index = 0).
-
- This verifies that downstream mapped dependency resolution:
- - preserves placeholder behavior before upstream expansion
- - correctly resolves the post-expansion transition
- - preserves normal expanded task behavior afterwards
- """
-
- with dag_maker(session=session) as dag:
-
- @task
- def get_mapping_source():
- return ["one", "two", "three"]
-
- @task
- def mapped_task(x):
- output = f"{x}"
- return output
-
- @task_group(prefix_group_id=False)
- def the_task_group(x):
- start = MockOperator(task_id="start")
- upstream = mapped_task(x)
-
- # Downstream task inside the task group that does not directly
- # consume the expand input, but is still mapped via the mapped
- # task group context.
- downstream = MockOperator(task_id="downstream")
-
- start >> upstream >> downstream
-
- mapping_source = get_mapping_source()
- mapped_tg = the_task_group.expand(x=mapping_source)
-
- mapping_source >> mapped_tg
-
- # Create DAG run and execute prerequisites.
- dr = dag_maker.create_dagrun()
-
- dag_maker.run_ti("get_mapping_source", map_index=-1, dag_run=dr,
session=session)
-
- upstream_task = dag.get_task("mapped_task")
- downstream_task = dag.get_task("downstream")
-
- # Before upstream expansion occurs, mapped dependency resolution
- # should retain the existing placeholder semantics since no concrete
- # upstream/downstream map index pairing exists yet.
- downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1,
session=session)
- downstream_ti.refresh_from_task(downstream_task)
-
- result = downstream_ti.get_relevant_upstream_map_indexes(
- upstream=upstream_task,
- ti_count=1,
- session=session,
- )
-
- assert result == -3
-
- # Force expansion of the upstream mapped task.
- upstream_task = dag.get_task("mapped_task")
- _, max_index = TaskMap.expand_mapped_task(
- upstream_task,
- dr.run_id,
- session=session,
- )
- upstream_expanded_ti_count = max_index + 1
-
- downstream_task = dag.get_task("downstream")
-
- # Grab the downstream placeholder TI.
- downstream_ti = dr.get_task_instance(task_id="downstream", map_index=-1,
session=session)
- downstream_ti.refresh_from_task(downstream_task)
-
- result = downstream_ti.get_relevant_upstream_map_indexes(
- upstream=upstream_task,
- ti_count=upstream_expanded_ti_count,
- session=session,
- )
-
- assert result == 0
-
- # Now do the same for downstream expanded (map_index = 0) to ensure
existing behavior is not broken.
- # Force expansion of the downstream mapped task.
- _, max_index = TaskMap.expand_mapped_task(
- downstream_task,
- dr.run_id,
- session=session,
- )
- downstream_expanded_ti_count = max_index + 1
-
- # Grab the first expanded downstream task instance (map_index = 0).
- downstream_ti = dr.get_task_instance(task_id="downstream", map_index=0,
session=session)
- downstream_ti.refresh_from_task(downstream_task)
-
- result = downstream_ti.get_relevant_upstream_map_indexes(
- upstream=upstream_task,
- ti_count=downstream_expanded_ti_count,
- session=session,
- )
-
- # Verify behavior remains unchanged once the downstream task
- # itself has expanded.
- assert result == 0
-
-
def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker,
session):
"""Test that specifying a non-mapped task as a tuple doesn't raise
NotMapped exception."""
# t1 -> t2 (non-mapped) -> t3