This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 9e462bac5d4 [v3-1-test] Add row lock to ADRQ before Dag run creation
(apache#60773) (#63776)
9e462bac5d4 is described below
commit 9e462bac5d421caffbb0f05be340af3226e95b69
Author: Wei Lee <[email protected]>
AuthorDate: Mon Mar 23 10:24:45 2026 +0800
[v3-1-test] Add row lock to ADRQ before Dag run creation (apache#60773)
(#63776)
* [v3-1-test] Add row lock to ADRQ before Dag run creation (#60773)
(cherry picked from commit b3055d7bb8cea89a77d7179b8896b84bac29de77)
Co-authored-by: Wei Lee <[email protected]>
* fixup! [v3-1-test] Add row lock to ADRQ before Dag run creation (#60773)
(cherry picked from commit b3055d7bb8cea89a77d7179b8896b84bac29de77)
---
.../src/airflow/jobs/scheduler_job_runner.py | 45 ++++++++---
airflow-core/tests/unit/jobs/test_scheduler_job.py | 89 +++++++++++++++++++++-
2 files changed, 119 insertions(+), 15 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index fc223d7ca80..587db7d0d2a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1592,7 +1592,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if asset_triggered_dags:
self._create_dag_runs_asset_triggered(
dag_models=asset_triggered_dags,
- triggered_date_by_dag=triggered_date_by_dag,
session=session,
)
@@ -1705,30 +1704,44 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
def _create_dag_runs_asset_triggered(
self,
+ *,
dag_models: Collection[DagModel],
- triggered_date_by_dag: dict[str, datetime],
session: Session,
) -> None:
- """For DAGs that are triggered by assets, create dag runs."""
- triggered_dates: dict[str, DateTime] = {
- dag_id: timezone.coerce_datetime(last_asset_event_time)
- for dag_id, last_asset_event_time in triggered_date_by_dag.items()
- }
-
+ """For Dags that are triggered by assets, create Dag runs."""
for dag_model in dag_models:
dag = self._get_current_dag(dag_id=dag_model.dag_id,
session=session)
if not dag:
- self.log.error("DAG '%s' not found in serialized_dag table",
dag_model.dag_id)
+ self.log.error("Dag '%s' not found in serialized_dag table",
dag_model.dag_id)
continue
if not isinstance(dag.timetable, AssetTriggeredTimetable):
self.log.error(
- "DAG '%s' was asset-scheduled, but didn't have an
AssetTriggeredTimetable!",
+ "Dag '%s' was asset-scheduled, but didn't have an
AssetTriggeredTimetable!",
dag_model.dag_id,
)
continue
- triggered_date = triggered_dates[dag.dag_id]
+ queued_adrqs = session.scalars(
+ with_row_locks(
+ select(AssetDagRunQueue)
+ .where(AssetDagRunQueue.target_dag_id == dag.dag_id)
+ .order_by(AssetDagRunQueue.created_at.desc()),
+ of=AssetDagRunQueue,
+ skip_locked=True,
+ key_share=False,
+ session=session,
+ )
+ ).all()
+ # If another scheduler already locked these ADRQ rows, SKIP LOCKED
makes this scheduler skip them.
+ if not queued_adrqs:
+ self.log.debug(
+ "Skipping asset-triggered DagRun creation for Dag '%s'; no
queued assets remain.",
+ dag.dag_id,
+ )
+ continue
+
+ triggered_date: DateTime =
timezone.coerce_datetime(queued_adrqs[0].created_at)
cte = (
select(func.max(DagRun.run_after).label("previous_dag_run_run_after"))
.where(
@@ -1775,7 +1788,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
Stats.incr("asset.triggered_dagruns")
dag_run.consumed_asset_events.extend(asset_events)
-
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_run.dag_id))
+
+ # Delete only consumed ADRQ rows to avoid dropping newly queued
events
+ # (e.g. DagRun triggered by asset A while a new event for asset B
arrives).
+ adrq_pks = [(record.asset_id, record.target_dag_id) for record in
queued_adrqs]
+ session.execute(
+ delete(AssetDagRunQueue).where(
+ tuple_(AssetDagRunQueue.asset_id,
AssetDagRunQueue.target_dag_id).in_(adrq_pks)
+ )
+ )
def _should_update_dag_next_dagruns(
self,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 0d6d2ddf360..4d91ed47cd1 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -36,7 +36,7 @@ import psutil
import pytest
import time_machine
from pytest import param
-from sqlalchemy import func, select, update
+from sqlalchemy import delete, func, select, update
from sqlalchemy.orm import joinedload
from airflow import settings
@@ -214,9 +214,9 @@ class TestSchedulerJob:
def set_instance_attrs(self) -> Generator:
# Speed up some tests by not running the tasks, just look at what we
# enqueue!
- self.null_exec: MockExecutor | None = MockExecutor()
+ self.null_exec: BaseExecutor = MockExecutor()
yield
- self.null_exec = None
+ self.null_exec = None # type: ignore[assignment]
@pytest.fixture
def mock_executors(self):
@@ -4587,6 +4587,89 @@ class TestSchedulerJob:
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+        # We do not create a new DagRun since the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def
test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(self,
session, dag_maker):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
+
+ session.add_all(
+ [
+            # The ADRQ that should trigger the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ # The ADRQ that arrives after the Dag run creation but before
ADRQ clean up
+            # This situation is simulated by _lock_only_selected_asset
below
+ AssetDagRunQueue(
+ asset_id=asset_2_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ def _lock_only_selected_asset(query, **_):
+ # Simulate SKIP LOCKED behavior where this scheduler can only
consume one ADRQ row.
+ return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+ with patch("airflow.jobs.scheduler_job_runner.with_row_locks",
side_effect=_lock_only_selected_asset):
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ dr = session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none()
+
+ assert dr is not None
+
+ adrq_1 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_1_id,
+ )
+ ).one_or_none()
+ assert adrq_1 is None
+ adrq_2 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_2_id,
+ )
+ ).one_or_none()
+ assert adrq_2 is not None
+
@pytest.mark.need_serialized_dag
def test_create_dag_runs_asset_alias_with_asset_event_attached(self,
session, dag_maker):
"""