Copilot commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3025383473
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4952,6 +4957,84 @@ def _lock_only_selected_asset(query, **_):
).one_or_none()
assert adrq_2 is not None
+ @pytest.mark.need_serialized_dag
+ @pytest.mark.backend("postgres", "mysql")
+ def test_create_dag_runs_when_concurrent_asset_events_created(self,
session: Session, dag_maker, caplog):
+ import random
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ ASSET_EVENT_COUNT = 100
+ asset = Asset(name="test_asset")
+ with dag_maker(dag_id="consumer", schedule=asset, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="simulate-asset-outlet", bash_command="echo
1")
+ dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+ futures = []
+ consumed_asset_events = []
+
+ def create_asset_events(sleep):
+ import time
+
+ from sqlalchemy import inspect
+
+ with create_session() as session:
+ now = timezone.utcnow()
+ asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+ session.add(asset_event)
+ session.commit()
+ time.sleep(sleep) # sleep to simulate slow performance
+ asset_manager = AssetManager()
+ if inspect(session.get_bind()).dialect.name == "postgresql":
Review Comment:
This test uses `time.sleep()` to create timing windows. Sleep-based timing
tests are inherently flaky on CI (variable load, slower DBs) and can produce
intermittent timeouts or false negatives.
Consider coordinating concurrency deterministically (e.g.,
`threading.Barrier`/`Event` to pause workers at specific points) instead of
sleeping, so the test reliably reproduces the race without depending on
wall-clock delays.
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -552,15 +560,38 @@ def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
if queued_dag_ids := [r for r in queued_results if r is not None]:
cls.logger().debug("consuming dag ids %s", queued_dag_ids)
+ @classmethod
+ def _queue_dagruns_nonpartitioned_mysql(
+ cls, asset_id: int, dags_to_queue: set[DagModel], event: AssetEvent,
session: Session
+ ) -> None:
+ from sqlalchemy import case
+ from sqlalchemy.dialects.mysql import insert
+
Review Comment:
Imports are introduced inside `_queue_dagruns_nonpartitioned_mysql()`. In
this codebase imports are generally kept at module scope; keeping them inside
the function adds avoidable runtime overhead and makes static analysis harder.
Unless there is a specific circular-import or optional-dependency reason, move
these imports to the top of the module (or document why they must remain local).
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4952,6 +4957,84 @@ def _lock_only_selected_asset(query, **_):
).one_or_none()
assert adrq_2 is not None
+ @pytest.mark.need_serialized_dag
+ @pytest.mark.backend("postgres", "mysql")
+ def test_create_dag_runs_when_concurrent_asset_events_created(self,
session: Session, dag_maker, caplog):
+ import random
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ ASSET_EVENT_COUNT = 100
+ asset = Asset(name="test_asset")
+ with dag_maker(dag_id="consumer", schedule=asset, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="simulate-asset-outlet", bash_command="echo
1")
+ dr = dag_maker.create_dagrun(run_id="asset-producer-run")
Review Comment:
`dr = dag_maker.create_dagrun(run_id="asset-producer-run")` is assigned but
not used anywhere in this test. Removing the unused variable (or using it to
populate the `AssetEvent` source fields if that’s the intent) will reduce
confusion and keep the test focused on the concurrency behavior under test.
```suggestion
dag_maker.create_dagrun(run_id="asset-producer-run")
```
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4952,6 +4957,84 @@ def _lock_only_selected_asset(query, **_):
).one_or_none()
assert adrq_2 is not None
+ @pytest.mark.need_serialized_dag
+ @pytest.mark.backend("postgres", "mysql")
+ def test_create_dag_runs_when_concurrent_asset_events_created(self,
session: Session, dag_maker, caplog):
+ import random
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ ASSET_EVENT_COUNT = 100
+ asset = Asset(name="test_asset")
+ with dag_maker(dag_id="consumer", schedule=asset, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="simulate-asset-outlet", bash_command="echo
1")
+ dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+ futures = []
+ consumed_asset_events = []
+
+ def create_asset_events(sleep):
+ import time
+
+ from sqlalchemy import inspect
+
+ with create_session() as session:
+ now = timezone.utcnow()
+ asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+ session.add(asset_event)
+ session.commit()
+ time.sleep(sleep) # sleep to simulate slow performance
+ asset_manager = AssetManager()
+ if inspect(session.get_bind()).dialect.name == "postgresql":
+ asset_manager._queue_dagruns_nonpartitioned_postgres(
+ asset_id=asset_id, dags_to_queue=[dag_model],
event=asset_event, session=session
+ )
+ elif inspect(session.get_bind()).dialect.name == "mysql":
+ asset_manager._queue_dagruns_nonpartitioned_mysql(
+ asset_id=asset_id, dags_to_queue=[dag_model],
event=asset_event, session=session
+ )
+
+ return asset_event
+
+ with (
+ ThreadPoolExecutor() as exec,
+ caplog.at_level(
+ "WARNING",
+ logger="airflow.jobs.scheduler_job_runner",
+ ),
+ ):
+ for _ in range(ASSET_EVENT_COUNT):
+ future = exec.submit(create_asset_events, random.randint(0, 1))
+ futures.append(future)
Review Comment:
The test introduces randomized delays (`random.randint(0, 1)`) to drive
concurrency. This makes the test non-deterministic and can lead to intermittent
failures (e.g., different ordering causing different numbers of scheduler
iterations or longer runtimes).
Prefer a deterministic delay pattern (or seed the RNG) and/or explicit
synchronization so the test outcome does not depend on randomness.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4952,6 +4957,84 @@ def _lock_only_selected_asset(query, **_):
).one_or_none()
assert adrq_2 is not None
+ @pytest.mark.need_serialized_dag
+ @pytest.mark.backend("postgres", "mysql")
+ def test_create_dag_runs_when_concurrent_asset_events_created(self,
session: Session, dag_maker, caplog):
+ import random
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ ASSET_EVENT_COUNT = 100
+ asset = Asset(name="test_asset")
+ with dag_maker(dag_id="consumer", schedule=asset, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="simulate-asset-outlet", bash_command="echo
1")
+ dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+ futures = []
+ consumed_asset_events = []
+
+ def create_asset_events(sleep):
+ import time
+
+ from sqlalchemy import inspect
+
+ with create_session() as session:
+ now = timezone.utcnow()
+ asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+ session.add(asset_event)
+ session.commit()
+ time.sleep(sleep) # sleep to simulate slow performance
+ asset_manager = AssetManager()
+ if inspect(session.get_bind()).dialect.name == "postgresql":
+ asset_manager._queue_dagruns_nonpartitioned_postgres(
+ asset_id=asset_id, dags_to_queue=[dag_model],
event=asset_event, session=session
+ )
+ elif inspect(session.get_bind()).dialect.name == "mysql":
+ asset_manager._queue_dagruns_nonpartitioned_mysql(
+ asset_id=asset_id, dags_to_queue=[dag_model],
event=asset_event, session=session
+ )
+
+ return asset_event
+
+ with (
+ ThreadPoolExecutor() as exec,
+ caplog.at_level(
+ "WARNING",
+ logger="airflow.jobs.scheduler_job_runner",
+ ),
+ ):
+ for _ in range(ASSET_EVENT_COUNT):
+ future = exec.submit(create_asset_events, random.randint(0, 1))
+ futures.append(future)
+ scheduler_job = Job()
+ prev_dr = None
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ for future in as_completed(futures, timeout=60):
+ asset = future.result()
Review Comment:
Inside the `as_completed` loop, `asset = future.result()` shadows the
earlier `asset` variable (the `Asset` definition used for scheduling). Even
though the shadowed value isn’t used afterwards, this makes the test harder to
read.
Rename the loop variable (e.g., `asset_event`) or drop the assignment if
it’s unused.
```suggestion
future.result()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]