Copilot commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3025383473


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4952,6 +4957,84 @@ def _lock_only_selected_asset(query, **_):
         ).one_or_none()
         assert adrq_2 is not None
 
+    @pytest.mark.need_serialized_dag
+    @pytest.mark.backend("postgres", "mysql")
+    def test_create_dag_runs_when_concurrent_asset_events_created(self, 
session: Session, dag_maker, caplog):
+        import random
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        ASSET_EVENT_COUNT = 100
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), 
session=session):
+            BashOperator(task_id="simulate-asset-outlet", bash_command="echo 
1")
+        dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+        futures = []
+        consumed_asset_events = []
+
+        def create_asset_events(sleep):
+            import time
+
+            from sqlalchemy import inspect
+
+            with create_session() as session:
+                now = timezone.utcnow()
+                asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+                session.add(asset_event)
+                session.commit()
+                time.sleep(sleep)  # sleep to simulate slow perforamcne
+                asset_manager = AssetManager()
+                if inspect(session.get_bind()).dialect.name == "postgresql":

Review Comment:
   This test uses `time.sleep()` to create timing windows. Sleep-based timing 
tests are inherently flaky on CI (variable load, slower DBs) and can produce 
intermittent timeouts or false negatives.
   
   Consider coordinating concurrency deterministically (e.g., 
`threading.Barrier`/`Event` to pause workers at specific points) instead of 
sleeping, so the test reliably reproduces the race without depending on 
wall-clock delays.



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -552,15 +560,38 @@ def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
         if queued_dag_ids := [r for r in queued_results if r is not None]:
             cls.logger().debug("consuming dag ids %s", queued_dag_ids)
 
+    @classmethod
+    def _queue_dagruns_nonpartitioned_mysql(
+        cls, asset_id: int, dags_to_queue: set[DagModel], event: AssetEvent, 
session: Session
+    ) -> None:
+        from sqlalchemy import case
+        from sqlalchemy.dialects.mysql import insert
+

Review Comment:
   Imports are introduced inside `_queue_dagruns_nonpartitioned_mysql()`. In 
this codebase imports are generally kept at module scope; keeping them inside 
the function adds avoidable runtime overhead and makes static analysis harder. 
Unless there is a specific circular-import or optional-dependency reason, move 
these imports to the top of the module (or document why they must remain local).



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4952,6 +4957,84 @@ def _lock_only_selected_asset(query, **_):
         ).one_or_none()
         assert adrq_2 is not None
 
