Copilot commented on code in PR #64294:
URL: https://github.com/apache/airflow/pull/64294#discussion_r3025336864
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -628,27 +642,65 @@ def get_running_dag_runs_to_examine(cls, session:
Session) -> ScalarResult[DagRu
from airflow.models.backfill import BackfillDagRun
from airflow.models.dag import DagModel
- query = (
- select(cls)
- .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)",
dialect_name="mysql")
- .where(cls.state == DagRunState.RUNNING)
- .join(DagModel, DagModel.dag_id == cls.dag_id)
- .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id,
isouter=True)
- .where(
- DagModel.is_paused == false(),
- DagModel.is_stale == false(),
- )
- .order_by(
- nulls_first(cast("ColumnElement[Any]",
BackfillDagRun.sort_ordinal), session=session),
- nulls_first(cast("ColumnElement[Any]",
cls.last_scheduling_decision), session=session),
- cls.run_after,
+ def _get_dagrun_query(
+ filters: list[ColumnElement[bool]], order_by:
list[SQLColumnExpression[Any]], limit: int
+ ):
+ return (
+ select(DagRun)
+ .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)",
dialect_name="mysql")
+ .where(DagRun.state == DagRunState.RUNNING)
+ .join(DagModel, DagModel.dag_id == cls.dag_id)
+ .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id,
isouter=True)
+ .where(*filters)
+ .order_by(*order_by)
+ .limit(limit)
)
- .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
+
+ filters = [
+ DagRun.run_after <= func.now(),
+ DagModel.is_paused == false(),
+ DagModel.is_stale == false(),
+ ]
+
+ order = [
+ nulls_first(cast("ColumnElement[Any]",
BackfillDagRun.sort_ordinal), session=session),
+ nulls_first(cast("ColumnElement[Any]",
DagRun.last_scheduling_decision), session=session),
+ DagRun.run_after,
+ ]
+
+ new_dagruns_to_examine = cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
+ dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE
+
+ if new_dagruns_to_examine < 0:
+ log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller
than 0, ignoring configuration")
+ new_dagruns_to_examine = 0
Review Comment:
If `max_new_dagruns_per_loop_to_schedule` is configured as a negative value,
this warning will be emitted on every call to `get_running_dag_runs_to_examine`
(i.e. on every scheduler loop), potentially spamming logs. Consider
clamping/validating the config once when `DEFAULT_NEW_DAGRUNS_TO_EXAMINE` is
initialized (and logging once), instead of warning on every call.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session,
prev_ti_state, is_ti_sche
schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
assert (upstream.task_id in schedulable_tis) == is_ti_schedulable
+ def
test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
+ self, session, dag_maker
+ ):
+
+ DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0
Review Comment:
These tests mutate the class-level `DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE`
but never restore it, which can make later tests order-dependent. Please use
`monkeypatch.setattr(...)` (or save/restore the original value) so the change
is scoped to the test.
```suggestion
self, session, dag_maker, monkeypatch
):
monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 0)
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -39,6 +39,7 @@
Index,
Integer,
PrimaryKeyConstraint,
+ SQLColumnExpression,
String,
Text,
Review Comment:
`SQLColumnExpression` is only used for typing in `_get_dagrun_query`, and
this file already keeps most SQLAlchemy typing-only imports under
`TYPE_CHECKING`. Consider moving this import under `TYPE_CHECKING` (or using an
already-imported type such as `ColumnElement[Any]`) to avoid adding an extra
runtime dependency/import surface.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session,
prev_ti_state, is_ti_sche
schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
assert (upstream.task_id in schedulable_tis) == is_ti_schedulable
+ def
test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
+ self, session, dag_maker
+ ):
+
+ DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0
+
+ def create_dagruns(
+ last_scheduling_decision: datetime.datetime | None = None,
+ count: int = 20,
+ ):
+ dagrun = dag_maker.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ run_after=datetime.datetime(2024, 1, 1),
+ )
+ dagrun.last_scheduling_decision = last_scheduling_decision
+ session.merge(dagrun)
+ for _ in range(count - 1):
+ dagrun = dag_maker.create_dagrun_after(
+ dagrun,
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ run_after=datetime.datetime(2024, 1, 1),
+ )
+
+ dagrun.last_scheduling_decision = last_scheduling_decision
+ session.merge(dagrun)
+
+ with dag_maker(
+ dag_id="dummy_dag",
+ schedule=datetime.timedelta(days=1),
+ start_date=datetime.datetime(2024, 1, 1),
+ session=session,
+ ):
+ EmptyOperator(task_id="dummy_task")
+
+ create_dagruns(None, 10)
+
+ with dag_maker(
+ dag_id="dummy_dag2",
+ schedule=datetime.timedelta(days=1),
+ start_date=datetime.datetime(2024, 1, 1),
+ session=session,
+ ):
+ EmptyOperator(task_id="dummy_task2")
+
+ create_dagruns(func.now(), 20)
+
+ session.flush()
+
+ dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session))
+
+ assert len([dagrun for dagrun in dagruns if
dagrun.last_scheduling_decision is None]) == 10
+
+ assert len([dagrun for dagrun in dagruns if
dagrun.last_scheduling_decision is not None]) == 10
+
Review Comment:
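This test name implies it covers the "< 0" configuration path, but it sets
`DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0`, so the warning/clamping branch is
never exercised. Set a negative value here (e.g. -1) and assert the expected
warning (via `caplog`) to actually cover the behavior. Note that the warning
currently logged is "'max_new_dagruns_per_loop_to_schedule' is smaller than 0,
ignoring configuration". It contains neither "DEFAULT_NEW_DAGRUNS_TO_EXAMINE"
nor "negative"/"< 0", so the record matcher in the suggestion below must be
adjusted (e.g. match on "max_new_dagruns_per_loop_to_schedule" and "smaller
than 0"). Otherwise that assertion will fail.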
```suggestion
self, session, dag_maker, caplog
):
original_value = DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE
try:
# Set a negative value to exercise the "< 0" clamping and
warning path.
DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = -1
# Capture warnings emitted when handling the negative
configuration value.
caplog.set_level("WARNING", logger="airflow.models.dagrun")
def create_dagruns(
last_scheduling_decision: datetime.datetime | None = None,
count: int = 20,
):
dagrun = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)
dagrun.last_scheduling_decision = last_scheduling_decision
session.merge(dagrun)
for _ in range(count - 1):
dagrun = dag_maker.create_dagrun_after(
dagrun,
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
run_after=datetime.datetime(2024, 1, 1),
)
dagrun.last_scheduling_decision =
last_scheduling_decision
session.merge(dagrun)
with dag_maker(
dag_id="dummy_dag",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task")
create_dagruns(None, 10)
with dag_maker(
dag_id="dummy_dag2",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2024, 1, 1),
session=session,
):
EmptyOperator(task_id="dummy_task2")
create_dagruns(func.now(), 20)
session.flush()
dagruns =
list(DagRun.get_running_dag_runs_to_examine(session=session))
# Verify that the negative value was ignored/clamped by checking
for the warning.
assert any(
"DEFAULT_NEW_DAGRUNS_TO_EXAMINE" in record.getMessage()
and ("negative" in record.getMessage() or "< 0" in
record.getMessage())
for record in caplog.records
)
assert len([dagrun for dagrun in dagruns if
dagrun.last_scheduling_decision is None]) == 10
assert len([dagrun for dagrun in dagruns if
dagrun.last_scheduling_decision is not None]) == 10
finally:
DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = original_value
```
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -993,6 +993,121 @@ def test_wait_for_downstream(self, dag_maker, session,
prev_ti_state, is_ti_sche
schedulable_tis = [ti.task_id for ti in decision.schedulable_tis]
assert (upstream.task_id in schedulable_tis) == is_ti_schedulable
+ def
test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0(
+ self, session, dag_maker
+ ):
+
+ DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0
+
+ def create_dagruns(
+ last_scheduling_decision: datetime.datetime | None = None,
+ count: int = 20,
+ ):
+ dagrun = dag_maker.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ run_after=datetime.datetime(2024, 1, 1),
+ )
+ dagrun.last_scheduling_decision = last_scheduling_decision
+ session.merge(dagrun)
+ for _ in range(count - 1):
+ dagrun = dag_maker.create_dagrun_after(
+ dagrun,
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ run_after=datetime.datetime(2024, 1, 1),
+ )
+
+ dagrun.last_scheduling_decision = last_scheduling_decision
+ session.merge(dagrun)
+
+ with dag_maker(
+ dag_id="dummy_dag",
+ schedule=datetime.timedelta(days=1),
+ start_date=datetime.datetime(2024, 1, 1),
+ session=session,
+ ):
+ EmptyOperator(task_id="dummy_task")
+
+ create_dagruns(None, 10)
+
+ with dag_maker(
+ dag_id="dummy_dag2",
+ schedule=datetime.timedelta(days=1),
+ start_date=datetime.datetime(2024, 1, 1),
+ session=session,
+ ):
+ EmptyOperator(task_id="dummy_task2")
+
+ create_dagruns(func.now(), 20)
+
+ session.flush()
+
+ dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session))
+
+ assert len([dagrun for dagrun in dagruns if
dagrun.last_scheduling_decision is None]) == 10
+
+ assert len([dagrun for dagrun in dagruns if
dagrun.last_scheduling_decision is not None]) == 10
+
+ def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self,
session, dag_maker):
+
+ DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 10
Review Comment:
Same issue here: `DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE` is modified without
being restored, which can leak state across tests. Please scope this via
`monkeypatch` or restore the previous value in a `finally` block.
```suggestion
def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self,
session, dag_maker, monkeypatch):
monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 10)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]