kaxil commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3025401784


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -230,7 +230,7 @@ def register_asset_change(
 
         asset_event = AssetEvent(**event_kwargs)
         session.add(asset_event)
-        session.flush()  # Ensure the event is written earlier than ADRQ entries below.
+        session.commit()  # Ensure the event is written earlier than ADRQ entries below.

Review Comment:
   Still present after the latest push. `session.commit()` mid-function breaks 
transactional atomicity for callers. `register_asset_change` is called from the 
Execution API (part of a TI state transition), the trigger model (looping 
across multiple assets), and the Core API -- all passing their own session.
   
   The commit persists the event even if subsequent work in the caller fails. 
It also breaks `run_with_db_retries` -- committed data can't be retried on 
transient DB errors.
   
   The timestamp-based ADRQ upserts don't require this commit; 
`session.flush()` already makes the row visible within the same transaction and 
provides the ordering needed by the downstream upsert logic.
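
   A rough stdlib `sqlite3` analogy for the flush-vs-commit distinction (illustrative only, not Airflow code): an uncommitted write is already visible to later statements in the same transaction, which is the ordering guarantee `session.flush()` provides, while remaining roll-back-able if the caller later fails.

```python
import sqlite3

# Rough analogy (not Airflow code): a flushed-but-uncommitted write is
# visible to later statements in the same transaction, yet can still be
# rolled back atomically if the caller fails -- which a mid-function
# commit would make impossible.
conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transaction control
conn.execute("CREATE TABLE asset_event (id INTEGER PRIMARY KEY)")

conn.execute("BEGIN")
conn.execute("INSERT INTO asset_event (id) VALUES (1)")  # analogous to session.flush()

# The row is visible within the open transaction, so downstream logic
# (here, the ADRQ upserts) can rely on it existing...
visible = conn.execute("SELECT COUNT(*) FROM asset_event").fetchone()[0]

# ...yet a failure in the caller can still undo everything atomically,
# which also keeps run_with_db_retries workable on transient errors.
conn.execute("ROLLBACK")
after_rollback = conn.execute("SELECT COUNT(*) FROM asset_event").fetchone()[0]

print(visible, after_rollback)  # 1 0
```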



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2074,29 +2074,31 @@ def _create_dag_runs_asset_triggered(
                     .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
                 )
             )
-
-            dag_run = dag.create_dagrun(
-                run_id=DagRun.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
-                ),
-                logical_date=None,
-                data_interval=None,
-                run_after=triggered_date,
-                run_type=DagRunType.ASSET_TRIGGERED,
-                triggered_by=DagRunTriggeredByType.ASSET,
-                state=DagRunState.QUEUED,
-                creating_job_id=self.job.id,
-                session=session,
-            )
-            Stats.incr("asset.triggered_dagruns")
-            dag_run.consumed_asset_events.extend(asset_events)
+            if asset_events:
+                dag_run = dag.create_dagrun(
+                    run_id=DagRun.generate_run_id(
+                        run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
+                    ),
+                    logical_date=None,
+                    data_interval=None,
+                    run_after=triggered_date,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                )
+                Stats.incr("asset.triggered_dagruns")
+                dag_run.consumed_asset_events.extend(asset_events)
 
             # Delete only consumed ADRQ rows to avoid dropping newly queued events
-            # (e.g. DagRun triggered by asset A while a new event for asset B arrives).
+            # (e.g. 1. DagRun triggered by asset A while a new event for asset B arrives.
+            # 2. DagRun triggered by asset A while new event for asset A upsert to ADRQ)
            adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]

Review Comment:
   The `created_at <= triggered_date` filter is a good addition -- it prevents 
deleting rows that were upserted with a newer timestamp.
   
   However, the deletion still runs **outside** the `if asset_events:` block. 
When no DagRun is created (the empty-events path at line 2124), ADRQ rows with 
`created_at <= triggered_date` are still deleted. Those rows represent pending 
work that gets dropped without a DagRun ever consuming them.
   
   Move the deletion (and the associated log) inside the `if asset_events:` 
block so ADRQ rows are only cleaned up when a DagRun actually consumed their 
events.
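
   A minimal sketch of the suggested control flow (hypothetical helper names, not the actual Airflow function): the cleanup callback only fires inside the `if asset_events:` branch, so the empty-events path leaves pending ADRQ rows untouched.

```python
from types import SimpleNamespace

def process_asset_triggered(asset_events, queued_adrqs, delete_adrqs):
    """Hypothetical distillation of the suggested structure: ADRQ rows
    are deleted only when a DagRun actually consumed their events."""
    if asset_events:
        # ... dag.create_dagrun(...) and consumed_asset_events.extend(...) ...
        adrq_pks = [(r.asset_id, r.target_dag_id) for r in queued_adrqs]
        delete_adrqs(adrq_pks)

deleted = []
adrq = SimpleNamespace(asset_id=1, target_dag_id="dag_a")

# Empty-events path: nothing is deleted, so pending work is preserved.
process_asset_triggered([], [adrq], deleted.extend)
print(deleted)  # []

# Events present: the consumed ADRQ rows are cleaned up.
process_asset_triggered(["event"], [adrq], deleted.extend)
print(deleted)  # [(1, 'dag_a')]
```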



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.