dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3005705703


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5075,6 +5126,39 @@ def test_no_create_dag_runs_when_dag_disabled(self, 
session, dag_maker, disable,
         assert len(session.scalars(adrq_q).all()) == 1
         assert session.scalars(adrq_q).one().target_dag_id == "consumer"
 
+    @pytest.mark.need_serialized_dag
+    def test_no_create_dag_runs_when_no_asset_event(self, session: Session, 
dag_maker):
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+        # Simulate an ADRQ row being updated while the scheduler is creating 
asset-triggered DagRuns.
+        # Each asset event can update or insert ADRQ rows.
+        # Assume the matching asset events were already consumed by an earlier 
ADRQ row.
+        adrq = AssetDagRunQueue(
+            asset_id=asset_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+        )
+        session.add(adrq)
+        session.flush()
+        adrq.created_at = timezone.utcnow() + timedelta(seconds=1)
+        session.merge(adrq)
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+        with create_session() as session:
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none()
+        assert dr is None
+        _adrq = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.asset_id == asset_id, 
AssetDagRunQueue.target_dag_id == dag_model.dag_id

Review Comment:
   Added an assertion for the scheduler log message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to