kaxil commented on code in PR #64109:
URL: https://github.com/apache/airflow/pull/64109#discussion_r2981174784
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -690,8 +708,13 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
# the one done in this query verifies that the dag is not maxed out
# it could return many more dag runs than runnable if there is even
# capacity for 1. this could be improved.
- coalesce(running_drs.c.num_running, text("0"))
- < coalesce(Backfill.max_active_runs, DagModel.max_active_runs),
+ available_dagruns_rn.c.rn
+ <= coalesce(
+ Backfill.max_active_runs,
+ DagModel.max_active_runs,
+ airflow_conf.getint("core", "max_active_runs_per_dag"),
+ )
Review Comment:
Adding `airflow_conf.getint("core", "max_active_runs_per_dag")` as a third
fallback in the coalesce chain changes behavior from the original query, which
only used `coalesce(Backfill.max_active_runs, DagModel.max_active_runs)`.
`DagModel.max_active_runs` should always have a value (it defaults from
config at DAG parse time), so this fallback shouldn't be needed. If it IS
needed, that indicates a data integrity issue that should be investigated
separately rather than papered over here.
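
To make the concern concrete, here is a minimal sketch using an in-memory
SQLite database rather than the real Airflow models. The `dag` table, the
`CONFIG_DEFAULT` constant (standing in for `core.max_active_runs_per_dag`),
and the `'broken'` row are all hypothetical; the literal `NULL` stands in for
`Backfill.max_active_runs` on a non-backfill run. It only shows how a third
`COALESCE` argument would hide a NULL that the two-argument form surfaces.

```python
import sqlite3

# Hypothetical schema: a nullable max_active_runs column on a toy "dag" table.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_runs INTEGER);
    INSERT INTO dag VALUES ('healthy', 2), ('broken', NULL);
    """
)

CONFIG_DEFAULT = 16  # assumed stand-in for core.max_active_runs_per_dag

# Original shape: coalesce(Backfill.max_active_runs, DagModel.max_active_runs)
two_arg = dict(
    conn.execute(
        "SELECT dag_id, COALESCE(NULL, max_active_runs) FROM dag"
    ).fetchall()
)

# Proposed shape: the config value is appended as a last-resort fallback.
three_arg = dict(
    conn.execute(
        "SELECT dag_id, COALESCE(NULL, max_active_runs, ?) FROM dag",
        (CONFIG_DEFAULT,),
    ).fetchall()
)

# The check I would rather run separately: which rows are missing a limit?
missing_limit = [
    row[0]
    for row in conn.execute(
        "SELECT dag_id FROM dag WHERE max_active_runs IS NULL"
    )
]

print(two_arg)        # the NULL limit stays visible as None
print(three_arg)      # the NULL limit is silently replaced by the default
print(missing_limit)  # rows that would need investigating
```

With the two-argument form the bad row keeps a NULL limit, so any `rn <= NULL`
comparison filters it out and the problem gets noticed; with the fallback it
quietly receives the config default.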
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3229,6 +3221,7 @@ def _try_to_load_executor(
return executor
+ # TODO: remove as it moved to the get_queued_dag_runs_to_set_running method in dagrun.py
Review Comment:
This TODO is incorrect. `_set_exceeds_max_active_runs` sets the
`exceeds_max_non_backfill` flag on `DagModel`, which is used by
`dags_needing_dagruns` (line 725 of dag.py) to prevent *creating* new QUEUED
runs. That's a different concern from limiting which QUEUED runs get promoted
to RUNNING.
They serve different purposes and both are needed:
- `_set_exceeds_max_active_runs` → controls DagRun *creation*
- `get_queued_dag_runs_to_set_running` → controls QUEUED→RUNNING *promotion*
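
A toy model of the two gates, to show why neither replaces the other. This is
not Airflow code: `ToyDag`, `may_create_run`, and `promote_queued` are
hypothetical names, and the sketch assumes the creation gate counts queued
plus running runs while the promotion gate counts only running ones.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyDag:
    """Hypothetical stand-in for a DAG with a run limit."""

    max_active_runs: int
    queued: List[int] = field(default_factory=list)
    running: List[int] = field(default_factory=list)


def may_create_run(dag: ToyDag) -> bool:
    # Creation gate (assumed): stop creating once queued + running hits the limit.
    return len(dag.queued) + len(dag.running) < dag.max_active_runs


def promote_queued(dag: ToyDag) -> List[int]:
    # Promotion gate: move only as many queued runs as there are free slots.
    free = max(dag.max_active_runs - len(dag.running), 0)
    promoted, dag.queued = dag.queued[:free], dag.queued[free:]
    dag.running.extend(promoted)
    return promoted


# With both gates: creation stops at the limit, so nothing piles up.
guarded = ToyDag(max_active_runs=2)
for run_id in range(1, 6):
    if may_create_run(guarded):
        guarded.queued.append(run_id)
guarded_promoted = promote_queued(guarded)

# Without the creation gate: promotion is still capped, but QUEUED runs pile up.
unguarded = ToyDag(max_active_runs=2)
for run_id in range(1, 6):
    unguarded.queued.append(run_id)
unguarded_promoted = promote_queued(unguarded)

print(guarded_promoted, guarded.queued)
print(unguarded_promoted, unguarded.queued)
```

In this model the promotion cap alone keeps RUNNING at the limit, but only the
creation gate keeps the QUEUED backlog from growing.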
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -656,9 +656,27 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da
.subquery()
)
+ available_dagruns_rn = (
+ select(
+ DagRun.dag_id,
+ DagRun.id,
+ func.row_number()
+ .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
+ .label("rn"),
+ )
+ .where(DagRun.state == DagRunState.QUEUED)
+ .subquery()
+ )
+
query = (
select(cls)
Review Comment:
The `row_number()` approach is a good direction for solving the starvation
problem. One thing to verify: when `max_active_runs - num_running` evaluates to
0 or negative (because runs started between the subquery snapshot and the outer
query), the `rn <= ...` comparison should exclude every queued run for that
DAG, since `rn` starts at 1. Worth adding a test case for that edge.
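
A rough sketch of that edge case, run against in-memory SQLite instead of the
Airflow schema. The table layout, DAG ids, and dates are made up for
illustration, the `backfill_id` partition is omitted for brevity, and the
filter assumes the comparison is `rn <= max_active_runs - num_running`. It
needs an SQLite build with window-function support (3.25 or newer).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_runs INTEGER NOT NULL);
    CREATE TABLE dag_run (
        id INTEGER PRIMARY KEY,
        dag_id TEXT,
        state TEXT,
        logical_date TEXT
    );
    -- has_room: 1 running of 3; full: 2 running of 2; over: 2 running of 1
    INSERT INTO dag VALUES ('has_room', 3), ('full', 2), ('over', 1);
    INSERT INTO dag_run (dag_id, state, logical_date) VALUES
        ('has_room', 'running', '2024-01-01'),
        ('has_room', 'queued',  '2024-01-02'),
        ('has_room', 'queued',  '2024-01-03'),
        ('has_room', 'queued',  '2024-01-04'),
        ('full',     'running', '2024-01-01'),
        ('full',     'running', '2024-01-02'),
        ('full',     'queued',  '2024-01-03'),
        ('over',     'running', '2024-01-01'),
        ('over',     'running', '2024-01-02'),
        ('over',     'queued',  '2024-01-03');
    """
)

QUERY = """
WITH running AS (
    SELECT dag_id, COUNT(*) AS num_running
    FROM dag_run
    WHERE state = 'running'
    GROUP BY dag_id
),
queued AS (
    SELECT dag_id, logical_date,
           ROW_NUMBER() OVER (PARTITION BY dag_id ORDER BY logical_date) AS rn
    FROM dag_run
    WHERE state = 'queued'
)
SELECT q.dag_id, q.logical_date
FROM queued AS q
JOIN dag AS d ON d.dag_id = q.dag_id
LEFT JOIN running AS r ON r.dag_id = q.dag_id
-- rn starts at 1, so a remaining capacity of 0 or below matches nothing
WHERE q.rn <= d.max_active_runs - COALESCE(r.num_running, 0)
ORDER BY q.dag_id, q.logical_date
"""

promotable = conn.execute(QUERY).fetchall()
print(promotable)
```

Only the two oldest queued runs of `has_room` come back; `full` (capacity 0)
and `over` (capacity -1) contribute nothing. A real test would of course go
through `get_queued_dag_runs_to_set_running` with actual `DagRun` rows.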
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1913,14 +1913,6 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
)
for dag_model in dag_models:
- if dag_model.exceeds_max_non_backfill:
Review Comment:
This `exceeds_max_non_backfill` check guards DagRun *creation* (moving to
QUEUED state), not the QUEUED→RUNNING promotion. The starvation fix belongs in
`get_queued_dag_runs_to_set_running` (which this PR also modifies), not here.
Removing this check means the scheduler will keep creating QUEUED DagRuns
even when the DAG is at `max_active_runs`. The `dags_needing_dagruns` query
also checks this flag (line 725 of dag.py), but it can go stale between the
query and the loop execution, so this in-loop check is the safety net.
I'd keep this guard; it's unrelated to the starvation fix.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -3230,33 +3230,6 @@ def test_cleanup_methods_all_called_multiple_executors(self, mock_executors):
for executor in self.job_runner.executors:
executor.end.assert_called_once()
Review Comment:
These deleted tests
(`test_queued_dagruns_stops_creating_when_max_active_is_reached` and
`test_more_runs_are_not_created_when_max_active_runs_is_reached`) verify that
`_create_dag_runs` respects `max_active_runs`. Since the
`exceeds_max_non_backfill` guard in `_create_dag_runs` should be kept (see
other comment), these tests should stay too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.