Copilot commented on code in PR #64294:
URL: https://github.com/apache/airflow/pull/64294#discussion_r3025336864


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -628,27 +642,65 @@ def get_running_dag_runs_to_examine(cls, session: 
Session) -> ScalarResult[DagRu
         from airflow.models.backfill import BackfillDagRun
         from airflow.models.dag import DagModel
 
-        query = (
-            select(cls)
-            .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", 
dialect_name="mysql")
-            .where(cls.state == DagRunState.RUNNING)
-            .join(DagModel, DagModel.dag_id == cls.dag_id)
-            .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, 
isouter=True)
-            .where(
-                DagModel.is_paused == false(),
-                DagModel.is_stale == false(),
-            )
-            .order_by(
-                nulls_first(cast("ColumnElement[Any]", 
BackfillDagRun.sort_ordinal), session=session),
-                nulls_first(cast("ColumnElement[Any]", 
cls.last_scheduling_decision), session=session),
-                cls.run_after,
+        def _get_dagrun_query(
+            filters: list[ColumnElement[bool]], order_by: 
list[SQLColumnExpression[Any]], limit: int
+        ):
+            return (
+                select(DagRun)
+                .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", 
dialect_name="mysql")
+                .where(DagRun.state == DagRunState.RUNNING)
+                .join(DagModel, DagModel.dag_id == cls.dag_id)
+                .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, 
isouter=True)
+                .where(*filters)
+                .order_by(*order_by)
+                .limit(limit)
             )
-            .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
+
+        filters = [
+            DagRun.run_after <= func.now(),
+            DagModel.is_paused == false(),
+            DagModel.is_stale == false(),
+        ]
+
+        order = [
+            nulls_first(cast("ColumnElement[Any]", 
BackfillDagRun.sort_ordinal), session=session),
+            nulls_first(cast("ColumnElement[Any]", 
DagRun.last_scheduling_decision), session=session),
+            DagRun.run_after,
+        ]
+
+        new_dagruns_to_examine = cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
+        dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE
+
+        if new_dagruns_to_examine < 0:
+            log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller 
than 0, ignoring configuration")
+            new_dagruns_to_examine = 0

Review Comment:
If `max_new_dagruns_per_loop_to_schedule` is configured with a negative value, 
this warning will be emitted on every scheduler loop, potentially spamming the 
logs. Consider clamping/validating the config value once, when 
`DEFAULT_NEW_DAGRUNS_TO_EXAMINE` is initialized (and logging the warning a 
single time there), instead of warning on every call.



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session, 
prev_ti_state, is_ti_sche
         schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
         assert (upstream.task_id in schedulable_tis) == is_ti_schedulable
 
+    def 
test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
+        self, session, dag_maker
+    ):
+
+        DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0

Review Comment:
   These tests mutate the class-level `DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE` 
but never restore it, which can make later tests order-dependent. Please use 
`monkeypatch.setattr(...)` (or save/restore the original value) so the change 
is scoped to the test.
   ```suggestion
           self, session, dag_maker, monkeypatch
       ):
   
           monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 0)
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -39,6 +39,7 @@
     Index,
     Integer,
     PrimaryKeyConstraint,
+    SQLColumnExpression,
     String,
     Text,

Review Comment:
   `SQLColumnExpression` is only used for typing in `_get_dagrun_query`, and 
this file already keeps most SQLAlchemy typing-only imports under 
`TYPE_CHECKING`. Consider moving this import under `TYPE_CHECKING` (or using an 
already-imported type such as `ColumnElement[Any]`) to avoid adding an extra 
runtime import.



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session, 
prev_ti_state, is_ti_sche
         schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
         assert (upstream.task_id in schedulable_tis) == is_ti_schedulable
 
+    def 
test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
+        self, session, dag_maker
+    ):
+
+        DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0
+
+        def create_dagruns(
+            last_scheduling_decision: datetime.datetime | None = None,
+            count: int = 20,
+        ):
+            dagrun = dag_maker.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                state=State.RUNNING,
+                run_after=datetime.datetime(2024, 1, 1),
+            )
+            dagrun.last_scheduling_decision = last_scheduling_decision
+            session.merge(dagrun)
+            for _ in range(count - 1):
+                dagrun = dag_maker.create_dagrun_after(
+                    dagrun,
+                    run_type=DagRunType.SCHEDULED,
+                    state=State.RUNNING,
+                    run_after=datetime.datetime(2024, 1, 1),
+                )
+
+                dagrun.last_scheduling_decision = last_scheduling_decision
+                session.merge(dagrun)
+
+        with dag_maker(
+            dag_id="dummy_dag",
+            schedule=datetime.timedelta(days=1),
+            start_date=datetime.datetime(2024, 1, 1),
+            session=session,
+        ):
+            EmptyOperator(task_id="dummy_task")
+
+        create_dagruns(None, 10)
+
+        with dag_maker(
+            dag_id="dummy_dag2",
+            schedule=datetime.timedelta(days=1),
+            start_date=datetime.datetime(2024, 1, 1),
+            session=session,
+        ):
+            EmptyOperator(task_id="dummy_task2")
+
+        create_dagruns(func.now(), 20)
+
+        session.flush()
+
+        dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session))
+
+        assert len([dagrun for dagrun in dagruns if 
dagrun.last_scheduling_decision is None]) == 10
+
+        assert len([dagrun for dagrun in dagruns if 
dagrun.last_scheduling_decision is not None]) == 10
+

Review Comment:
   This test name implies it covers the "< 0" configuration path, but it sets 
`DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0`, so the warning/clamping branch is 
never exercised. Set a negative value here (e.g. -1) and assert the expected 
warning (via `caplog`) to actually cover the behavior.
   ```suggestion
           self, session, dag_maker, caplog
       ):
   
           original_value = DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
           try:
               # Set a negative value to exercise the "< 0" clamping and 