+    @pytest.mark.need_serialized_dag
+    @pytest.mark.backend("postgres", "mysql")
+    def test_create_dag_runs_when_concurrent_asset_events_created(self, 
session: Session, dag_maker, caplog):
+        import random
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        ASSET_EVENT_COUNT = 100
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), 
session=session):
+            BashOperator(task_id="simulate-asset-outlet", bash_command="echo 
1")
+        dr = dag_maker.create_dagrun(run_id="asset-producer-run")

Review Comment:
   `dr = dag_maker.create_dagrun(run_id="asset-producer-run")` is assigned but 
not used anywhere in this test. Removing the unused variable (or using it to 
populate the `AssetEvent` source fields if that’s the intent) will reduce 
confusion and keep the test focused on the concurrency behavior under test.
   ```suggestion
           dag_maker.create_dagrun(run_id="asset-producer-run")
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4952,6 +4957,84 @@ def _lock_only_selected_asset(query, **_):
         ).one_or_none()
         assert adrq_2 is not None
 
+    @pytest.mark.need_serialized_dag
+    @pytest.mark.backend("postgres", "mysql")
+    def test_create_dag_runs_when_concurrent_asset_events_created(self, 
session: Session, dag_maker, caplog):
+        import random
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        ASSET_EVENT_COUNT = 100
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), 
session=session):
+            BashOperator(task_id="simulate-asset-outlet", bash_command="echo 
1")
+        dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+        futures = []
+        consumed_asset_events = []
+
+        def create_asset_events(sleep):
+            import time
+
+            from sqlalchemy import inspect
+
+            with create_session() as session:
+                now = timezone.utcnow()
+                asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+                session.add(asset_event)
+                session.commit()
+                time.sleep(sleep)  # sleep to simulate slow perforamcne
+                asset_manager = AssetManager()
+                if inspect(session.get_bind()).dialect.name == "postgresql":
+                    asset_manager._queue_dagruns_nonpartitioned_postgres(
+                        asset_id=asset_id, dags_to_queue=[dag_model], 
event=asset_event, session=session
+                    )
+                elif inspect(session.get_bind()).dialect.name == "mysql":
+                    asset_manager._queue_dagruns_nonpartitioned_mysql(
+                        asset_id=asset_id, dags_to_queue=[dag_model], 
event=asset_event, session=session
+                    )
+
+            return asset_event
+
+        with (
+            ThreadPoolExecutor() as exec,
+            caplog.at_level(
+                "WARNING",
+                logger="airflow.jobs.scheduler_job_runner",
+            ),
+        ):
+            for _ in range(ASSET_EVENT_COUNT):
+                future = exec.submit(create_asset_events, random.randint(0, 1))
+                futures.append(future)

Review Comment:
   The test introduces randomized delays (`random.randint(0, 1)`, i.e. a sleep 
of either 0 or 1 second per worker) to drive concurrency. This makes the test 
non-deterministic and can lead to intermittent failures (e.g., different 
ordering causing different numbers of scheduler iterations or longer runtimes).
   
   Prefer a deterministic delay pattern (or seed the RNG) and/or explicit 
synchronization so the test outcome does not depend on randomness.



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4952,6 +4957,84 @@ def _lock_only_selected_asset(query, **_):
         ).one_or_none()
         assert adrq_2 is not None
 
+    @pytest.mark.need_serialized_dag
+    @pytest.mark.backend("postgres", "mysql")
+    def test_create_dag_runs_when_concurrent_asset_events_created(self, 
session: Session, dag_maker, caplog):
+        import random
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        ASSET_EVENT_COUNT = 100
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), 
session=session):
+            BashOperator(task_id="simulate-asset-outlet", bash_command="echo 
1")
+        dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+        futures = []
+        consumed_asset_events = []
+
+        def create_asset_events(sleep):
+            import time
+
+            from sqlalchemy import inspect
+
+            with create_session() as session:
+                now = timezone.utcnow()
+                asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+                session.add(asset_event)
+                session.commit()
+                time.sleep(sleep)  # sleep to simulate slow perforamcne
+                asset_manager = AssetManager()
+                if inspect(session.get_bind()).dialect.name == "postgresql":
+                    asset_manager._queue_dagruns_nonpartitioned_postgres(
+                        asset_id=asset_id, dags_to_queue=[dag_model], 
event=asset_event, session=session
+                    )
+                elif inspect(session.get_bind()).dialect.name == "mysql":
+                    asset_manager._queue_dagruns_nonpartitioned_mysql(
+                        asset_id=asset_id, dags_to_queue=[dag_model], 
event=asset_event, session=session
+                    )
+
+            return asset_event
+
+        with (
+            ThreadPoolExecutor() as exec,
+            caplog.at_level(
+                "WARNING",
+                logger="airflow.jobs.scheduler_job_runner",
+            ),
+        ):
+            for _ in range(ASSET_EVENT_COUNT):
+                future = exec.submit(create_asset_events, random.randint(0, 1))
+                futures.append(future)
+            scheduler_job = Job()
+            prev_dr = None
+            self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+            for future in as_completed(futures, timeout=60):
+                asset = future.result()

Review Comment:
   Inside the `as_completed` loop, `asset = future.result()` shadows the 
earlier `asset` variable (the `Asset` definition used for scheduling). Even 
though the shadowed value isn’t used afterwards, this makes the test harder to 
read.
   
   Rename the loop variable (e.g., `asset_event`) or drop the assignment if 
it’s unused.
   ```suggestion
                   future.result()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to