dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3025685973


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -230,7 +230,7 @@ def register_asset_change(
 
         asset_event = AssetEvent(**event_kwargs)
         session.add(asset_event)
-        session.flush()  # Ensure the event is written earlier than ADRQ entries below.
+        session.commit()  # Ensure the event is written earlier than ADRQ entries below.

Review Comment:
   @kaxil Thanks for the review!
   
   I have tested both `session.commit()` and `session.flush()` in the context of this PR. To verify the behavior, I added a test case that simulates this scenario: [test_scheduler_job.py#L4985-L4987](https://github.com/dingo4dev/airflow/blob/f0d5a6571f3f8582e6b498a645bd5ed2e068da5f/airflow-core/tests/unit/jobs/test_scheduler_job.py#L4985-L4987)
 
   
   During testing, I found that the race condition persisted even with `session.flush()`. Specifically, asset events become orphaned when a newer AssetDagRunQueue (ADRQ) record is consumed by the scheduler before the corresponding asset event has been committed and made visible to other sessions.
   
   This happens because the scheduler fetches asset events based on the 
`run_after` timestamp of the previous DAG run and the `triggered_date` of the 
ADRQ record. If these aren't populated and committed immediately, a subsequent 
scheduler loop can "skip" or misinterpret the sequence, leading to a permanent 
misfire of the event.
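   
   Since the distinction hinges on transaction visibility, here is a minimal sketch (plain SQLAlchemy against SQLite, not Airflow code; the `DemoEvent` model and session names are illustrative) showing that `flush()` writes rows only inside the flushing transaction, while `commit()` makes them visible to a second session, such as a concurrent scheduler loop:

```python
import os
import tempfile

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DemoEvent(Base):  # stand-in for the real AssetEvent model
    __tablename__ = "demo_event"
    id = Column(Integer, primary_key=True)
    name = Column(String)


# File-backed SQLite so two connections share the same database.
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
engine = create_engine(f"sqlite:///{db_path}")
Base.metadata.create_all(engine)

writer = Session(engine)
reader = Session(engine)  # simulates a concurrent scheduler loop

writer.add(DemoEvent(name="evt"))
writer.flush()  # row exists in the writer's transaction only

# The reader's connection cannot see the flushed, uncommitted row.
visible_after_flush = reader.query(DemoEvent).count()   # 0

writer.commit()     # row is now durable and visible to others
reader.rollback()   # ensure the reader starts a fresh snapshot
visible_after_commit = reader.query(DemoEvent).count()  # 1
```

   This is why moving the write to `session.commit()` (or a separate short-lived session) closes the window: a scheduler loop querying through another session only observes the event once it is committed.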
   
   Would it make sense to handle asset events in a separate, short-lived session, so that they are committed promptly while keeping the main transaction as light as possible? Any suggestions are welcome.


