This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b3055d7bb8c Add row lock to ADRQ before Dag run creation (#60773)
b3055d7bb8c is described below
commit b3055d7bb8cea89a77d7179b8896b84bac29de77
Author: Wei Lee <[email protected]>
AuthorDate: Tue Mar 17 11:35:04 2026 +0800
Add row lock to ADRQ before Dag run creation (#60773)
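Before creating an asset-triggered Dag run, the scheduler now selects the AssetDagRunQueue (ADRQ) rows for the Dag with a row lock (SELECT ... FOR UPDATE SKIP LOCKED), so two schedulers cannot consume the same queue entries; a scheduler whose rows were already taken simply skips the Dag. A minimal sketch of the locking step, based on the diff below (the standalone process_adrq helper and bare session are illustrative assumptions, not part of the commit):

    # Sketch only: lock the ADRQ rows for one Dag before acting on them.
    # Import paths follow Airflow main at the time of this commit.
    from sqlalchemy import select

    from airflow.models.asset import AssetDagRunQueue
    from airflow.utils.sqlalchemy import with_row_locks

    def process_adrq(dag_id: str, session):  # hypothetical helper
        queued_adrqs = session.scalars(
            with_row_locks(
                select(AssetDagRunQueue)
                .where(AssetDagRunQueue.target_dag_id == dag_id)
                .order_by(AssetDagRunQueue.created_at.desc()),
                of=AssetDagRunQueue,
                skip_locked=True,
                key_share=False,
                session=session,
            )
        ).all()
        if not queued_adrqs:
            # Rows are locked by another scheduler (or already consumed): skip this Dag.
            return None
        # The newest queued event drives the run's triggered date.
        return queued_adrqs[0].created_at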
---
.../src/airflow/jobs/scheduler_job_runner.py | 45 ++++++++---
airflow-core/tests/unit/jobs/test_scheduler_job.py | 89 +++++++++++++++++++++-
2 files changed, 120 insertions(+), 14 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index b078cb183bc..f8c043f0c9f 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1838,7 +1838,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if asset_triggered_dags:
self._create_dag_runs_asset_triggered(
dag_models=[d for d in asset_triggered_dags if d.dag_id not in partition_dag_ids],
- triggered_date_by_dag=triggered_date_by_dag,
session=session,
)
@@ -2005,30 +2004,44 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
def _create_dag_runs_asset_triggered(
self,
+ *,
dag_models: Collection[DagModel],
- triggered_date_by_dag: dict[str, datetime],
session: Session,
) -> None:
- """For DAGs that are triggered by assets, create dag runs."""
- triggered_dates: dict[str, DateTime] = {
- dag_id: timezone.coerce_datetime(last_asset_event_time)
- for dag_id, last_asset_event_time in triggered_date_by_dag.items()
- }
-
+ """For Dags that are triggered by assets, create Dag runs."""
for dag_model in dag_models:
dag = self._get_current_dag(dag_id=dag_model.dag_id, session=session)
if not dag:
- self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
+ self.log.error("Dag '%s' not found in serialized_dag table", dag_model.dag_id)
continue
if not isinstance(dag.timetable, AssetTriggeredTimetable):
self.log.error(
- "DAG '%s' was asset-scheduled, but didn't have an
AssetTriggeredTimetable!",
+ "Dag '%s' was asset-scheduled, but didn't have an
AssetTriggeredTimetable!",
dag_model.dag_id,
)
continue
- triggered_date = triggered_dates[dag.dag_id]
+ queued_adrqs = session.scalars(
+ with_row_locks(
+ select(AssetDagRunQueue)
+ .where(AssetDagRunQueue.target_dag_id == dag.dag_id)
+ .order_by(AssetDagRunQueue.created_at.desc()),
+ of=AssetDagRunQueue,
+ skip_locked=True,
+ key_share=False,
+ session=session,
+ )
+ ).all()
+ # If another scheduler already locked these ADRQ rows, SKIP LOCKED makes this scheduler skip them.
+ if not queued_adrqs:
+ self.log.debug(
+ "Skipping asset-triggered DagRun creation for Dag '%s'; no
queued assets remain.",
+ dag.dag_id,
+ )
+ continue
+
+ triggered_date: DateTime = timezone.coerce_datetime(queued_adrqs[0].created_at)
cte = (
select(func.max(DagRun.run_after).label("previous_dag_run_run_after"))
.where(
@@ -2077,7 +2090,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
Stats.incr("asset.triggered_dagruns")
dag_run.consumed_asset_events.extend(asset_events)
- session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id))
+
+ # Delete only consumed ADRQ rows to avoid dropping newly queued events
+ # (e.g. DagRun triggered by asset A while a new event for asset B arrives).
+ adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
+ session.execute(
+ delete(AssetDagRunQueue).where(
+ tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks)
+ )
+ )
def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
"""
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 7c8d2ff4ffb..4511c333689 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -285,9 +285,9 @@ class TestSchedulerJob:
def set_instance_attrs(self) -> Generator:
# Speed up some tests by not running the tasks, just look at what we
# enqueue!
- self.null_exec: MockExecutor | None = MockExecutor()
+ self.null_exec: BaseExecutor = MockExecutor()
yield
- self.null_exec = None
+ self.null_exec = None # type: ignore[assignment]
@pytest.fixture
def mock_executors(self):
@@ -4858,6 +4858,91 @@ class TestSchedulerJob:
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date", name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date", schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed triggered_date_by_dag.
+ session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun since the ADRQ has already been consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_2.name))
+
+ session.add_all(
+ [
+ # The ADRQ that should trigger the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+ ),
+ # The ADRQ that arrives after the Dag run creation but before ADRQ cleanup
+ # This situation is simulated by _lock_only_selected_asset below
+ AssetDagRunQueue(
+ asset_id=asset_2_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+ ),
+ ),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+
+ def _lock_only_selected_asset(query, **_):
+ # Simulate SKIP LOCKED behavior where this scheduler can only consume one ADRQ row.
+ return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+ with patch("airflow.jobs.scheduler_job_runner.with_row_locks",
side_effect=_lock_only_selected_asset):
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ dr = session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none()
+
+ assert dr is not None
+
+ adrq_1 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_1_id,
+ )
+ ).one_or_none()
+ assert adrq_1 is None
+ adrq_2 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_2_id,
+ )
+ ).one_or_none()
+ assert adrq_2 is not None
+
@pytest.mark.need_serialized_dag
def test_create_dag_runs_asset_alias_with_asset_event_attached(self, session, dag_maker):
"""