This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b3055d7bb8c Add row lock to ADRQ before Dag run creation (#60773)
b3055d7bb8c is described below

commit b3055d7bb8cea89a77d7179b8896b84bac29de77
Author: Wei Lee <[email protected]>
AuthorDate: Tue Mar 17 11:35:04 2026 +0800

    Add row lock to ADRQ before Dag run creation (#60773)
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 45 ++++++++---
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 89 +++++++++++++++++++++-
 2 files changed, 120 insertions(+), 14 deletions(-)
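
For context, the core of the change is that the scheduler now takes row locks on the
AssetDagRunQueue (ADRQ) rows before creating the Dag run, so two schedulers racing on the
same Dag cannot both consume the queue. Below is a minimal sketch of that pattern, assuming
SQLAlchemy 2.x and a backend that supports SELECT ... FOR UPDATE SKIP LOCKED (e.g.
PostgreSQL); fetch_queued_adrqs and the import path are illustrative, not part of the commit,
which goes through Airflow's with_row_locks() helper instead:

    from sqlalchemy import select

    from airflow.models.asset import AssetDagRunQueue

    def fetch_queued_adrqs(session, dag_id):
        # Lock the queued ADRQ rows for this Dag, newest first; rows already
        # locked by another scheduler are skipped (SKIP LOCKED) instead of
        # blocking, so that scheduler simply sees an empty result and moves on.
        stmt = (
            select(AssetDagRunQueue)
            .where(AssetDagRunQueue.target_dag_id == dag_id)
            .order_by(AssetDagRunQueue.created_at.desc())
            .with_for_update(of=AssetDagRunQueue, skip_locked=True)
        )
        return session.scalars(stmt).all()

The with_row_locks() helper used in the diff wraps the same idea and is expected to degrade
to a plain SELECT on backends without row-lock support.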

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index b078cb183bc..f8c043f0c9f 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1838,7 +1838,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         if asset_triggered_dags:
             self._create_dag_runs_asset_triggered(
                 dag_models=[d for d in asset_triggered_dags if d.dag_id not in partition_dag_ids],
-                triggered_date_by_dag=triggered_date_by_dag,
                 session=session,
             )
 
@@ -2005,30 +2004,44 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
     def _create_dag_runs_asset_triggered(
         self,
+        *,
         dag_models: Collection[DagModel],
-        triggered_date_by_dag: dict[str, datetime],
         session: Session,
     ) -> None:
-        """For DAGs that are triggered by assets, create dag runs."""
-        triggered_dates: dict[str, DateTime] = {
-            dag_id: timezone.coerce_datetime(last_asset_event_time)
-            for dag_id, last_asset_event_time in triggered_date_by_dag.items()
-        }
-
+        """For Dags that are triggered by assets, create Dag runs."""
         for dag_model in dag_models:
             dag = self._get_current_dag(dag_id=dag_model.dag_id, session=session)
             if not dag:
-                self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
+                self.log.error("Dag '%s' not found in serialized_dag table", dag_model.dag_id)
                 continue
 
             if not isinstance(dag.timetable, AssetTriggeredTimetable):
                 self.log.error(
-                    "DAG '%s' was asset-scheduled, but didn't have an AssetTriggeredTimetable!",
+                    "Dag '%s' was asset-scheduled, but didn't have an AssetTriggeredTimetable!",
                     dag_model.dag_id,
                 )
                 continue
 
-            triggered_date = triggered_dates[dag.dag_id]
+            queued_adrqs = session.scalars(
+                with_row_locks(
+                    select(AssetDagRunQueue)
+                    .where(AssetDagRunQueue.target_dag_id == dag.dag_id)
+                    .order_by(AssetDagRunQueue.created_at.desc()),
+                    of=AssetDagRunQueue,
+                    skip_locked=True,
+                    key_share=False,
+                    session=session,
+                )
+            ).all()
+            # If another scheduler already locked these ADRQ rows, SKIP LOCKED makes this scheduler skip them.
+            if not queued_adrqs:
+                self.log.debug(
+                    "Skipping asset-triggered DagRun creation for Dag '%s'; no queued assets remain.",
+                    dag.dag_id,
+                )
+                continue
+
+            triggered_date: DateTime = timezone.coerce_datetime(queued_adrqs[0].created_at)
             cte = (
                 select(func.max(DagRun.run_after).label("previous_dag_run_run_after"))
                 .where(
@@ -2077,7 +2090,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
             Stats.incr("asset.triggered_dagruns")
             dag_run.consumed_asset_events.extend(asset_events)
-            session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id))
+
+            # Delete only consumed ADRQ rows to avoid dropping newly queued events
+            # (e.g. DagRun triggered by asset A while a new event for asset B arrives).
+            adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
+            session.execute(
+                delete(AssetDagRunQueue).where(
+                    tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks)
+                )
+            )
 
     def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
         """
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 7c8d2ff4ffb..4511c333689 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -285,9 +285,9 @@ class TestSchedulerJob:
     def set_instance_attrs(self) -> Generator:
         # Speed up some tests by not running the tasks, just look at what we
         # enqueue!
-        self.null_exec: MockExecutor | None = MockExecutor()
+        self.null_exec: BaseExecutor = MockExecutor()
         yield
-        self.null_exec = None
+        self.null_exec = None  # type: ignore[assignment]
 
     @pytest.fixture
     def mock_executors(self):
@@ -4858,6 +4858,91 @@ class TestSchedulerJob:
 
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed triggered_date_by_dag.
+        session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun since the ADRQ has already been consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_2.name))
+
+        session.add_all(
+            [
+                # The ADRQ that should trigger the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+                ),
+                # The ADRQ that arrives after the Dag run creation but before ADRQ cleanup
+                # This situation is simulated by _lock_only_selected_asset below
+                AssetDagRunQueue(
+                    asset_id=asset_2_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+                ),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+
+        def _lock_only_selected_asset(query, **_):
+            # Simulate SKIP LOCKED behavior where this scheduler can only consume one ADRQ row.
+            return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+        with patch("airflow.jobs.scheduler_job_runner.with_row_locks", side_effect=_lock_only_selected_asset):
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none()
+
+        assert dr is not None
+
+        adrq_1 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_1_id,
+            )
+        ).one_or_none()
+        assert adrq_1 is None
+        adrq_2 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_2_id,
+            )
+        ).one_or_none()
+        assert adrq_2 is not None
+
     @pytest.mark.need_serialized_dag
     def test_create_dag_runs_asset_alias_with_asset_event_attached(self, session, dag_maker):
         """
