marclamberti opened a new issue, #54659:
URL: https://github.com/apache/airflow/issues/54659
### Apache Airflow version
3.0.4
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
When using dynamic task mapping with asset outlets, if multiple mapped task
instances are completed in quick succession (or at the same time) and produce
events for the same asset, the downstream asset-triggered DAG does not receive
all asset events in the triggering_asset_events context variable.
Context:
- DAG A has a dynamically mapped task with outlets=[Asset("example")]
- When the mapped task expands to 3 instances, each instance creates an
AssetEvent upon completion
- DAG B is triggered by Asset("example")
- DAG B is triggered once (expected behavior due to batching events)
- However, triggering_asset_events in DAG B only contains 1 event instead
of all 3
- The missing events only show up in the NEXT run of DAG B, once a later
asset event triggers it
I think this might occur because:
1. Each mapped task instance creates an AssetEvent and an AssetDagRunQueue
entry in separate transactions
2. The AssetDagRunQueue table has a composite primary key (asset_id,
target_dag_id), which prevents duplicates
3. The scheduler uses MAX(AssetDagRunQueue.created_at) as the cutoff
timestamp for which events to include
4. Due to transaction timing, not all AssetEvents may be committed before
the scheduler processes the queue
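To make the suspected mechanism concrete, here is a toy, pure-Python model of points 2 and 3. It does not use any Airflow code, and it rests on an assumption I have not verified: that a second insert into the queue for the same (asset_id, target_dag_id) is ignored, so created_at stays at the time of the first event. The `simulate` function and its arguments are made up for this illustration.

```python
def simulate(event_times, asset_id="example", target_dag_id="dag_b"):
    """Toy model: return (included, deferred) item numbers for one DAG B run."""
    queue = {}   # stands in for AssetDagRunQueue, keyed like its primary key
    events = []  # stands in for committed AssetEvent rows: (item, timestamp)
    for item, ts in enumerate(event_times, start=1):
        events.append((item, ts))
        # Assumption: a duplicate insert for the same key is a no-op,
        # so created_at keeps the timestamp of the first event.
        queue.setdefault((asset_id, target_dag_id), ts)

    # The run only picks up events at or before the newest queue entry.
    cutoff = max(queue.values())
    included = [item for item, ts in events if ts <= cutoff]
    deferred = [item for item, ts in events if ts > cutoff]
    return included, deferred


# Three mapped task instances finishing a millisecond apart.
included, deferred = simulate([10.000, 10.001, 10.002])
print(included, deferred)  # [1] [2, 3]
```

If the real behavior matches this model, the first event is attached to the run and the other two are left for whichever run comes next, which is the symptom described above.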
### What you think should happen instead?
When DAG B is triggered by multiple asset events from DAG A's mapped tasks,
ALL events that caused the trigger should be available in the
triggering_asset_events context variable.
Alternatively, DAG B should be triggered more than once to "consume" all
asset events, rather than waiting for a later asset event before the
earlier ones are processed.
### How to reproduce
1. Create DAG A with dynamic task mapping:
```
from datetime import datetime

from airflow.sdk import Asset, dag, task

asset = Asset("s3://bucket/asset_c")


@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def dag_a():
    @task(outlets=[asset])
    def produce_asset_event(item):
        print(f"Producing event for item {item}")
        return {"item": item}

    # This creates 3 mapped task instances
    produce_asset_event.expand(item=[1, 2, 3])


dag_a()
```
2. Create DAG B triggered by the asset:
```
from airflow.sdk import Asset, dag, task

# Same asset that DAG A declares as an outlet
asset = Asset("s3://bucket/asset_c")


@dag(schedule=[asset])
def dag_b():
    @task
    def consume_events(triggering_asset_events=None):
        print(f"Received events: {triggering_asset_events}")
        # Count events for asset_c
        asset_events = triggering_asset_events[asset]
        print(f"Number of events for asset_c: {len(asset_events)}")

    consume_events()


dag_b()
```
### Operating System
macOS 15.5
### Versions of Apache Airflow Providers
_No response_
### Deployment
Astronomer
### Deployment details
Astro CLI
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)