warning path.
               DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = -1
   
               # Capture warnings emitted when handling the negative 
configuration value.
               caplog.set_level("WARNING", logger="airflow.models.dagrun")
   
               def create_dagruns(
                   last_scheduling_decision: datetime.datetime | None = None,
                   count: int = 20,
               ):
                   dagrun = dag_maker.create_dagrun(
                       run_type=DagRunType.SCHEDULED,
                       state=State.RUNNING,
                       run_after=datetime.datetime(2024, 1, 1),
                   )
                   dagrun.last_scheduling_decision = last_scheduling_decision
                   session.merge(dagrun)
                   for _ in range(count - 1):
                       dagrun = dag_maker.create_dagrun_after(
                           dagrun,
                           run_type=DagRunType.SCHEDULED,
                           state=State.RUNNING,
                           run_after=datetime.datetime(2024, 1, 1),
                       )
   
                       dagrun.last_scheduling_decision = 
last_scheduling_decision
                       session.merge(dagrun)
   
               with dag_maker(
                   dag_id="dummy_dag",
                   schedule=datetime.timedelta(days=1),
                   start_date=datetime.datetime(2024, 1, 1),
                   session=session,
               ):
                   EmptyOperator(task_id="dummy_task")
   
               create_dagruns(None, 10)
   
               with dag_maker(
                   dag_id="dummy_dag2",
                   schedule=datetime.timedelta(days=1),
                   start_date=datetime.datetime(2024, 1, 1),
                   session=session,
               ):
                   EmptyOperator(task_id="dummy_task2")
   
               create_dagruns(func.now(), 20)
   
               session.flush()
   
               dagruns = 
list(DagRun.get_running_dag_runs_to_examine(session=session))
   
               # Verify that the negative value was ignored/clamped by checking 
for the warning.
               assert any(
                   "DEFAULT_NEW_DAGRUNS_TO_EXAMINE" in record.getMessage()
                   and ("negative" in record.getMessage() or "< 0" in 
record.getMessage())
                   for record in caplog.records
               )
   
               assert len([dagrun for dagrun in dagruns if 
dagrun.last_scheduling_decision is None]) == 10
   
               assert len([dagrun for dagrun in dagruns if 
dagrun.last_scheduling_decision is not None]) == 10
           finally:
               DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = original_value
   ```



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session, 
prev_ti_state, is_ti_sche
         schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
         assert (upstream.task_id in schedulable_tis) == is_ti_schedulable
 
+    def 
test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
+        self, session, dag_maker
+    ):
+
+        DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0
+
+        def create_dagruns(
+            last_scheduling_decision: datetime.datetime | None = None,
+            count: int = 20,
+        ):
+            dagrun = dag_maker.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                state=State.RUNNING,
+                run_after=datetime.datetime(2024, 1, 1),
+            )
+            dagrun.last_scheduling_decision = last_scheduling_decision
+            session.merge(dagrun)
+            for _ in range(count - 1):
+                dagrun = dag_maker.create_dagrun_after(
+                    dagrun,
+                    run_type=DagRunType.SCHEDULED,
+                    state=State.RUNNING,
+                    run_after=datetime.datetime(2024, 1, 1),
+                )
+
+                dagrun.last_scheduling_decision = last_scheduling_decision
+                session.merge(dagrun)
+
+        with dag_maker(
+            dag_id="dummy_dag",
+            schedule=datetime.timedelta(days=1),
+            start_date=datetime.datetime(2024, 1, 1),
+            session=session,
+        ):
+            EmptyOperator(task_id="dummy_task")
+
+        create_dagruns(None, 10)
+
+        with dag_maker(
+            dag_id="dummy_dag2",
+            schedule=datetime.timedelta(days=1),
+            start_date=datetime.datetime(2024, 1, 1),
+            session=session,
+        ):
+            EmptyOperator(task_id="dummy_task2")
+
+        create_dagruns(func.now(), 20)
+
+        session.flush()
+
+        dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session))
+
+        assert len([dagrun for dagrun in dagruns if 
dagrun.last_scheduling_decision is None]) == 10
+
+        assert len([dagrun for dagrun in dagruns if 
dagrun.last_scheduling_decision is not None]) == 10
+
+    def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, 
session, dag_maker):
+
+        DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 10

Review Comment:
   Same issue here: `DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE` is modified without 
being restored, which can leak state across tests. Please scope this via 
`monkeypatch` or restore the previous value in a `finally` block.
   ```suggestion
       def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, 
session, dag_maker, monkeypatch):
   
           monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 10)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to