Nataneljpwd commented on code in PR #64109:
URL: https://github.com/apache/airflow/pull/64109#discussion_r3282660332


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5883,41 +5971,55 @@ def _running_counts():
             EmptyOperator(task_id="mytask")
 
         dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, 
state=State.QUEUED)
-        for _ in range(9):
+        for _ in range(29):
             dr = dag_maker.create_dagrun_after(dr, 
run_type=DagRunType.SCHEDULED, state=State.QUEUED)
 
         # initial state -- nothing is running
         assert dag1_non_b_running == 0
         assert dag1_b_running == 0
         assert total_running == 0
-        assert session.scalar(select(func.count(DagRun.id))) == 46
+        assert session.scalar(select(func.count(DagRun.id))) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == 
dag1_dag_id)) == 36
 
         # now let's run it once
         self.job_runner._start_queued_dagruns(session)
         session.flush()
 
         # after running the scheduler one time, observe that only one dag run 
is started
-        # this is because there are 30 runs for dag 1 so neither the backfills 
nor
-        # any runs for dag2 get started
+        # and 3 backfill dagruns are started.
+        # This is because there are 30 queued dagruns, many of which are filtered
+        # out because their DAGs have already reached max_active_runs.
+        # Due to the default dagruns-to-examine limit, we look at the first 20
+        # dagruns that CAN be run according to the max_active_runs parameter, so
+        # 3 backfill runs, 1 non-backfill run, and all runnable dag2 runs start.
         assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20
         dag1_non_b_running, dag1_b_running, total_running = _running_counts()
         assert dag1_non_b_running == 1
-        assert dag1_b_running == 0
-        assert total_running == 1
-        assert session.scalar(select(func.count()).select_from(DagRun)) == 46
+        assert dag1_b_running == 3
+        assert total_running == 20
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 66
         assert session.scalar(select(func.count()).where(DagRun.dag_id == 
dag1_dag_id)) == 36
+        # now we finish all lower priority backfill tasks, and observe new 
higher priority tasks are started

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to