joshOberhaus opened a new issue, #67265:
URL: https://github.com/apache/airflow/issues/67265
### Under which category would you file this issue?
Airflow Core
### Apache Airflow version
3.2.1
### What happened and how to reproduce it?
In the following minimal example, `run_optional_step` runs for both mapped instances (item_type `full` and item_type `partial`), even though it is intended to be skipped for `partial`:
```
"""Minimal example: @task.branch inside a dynamically mapped task group.

This example replicates the structure of a real-world DAG, where:

- One outer task group (process_items) fetches items and expands
  an inner task group (handle_item) via expand_kwargs.
- Inside handle_item, a @task.branch decides whether to run
  run_optional_step based on item_type.

Expected behavior
-----------------
item_type == "full"    → branch returns absolute task ID → run_optional_step should run
item_type == "partial" → branch returns None             → run_optional_step should be skipped

Actual behavior
---------------
run_optional_step runs for ALL items.
"""

from __future__ import annotations

import pendulum

from airflow.sdk import DAG, task, task_group
from airflow.providers.standard.operators.empty import EmptyOperator

with DAG(
    dag_id="branch_in_mapped_taskgroup_example",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    # -------------------------------------------------------------------- #
    # Inner task group: handles one mapped item.                           #
    # The branch should skip or run an optional step per mapped instance.  #
    # -------------------------------------------------------------------- #
    @task_group(group_id="handle_item")
    def tg_handle_item(item_id: str, item_type: str):
        """Process one item. Only 'full' items need the extra step."""

        @task(task_id="process")
        def process(item_id: str, item_type: str) -> str:
            print(f"Processing {item_id=} {item_type=}")
            return item_type

        result = process(item_id, item_type)

        @task.branch(task_id="select_optional_step")
        def select_optional_step(item_type: str):
            if item_type == "full":
                return ["process_items.handle_item.run_optional_step"]
            return None

        branch = select_optional_step(result)

        run_optional_step = EmptyOperator(task_id="run_optional_step")

        branch >> run_optional_step

    # -------------------------------------------------------------------- #
    # Outer task group: loads items and expands the inner group once per   #
    # item. There is only one outer group instance in the DAG.             #
    # -------------------------------------------------------------------- #
    @task_group(group_id="process_items")
    def tg_process_items():
        @task(task_id="get_items")
        def get_items() -> list[dict]:
            return [
                {"item_id": "item_1", "item_type": "full"},  # optional step should run
                {"item_id": "item_2", "item_type": "partial"},  # optional step should be skipped
            ]

        tg_handle_item.expand_kwargs(get_items())

    tg_process_items()
```
### What you think should happen instead?
- `item_type == "full"` → branch returns the absolute task ID → `run_optional_step` should run.
- `item_type == "partial"` → branch returns `None` → `run_optional_step` should be skipped.

This is the behavior in Airflow versions up to and including 3.1.7.
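For reference, the expected per-instance outcome can be modeled in plain Python without Airflow. This is a sketch that reuses the branch logic from the reproducer and assumes that `expand_kwargs` assigns `map_index` values 0, 1, ... in list order; `expected_states` and `OPTIONAL_STEP` are hypothetical helpers, not Airflow APIs.

```python
# Plain-Python model of the expected outcome (no Airflow imports).
# Assumption: expand_kwargs assigns map_index 0, 1, ... in list order.
OPTIONAL_STEP = "process_items.handle_item.run_optional_step"


def select_optional_step(item_type: str):
    # Same decision logic as the @task.branch callable in the reproducer.
    if item_type == "full":
        return [OPTIONAL_STEP]
    return None


def expected_states(items: list[dict]) -> dict[int, str]:
    """Map each map_index to the expected final state of run_optional_step."""
    states = {}
    for map_index, item in enumerate(items):
        followed = select_optional_step(item["item_type"]) or []
        states[map_index] = "success" if OPTIONAL_STEP in followed else "skipped"
    return states


items = [
    {"item_id": "item_1", "item_type": "full"},
    {"item_id": "item_2", "item_type": "partial"},
]
print(expected_states(items))  # {0: 'success', 1: 'skipped'}
```

On 3.2.1, the observed state for `map_index` 1 is a run rather than a skip, which is the regression reported here.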
### Operating System
_No response_
### Deployment
None
### Apache Airflow Provider(s)
_No response_
### Versions of Apache Airflow Providers
_No response_
### Official Helm Chart version
Not Applicable
### Kubernetes Version
_No response_
### Helm Chart configuration
_No response_
### Docker Image customizations
_No response_
### Anything else?
This was originally discussed in https://github.com/apache/airflow/issues/65745; however, that issue appears to be either not reproducible or describing a different problem.
[Further context from @martijn-exads](https://github.com/apache/airflow/issues/65745#issuecomment-4497401392):
We think we've traced it to https://github.com/apache/airflow/pull/62287, specifically this line in `NotPreviouslySkippedDep`:

`xcom_map_index = ti.map_index if parent.is_mapped else -1`

For a branch operator inside a mapped `@task_group`, the operator itself has no `.expand()`, so `parent.is_mapped` is `False`. Its TIs, however, run with `map_index >= 0`, and `SkipMixin` writes `XCOM_SKIPMIXIN_KEY` at the parent TI's per-map `map_index`. The dep then queries `map_indexes=-1`, finds nothing, returns "not skipped", and the non-selected sibling proceeds.
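To make the suspected mechanism concrete, here is a minimal plain-Python model of the lookup described above. It is a sketch under stated assumptions, not Airflow's code: `ParentTask`, `xcom_store`, `skipmixin_push`, the `in_mapped_group` flag, and the `"skipmixin_key"` string are hypothetical stand-ins, and the real dependency reads XCom from the metadata database rather than from a dict. It shows the current expression resolving to `map_index=-1` for a branch inside a mapped group, alongside one possible fix that reads the per-map entry instead.

```python
# Simplified, hypothetical model of the lookup described above. None of these
# classes or helpers are Airflow APIs; names and the XCom key are assumptions.
from dataclasses import dataclass

SKIPMIXIN_KEY = "skipmixin_key"  # assumed key name, for illustration only


@dataclass
class ParentTask:
    task_id: str
    is_mapped: bool  # True only when the operator itself was .expand()-ed
    in_mapped_group: bool = False  # hypothetical: sits inside an expanded task group


# Fake XCom table keyed by (task_id, map_index, key).
xcom_store: dict[tuple[str, int, str], dict] = {}


def skipmixin_push(task_id: str, map_index: int, followed: list[str]) -> None:
    """Record the branch decision at the branch TI's own map_index."""
    xcom_store[(task_id, map_index, SKIPMIXIN_KEY)] = {"followed": followed}


def _is_skipped(parent: ParentTask, child_task_id: str, xcom_map_index: int) -> bool:
    entry = xcom_store.get((parent.task_id, xcom_map_index, SKIPMIXIN_KEY))
    if entry is None:
        return False  # nothing found, so the child is treated as "not skipped"
    return child_task_id not in entry["followed"]


def is_skipped_current(parent: ParentTask, child_task_id: str, ti_map_index: int) -> bool:
    # Mirrors: xcom_map_index = ti.map_index if parent.is_mapped else -1
    xcom_map_index = ti_map_index if parent.is_mapped else -1
    return _is_skipped(parent, child_task_id, xcom_map_index)


def is_skipped_proposed(parent: ParentTask, child_task_id: str, ti_map_index: int) -> bool:
    # Hypothetical fix: also read the per-map XCom when the parent is inside
    # a mapped task group, even though the operator itself is not mapped.
    per_map = parent.is_mapped or parent.in_mapped_group
    xcom_map_index = ti_map_index if per_map else -1
    return _is_skipped(parent, child_task_id, xcom_map_index)


branch = ParentTask("process_items.handle_item.select_optional_step",
                    is_mapped=False, in_mapped_group=True)
child = "process_items.handle_item.run_optional_step"
skipmixin_push(branch.task_id, 0, [child])  # item_1 ("full") follows the child
skipmixin_push(branch.task_id, 1, [])       # item_2 ("partial") returned None

print(is_skipped_current(branch, child, 1))   # False: partial still runs
print(is_skipped_proposed(branch, child, 1))  # True
print(is_skipped_proposed(branch, child, 0))  # False
```

Keying only on the child TI's `map_index` would probably not be enough: an unmapped branch upstream of a mapped task stores its decision at `map_index=-1`, which is why the sketch keeps the `-1` fallback for parents outside a mapped group.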
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)