marclamberti opened a new issue, #54659:
URL: https://github.com/apache/airflow/issues/54659

   ### Apache Airflow version
   
   3.0.4
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When using dynamic task mapping with asset outlets, if multiple mapped task instances complete in quick succession (or at the same time) and emit events for the same asset, the downstream asset-triggered DAG does not receive all of those asset events in the `triggering_asset_events` context variable.
   
   Context:
     - DAG A has a dynamically mapped task with outlets=[Asset("example")]
     - When the mapped task expands to 3 instances, each instance creates an AssetEvent upon completion
     - DAG B is scheduled on Asset("example")
     - DAG B is triggered once (expected behavior, since queued asset events are batched)
     - However, triggering_asset_events in DAG B contains only 1 event instead of all 3
     - The missing events only appear in the next run of DAG B, which is triggered by a later asset event
   
   I suspect this happens because:
     1. Each mapped task instance creates its AssetEvent and its AssetDagRunQueue entry in a separate transaction
     2. The AssetDagRunQueue table has a composite primary key (asset_id, target_dag_id), which prevents duplicate queue entries for the same asset and target DAG
     3. The scheduler uses MAX(AssetDagRunQueue.created_at) as the cutoff timestamp for which events to include
     4. Because of transaction timing, not all AssetEvents may be committed before the scheduler processes the queue
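The hypothesis above can be modeled in a few lines of plain Python. This is a simplified toy model, not Airflow's scheduler code: `ToyMetadataDB`, `finish_mapped_ti`, and `schedule` are hypothetical names, integer timestamps stand in for real times, and it assumes (1) the queue row is written insert-if-absent, so the first mapped instance's `created_at` wins, and (2) all three mapped instances finish before the scheduler loop runs. Under those assumptions it reproduces the reported symptom.

```python
from dataclasses import dataclass, field


@dataclass
class ToyMetadataDB:
    """Hypothetical stand-in for the asset event and asset-DAG-run queue tables."""

    # (asset_id, timestamp) rows, one per mapped task instance
    asset_events: list[tuple[str, int]] = field(default_factory=list)
    # Keyed by (asset_id, target_dag_id) to mimic the composite primary key
    queue: dict[tuple[str, str], int] = field(default_factory=dict)

    def finish_mapped_ti(self, asset: str, target_dag: str, ts: int) -> None:
        # Every mapped instance records its own AssetEvent...
        self.asset_events.append((asset, ts))
        # ...but the queue row is assumed to be insert-if-absent, so later
        # instances do not move created_at forward.
        self.queue.setdefault((asset, target_dag), ts)


def schedule(db: ToyMetadataDB, target_dag: str, prev_cutoff: int) -> tuple[list[int], int]:
    """Create one run of target_dag, using MAX(created_at) of its queue rows as the cutoff."""
    rows = {key: ts for key, ts in db.queue.items() if key[1] == target_dag}
    cutoff = max(rows.values())
    assets = {asset for asset, _ in rows}
    events = [
        ts
        for asset, ts in db.asset_events
        if asset in assets and prev_cutoff < ts <= cutoff
    ]
    for key in rows:  # the queue rows are cleared once the run is created
        del db.queue[key]
    return events, cutoff


db = ToyMetadataDB()
for ts in (1, 2, 3):  # three mapped instances finish before the scheduler runs
    db.finish_mapped_ti("asset_c", "dag_b", ts)

first, cutoff = schedule(db, "dag_b", prev_cutoff=0)
print(first)  # [1]: only one of the three events reaches DAG B

db.finish_mapped_ti("asset_c", "dag_b", 10)  # a later, unrelated event
second, _ = schedule(db, "dag_b", prev_cutoff=cutoff)
print(second)  # [2, 3, 10]: the "missing" events show up in the next run
```

In this model the events are deferred rather than lost, which matches the report that they surface in DAG B's next triggered run.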
   
   ### What you think should happen instead?
   
   When DAG B is triggered by multiple asset events from DAG A's mapped tasks, ALL events that caused the trigger should be available in the triggering_asset_events context variable.
   Alternatively, DAG B should be triggered more than once so that all asset events are consumed, rather than waiting for a later asset event before processing the earlier ones.
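For contrast, the first expected behavior can be expressed in similar toy terms. `events_for_run` is a hypothetical helper, not an Airflow API or a proposed patch: it takes the new cutoff from the newest pending event rather than from a queue row's `created_at`, so no committed event is left waiting for a later, unrelated trigger.

```python
def events_for_run(event_timestamps: list[int], prev_cutoff: int) -> tuple[list[int], int]:
    """Hypothetical policy sketching the expected behavior (not Airflow's implementation).

    Every event newer than the previous run's cutoff is handed to this run,
    and the new cutoff is the newest event actually included.
    """
    pending = sorted(ts for ts in event_timestamps if ts > prev_cutoff)
    new_cutoff = pending[-1] if pending else prev_cutoff
    return pending, new_cutoff


# Three mapped instances emitted events at t=1, 2, 3; all land in one run.
events, cutoff = events_for_run([1, 2, 3], prev_cutoff=0)
print(events, cutoff)  # [1, 2, 3] 3
```

A timestamp cutoff like this still assumes events become visible in timestamp order; with the separate transactions described above, tracking which event IDs a run has already consumed would avoid relying on that assumption.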
   
   ### How to reproduce
   
   1. Create DAG A with dynamic task mapping:
   ```
   from datetime import datetime

   from airflow.sdk import Asset, dag, task

   asset = Asset("s3://bucket/asset_c")

   @dag(
       start_date=datetime(2024, 1, 1),
       schedule="@daily",
       catchup=False,
   )
   def dag_a():

       @task(outlets=[asset])
       def produce_asset_event(item):
           print(f"Producing event for item {item}")
           return {"item": item}

       # Expands into 3 mapped task instances; each emits an AssetEvent on success
       produce_asset_event.expand(item=[1, 2, 3])

   dag_a()
   ```
   
   2. Create DAG B triggered by the asset:
    
   ```
   from airflow.sdk import Asset, dag, task

   # Must match the asset used as the outlet in DAG A
   asset = Asset("s3://bucket/asset_c")

   @dag(
       schedule=[asset]
   )
   def dag_b():

       @task
       def consume_events(triggering_asset_events=None):
           print(f"Received events: {triggering_asset_events}")

           # Count events for asset_c (expected: 3 after one run of DAG A)
           asset_events = triggering_asset_events[asset]
           print(f"Number of events for asset_c: {len(asset_events)}")

       consume_events()

   dag_b()
   ```
   
   ### Operating System
   
   MacOS 15.5
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   Astro CLI
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

