kaxil commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3025401784
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -230,7 +230,7 @@ def register_asset_change(
         asset_event = AssetEvent(**event_kwargs)
         session.add(asset_event)
-        session.flush()  # Ensure the event is written earlier than ADRQ entries below.
+        session.commit()  # Ensure the event is written earlier than ADRQ entries below.
Review Comment:
Still present after the latest push. `session.commit()` mid-function breaks
transactional atomicity for callers. `register_asset_change` is called from the
Execution API (part of a TI state transition), the trigger model (looping
across multiple assets), and the Core API -- all passing their own session.
The commit persists the event even if subsequent work in the caller fails.
It also breaks `run_with_db_retries` -- once data is committed it cannot be
rolled back and retried on a transient DB error.
The timestamp-based ADRQ upserts don't require this commit;
`session.flush()` already makes the row visible within the same transaction and
provides the ordering needed by the downstream upsert logic.
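To make the distinction concrete, here is a minimal stdlib `sqlite3` sketch of the transaction semantics at stake (not Airflow code; the table and URI are made up). A flush-like uncommitted `INSERT` is already visible to queries in the same transaction, yet the caller can still roll everything back; a mid-function `commit()` would take that choice away:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE asset_event (id INTEGER PRIMARY KEY, uri TEXT)")
conn.commit()

# flush()-like behavior: the uncommitted INSERT is visible to later
# statements in the same transaction, so ordering is preserved.
conn.execute("INSERT INTO asset_event (uri) VALUES ('s3://bucket/key')")
visible_in_txn = conn.execute("SELECT COUNT(*) FROM asset_event").fetchone()[0]

# The caller fails later and rolls back -- the event vanishes with it.
# Had we commit()ed mid-function, the orphaned event would survive.
conn.rollback()
persisted = conn.execute("SELECT COUNT(*) FROM asset_event").fetchone()[0]
```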
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2074,29 +2074,31 @@ def _create_dag_runs_asset_triggered(
                 .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
             )
         )
-
-        dag_run = dag.create_dagrun(
-            run_id=DagRun.generate_run_id(
-                run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
-            ),
-            logical_date=None,
-            data_interval=None,
-            run_after=triggered_date,
-            run_type=DagRunType.ASSET_TRIGGERED,
-            triggered_by=DagRunTriggeredByType.ASSET,
-            state=DagRunState.QUEUED,
-            creating_job_id=self.job.id,
-            session=session,
-        )
-        Stats.incr("asset.triggered_dagruns")
-        dag_run.consumed_asset_events.extend(asset_events)
+        if asset_events:
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
+                ),
+                logical_date=None,
+                data_interval=None,
+                run_after=triggered_date,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            Stats.incr("asset.triggered_dagruns")
+            dag_run.consumed_asset_events.extend(asset_events)
         # Delete only consumed ADRQ rows to avoid dropping newly queued events
-        # (e.g. DagRun triggered by asset A while a new event for asset B arrives).
+        # (e.g. 1. DagRun triggered by asset A while a new event for asset B arrives.
+        #  2. DagRun triggered by asset A while new event for asset A upsert to ADRQ)
         adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
Review Comment:
The `created_at <= triggered_date` filter is a good addition -- it prevents
deleting rows that were upserted with a newer timestamp.
However, the deletion still runs **outside** the `if asset_events:` block.
When no DagRun is created (the empty-events path at line 2124), ADRQ rows with
`created_at <= triggered_date` are still deleted. Those rows represent pending
work that gets dropped without a DagRun ever consuming them.
Move the deletion (and the associated log) inside the `if asset_events:`
block so ADRQ rows are only cleaned up when a DagRun actually consumed their
events.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]