Dev-iL commented on code in PR #61975:
URL: https://github.com/apache/airflow/pull/61975#discussion_r2810929613
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -921,6 +926,98 @@ def _iter_breadcrumbs() -> Iterator[dict[str, Any]]:
return TaskBreadcrumbsResponse(breadcrumbs=_iter_breadcrumbs())
+def _populate_task_group_map_index_context(
+ context: TIRunContext,
+ dag_id: str,
+ task_id: str,
+ map_index: int,
+ run_id: str,
+ session: SessionDep,
+ dag_bag: DagBagDep,
+) -> None:
+ """Populate task group map_index_template and expanded args on the
TIRunContext."""
+ try:
+ dag = get_latest_version_of_dag(dag_bag, dag_id, session)
+ except HTTPException:
+ return
+
+ task = dag.task_dict.get(task_id)
+ if not task:
+ return
+
+ for mtg in task.iter_mapped_task_groups():
+ if not mtg.map_index_template:
+ continue
+
+ context.task_group_map_index_template = mtg.map_index_template
+ context.task_group_expanded_args = _resolve_task_group_expand_args(
+ mtg._expand_input, map_index, run_id, session
+ )
+ break
Review Comment:
This stops at the first mapped task group with a template. I suggest to
document any assumptions on the iteration order and what it means for deeply
nested structures.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]