Thank you for bringing this PR to my attention.

I haven't studied the code, but I ran a quick test on the branch, and it
completely ignores the limit on scheduled tasks per dag or dag_run. It
grabbed 70 tasks from the first dag and then moved all 70 to QUEUED without
any further checks.

This is how I tested it:
https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1

On Sun, Aug 3, 2025 at 1:44 PM asquator <[email protected]> wrote:

> Hello,
>
> This is a known issue stemming from the optimistic scheduling strategy
> used in Airflow, and we address it in the above-mentioned PR. I want to
> note that there are many cases where this problem can appear. It was
> originally detected with pools, but we are striving to fix it in all such
> cases, including the one described here with max_active_tis_per_dag, by
> switching to pessimistic scheduling with SQL window functions. While the
> current strategy simply pulls max_tis tasks and drops the ones that do
> not meet the constraints, the new strategy will pull only the tasks that
> are actually ready to be scheduled and that comply with all concurrency
> limits.
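>
> To make the idea concrete, here is a rough, self-contained sketch of the
> window-function idea (plain SQLite via Python's sqlite3, with toy table
> and column names; an illustration only, not the PR's actual query). It
> ranks scheduled task instances inside each dag_run and only returns rows
> whose rank fits under the per-dag_run limit, instead of fetching max_tis
> rows and discarding most of them afterwards:
>
>     import sqlite3
>
>     # Toy "task_instance" table: 3 dag_runs, 10 scheduled tasks each.
>     con = sqlite3.connect(":memory:")
>     con.execute(
>         "CREATE TABLE task_instance "
>         "(dag_id TEXT, run_id TEXT, task_id TEXT, state TEXT, priority INTEGER)"
>     )
>     rows = [
>         (dag, f"{dag}_run1", f"task{i}", "scheduled", 10 - i)
>         for dag in ("dag1", "dag2", "dag3")
>         for i in range(10)
>     ]
>     con.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)", rows)
>
>     MAX_ACTIVE_TIS_PER_DAG_RUN = 4  # assumed concurrency limit per dag_run
>     MAX_TIS = 32                    # max tis per query
>
>     # Rank candidates inside each dag_run; only rows that can actually be
>     # queued under the limit ever leave the database. A real implementation
>     # would also subtract the tasks that are already running.
>     query = """
>         SELECT dag_id, run_id, task_id FROM (
>             SELECT dag_id, run_id, task_id,
>                    ROW_NUMBER() OVER (
>                        PARTITION BY dag_id, run_id
>                        ORDER BY priority DESC
>                    ) AS rn
>             FROM task_instance
>             WHERE state = 'scheduled'
>         )
>         WHERE rn <= ?
>         LIMIT ?
>     """
>     for row in con.execute(query, (MAX_ACTIVE_TIS_PER_DAG_RUN, MAX_TIS)):
>         print(row)  # at most 4 rows per dag_run, 12 rows in total here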
>
> It would be very helpful for pushing this change to production if you
> could assist us in alpha-testing it.
>
> See also:
> https://github.com/apache/airflow/discussions/49160
>
>
> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif <[email protected]>
> wrote:
>
> > I think most of your issues will be addressed by
> > https://github.com/apache/airflow/pull/53492
> > The PR code can be tested with Breeze, so you can set it up and see if it
> > solves the problem. This will also help confirm it's the right fix.
> >
> > On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias <[email protected]>
> > wrote:
> >
> > > Hello,
> > >
> > > The scheduler is very efficient when running a large number of dags with
> > > up to 1000 tasks each. But in our case, we have dags with as many as
> > > 10,000 tasks, and in that scenario the scheduler and worker throughput
> > > drops significantly. Even if you have one such large dag with scheduled
> > > tasks, the performance hit becomes noticeable.
> > >
> > > We did some digging and we found that the issue comes from the scheduler's
> > > _executable_task_instances_to_queued method:
> > > https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647
> > > In particular, the db query here:
> > > https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375
> > > and the examination of the results here:
> > > https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425
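> > >
> > > To show why this hurts with one very large dag, here is a small,
> > > self-contained simulation of the current optimistic strategy (not the
> > > actual scheduler code; names and numbers are made up for illustration):
> > > the query pulls up to max_tis scheduled tasks in FIFO order with no
> > > per-dag awareness, and the per-dag limit is only applied afterwards
> > > while examining the results, so most of the batch gets discarded.
> > >
> > >     # Simplified simulation of the optimistic strategy (not Airflow code).
> > >     from collections import Counter
> > >
> > >     max_tis = 32
> > >     max_active_tasks_per_dag = 4
> > >
> > >     # FIFO backlog: dag1's tasks were all scheduled before dag2's.
> > >     scheduled = ([("dag1", f"t{i}") for i in range(1000)]
> > >                  + [("dag2", f"t{i}") for i in range(200)])
> > >
> > >     batch = scheduled[:max_tis]    # db query: LIMIT max_tis, no dag awareness
> > >     active = Counter()             # tasks running per dag
> > >     queued = []
> > >     for dag_id, task_id in batch:  # "examining the results"
> > >         if active[dag_id] < max_active_tasks_per_dag:
> > >             active[dag_id] += 1
> > >             queued.append((dag_id, task_id))
> > >
> > >     print(len(batch), len(queued))  # 32 examined, only 4 queued (all dag1)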
> > >
> > > If you have a very large dag and its tasks have been scheduled, the
> > > scheduler will keep examining its tasks for queueing even after it has
> > > reached the maximum number of active tasks for that particular dag.
> > > Once that fails, it will move on to examine the scheduled tasks of the
> > > next dag or dag_run in line.
> > >
> > > This is inefficient and causes the throughput of the scheduler and the
> > > workers to drop significantly. If there are available slots in the pool
> > > and the max parallelism hasn't been reached yet, then the scheduler
> > > should stop processing a dag that has already reached its max capacity
> > > of active tasks.
> > >
> > > In addition, the number of scheduled tasks picked for examination should
> > > be capped at the number of max active tasks if that's lower than the
> > > query limit. If the active limit is 10 and we already have 5 running,
> > > then we can queue at most 5 tasks, so we shouldn't examine more than
> > > that.
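> > >
> > > A rough sketch of the two ideas combined (illustrative only; simplified
> > > names, not the actual patch): skip dags that are already at their limit,
> > > and never fetch more scheduled tasks for a dag than it has slots left.
> > >
> > >     # Illustrative sketch of the proposed changes (not the actual patch).
> > >     def remaining_slots(max_active_tasks_per_dag, running_per_dag):
> > >         # How many more tasks each dag may still run.
> > >         return {dag_id: max(0, max_active_tasks_per_dag - running)
> > >                 for dag_id, running in running_per_dag.items()}
> > >
> > >     query_limit = 32
> > >     running = {"dag1": 5, "dag2": 10, "dag3": 0}
> > >     for dag_id, left in remaining_slots(10, running).items():
> > >         if left == 0:
> > >             continue                    # dag2 is at capacity: skip it
> > >         fetch = min(query_limit, left)  # dag1 -> 5, dag3 -> 10, not 32
> > >         print(dag_id, fetch)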
> > >
> > > There is already a patch with the changes mentioned above. IMO, these
> > > changes should be controlled by a config flag and disabled by default,
> > > because not everyone has the same needs as us. In our testing, adding a
> > > limit on the tasks retrieved from the db requires more processing in the
> > > query, which actually makes things worse when you have multiple small
> > > dags.
> > >
> > > Here is a simple test case that makes the benefits of the improvements
> > > noticeable (the concurrency arithmetic is spelled out in the small sketch
> > > right after this list):
> > >
> > > - we have 3 dags with thousands of tasks each
> > > - for simplicity, let's have 1 dag_run per dag
> > > - triggering them takes some time, and due to that the FIFO order of the
> > >   tasks is very clear
> > >   - e.g. 1000 tasks from dag1 were scheduled first and then 200 tasks
> > >     from dag2, etc.
> > > - the executor has parallelism=100 and slots_available=100, which means
> > >   that it can run up to 100 tasks concurrently
> > > - max_active_tasks_per_dag is 4, which means that we can have up to 4
> > >   tasks running per dag
> > >   - for 3 dags, it means that we can run up to 12 tasks at the same time
> > >     (4 tasks from each dag)
> > > - max tis per query is set to 32, meaning that we can examine up to 32
> > >   scheduled tasks if there are available pool slots
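> > >
> > > For reference, the arithmetic behind the 12-task ceiling (values taken
> > > from the list above; parallelism, max_active_tasks_per_dag and
> > > max_tis_per_query are the Airflow config options the scenario refers to):
> > >
> > >     parallelism = 100              # executor slots
> > >     max_active_tasks_per_dag = 4
> > >     max_tis_per_query = 32
> > >     num_dags = 3
> > >
> > >     # Upper bound on tasks that can run at once in this scenario.
> > >     ceiling = min(parallelism, num_dags * max_active_tasks_per_dag)
> > >     print(ceiling)                          # 12, far below the 100 slots
> > >
> > >     # There is no point examining more than that per scheduling pass.
> > >     print(min(max_tis_per_query, ceiling))  # 12 instead of 32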
> > >
> > > If we were to run the scheduler loop repeatedly until it queues 12 tasks,
> > > and test the part that examines the scheduled tasks and queues them, then:
> > >
> > > - with the query limit
> > >   - 1 iteration, total time 0.05
> > >   - During the iteration
> > >     - we have parallelism 100, available slots 100 and query limit 32,
> > >       which means that it will examine up to 32 scheduled tasks
> > >     - it can queue up to 100 tasks
> > >     - examines 12 tasks (instead of 32)
> > >       - 4 tasks from dag1, reached max for the dag
> > >       - 4 tasks from dag2, reached max for the dag
> > >       - and 4 tasks from dag3, reached max for the dag
> > >     - queues 4 from dag1, reaches max for the dag and moves on
> > >     - queues 4 from dag2, reaches max for the dag and moves on
> > >     - queues 4 from dag3, reaches max for the dag and moves on
> > >     - stops queueing because we have reached the maximum per dag,
> > >       although there are slots for more tasks
> > >     - iteration finishes
> > > - without the query limit
> > >   - 3 iterations, total time 0.29
> > >   - During iteration 1
> > >     - examines 32 tasks, all from dag1 (due to FIFO)
> > >     - queues 4 from dag1 and tries to queue the other 28 but fails
> > >   - During iteration 2
> > >     - examines the next 32 tasks from dag1
> > >     - it can't queue any of them because it has reached the max for dag1,
> > >       since the previous 4 are still running
> > >     - examines 32 tasks from dag2
> > >     - queues 4 from dag2 and tries to queue the other 28 but fails
> > >   - During iteration 3
> > >     - examines the next 32 tasks from dag1, the same tasks that were
> > >       examined in iteration 2
> > >     - it can't queue any of them because it has reached the max for dag1
> > >       and the first 4 are still running
> > >     - examines 32 tasks from dag2, can't queue any of them because it has
> > >       reached the max for dag2 as well
> > >     - examines 32 tasks from dag3
> > >     - queues 4 from dag3 and tries to queue the other 28 but fails
> > >
> > > I used very low values for all the configs so that I can make the point
> > > clear and easy to understand. If we increase them, then this patch also
> > > makes the task selection fairer and the resource distribution more even.
> > >
> > > I would appreciate it if anyone familiar with the scheduler's code could
> > > confirm this and also provide any feedback.
> > >
> > > Additionally, I have one question regarding the query limit. Should it be
> > > per dag_run or per dag? I've noticed that max_active_tasks_per_dag has
> > > been changed to provide a value per dag_run, but the docs haven't been
> > > updated.
> > >
> > > Thank you!
> > >
> > > Regards,
> > > Christos Bisias
>
