Thank you for bringing this PR to my attention. I haven't studied the code, but I ran a quick test on the branch, and it completely ignores the limit on scheduled tasks per dag or dag_run: it grabbed 70 tasks from the first dag and then moved all 70 to QUEUED without any further checks.
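To make the failure mode concrete, here is a toy model of what I observed. This is illustrative Python only; the function and names are invented for the example and are not Airflow's actual code:

```python
# Toy model of the observed behavior (NOT Airflow's real code): the
# scheduler fetches a batch of scheduled tasks and decides which to move
# to QUEUED. If the per-dag concurrency check is skipped, every fetched
# task gets queued regardless of the per-dag limit.

def queue_tasks(scheduled, running_per_dag, max_active_per_dag, enforce_limit):
    """Return the task ids that would be moved to QUEUED.

    scheduled: list of (task_id, dag_id) in FIFO order.
    running_per_dag: dict of dag_id -> currently running/queued count.
    """
    queued = []
    counts = dict(running_per_dag)
    for task_id, dag_id in scheduled:
        if enforce_limit and counts.get(dag_id, 0) >= max_active_per_dag:
            continue  # this dag is already at its concurrency cap
        counts[dag_id] = counts.get(dag_id, 0) + 1
        queued.append(task_id)
    return queued

# 70 scheduled tasks, all from one dag, with a per-dag limit of 4:
batch = [(f"t{i}", "dag1") for i in range(70)]
print(len(queue_tasks(batch, {}, 4, enforce_limit=False)))  # 70 (the buggy behavior)
print(len(queue_tasks(batch, {}, 4, enforce_limit=True)))   # 4
```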
This is how I tested it:
https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1

On Sun, Aug 3, 2025 at 1:44 PM asquator <[email protected]> wrote:

> Hello,
>
> This is a known issue stemming from the optimistic scheduling strategy
> used in Airflow. We do address this in the above-mentioned PR. I want to
> note that there are many cases where this problem may appear; it was
> originally detected with pools, but we are striving to fix it in all
> cases, such as the one described here with max_active_tis_per_dag, by
> switching to pessimistic scheduling with SQL window functions. While the
> current strategy simply pulls the max_tis tasks and drops the ones that
> do not meet the constraints, the new strategy will pull only the tasks
> that are actually ready to be scheduled and that comply with all
> concurrency limits.
>
> It would be very helpful for pushing this change to production if you
> could assist us in alpha-testing it.
>
> See also:
> https://github.com/apache/airflow/discussions/49160
>
> Sent with Proton Mail secure email.
>
> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif <[email protected]>
> wrote:
>
> > I think most of your issues will be addressed by
> > https://github.com/apache/airflow/pull/53492
> > The PR code can be tested with Breeze, so you can set it up and see if
> > it solves the problem. This will also help with confirming it's the
> > right fix.
> >
> > On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias <[email protected]>
> > wrote:
> >
> > > Hello,
> > >
> > > The scheduler is very efficient when running a large number of dags
> > > with up to 1000 tasks each. But in our case, we have dags with as
> > > many as 10,000 tasks, and in that scenario the scheduler and worker
> > > throughput drops significantly. Even if you have one such large dag
> > > with scheduled tasks, the performance hit becomes noticeable.
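As an aside on the pessimistic strategy described above: the window-function idea can be sketched in a few lines. This is a minimal sqlite3 illustration; the schema, table name, and limit value are invented for the example and are not the PR's actual query:

```python
# Sketch of the "pessimistic" idea: instead of pulling max_tis rows and
# discarding ineligible ones in Python, a window function ranks the
# scheduled tasks per dag inside the database, and the query returns only
# rows within each dag's remaining capacity. Illustration only, not the
# actual query from the PR.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT)"
)
rows = [("dag1", "scheduled")] * 1000 + [("dag2", "scheduled")] * 200
conn.executemany("INSERT INTO task_instance (dag_id, state) VALUES (?, ?)", rows)

MAX_ACTIVE_PER_DAG = 4  # stand-in for a max_active_tis_per_dag-style limit

eligible = conn.execute(
    """
    SELECT id, dag_id FROM (
        SELECT id, dag_id,
               ROW_NUMBER() OVER (PARTITION BY dag_id ORDER BY id) AS rn
        FROM task_instance
        WHERE state = 'scheduled'
    )
    WHERE rn <= ?
    ORDER BY id
    """,
    (MAX_ACTIVE_PER_DAG,),
).fetchall()

# Only 8 rows come back (4 per dag), no matter how many are scheduled.
print(len(eligible))  # 8
```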
> > > We did some digging, and we found that the issue comes from the
> > > scheduler's _executable_task_instances_to_queued method:
> > > <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>
> > > In particular, with the db query here:
> > > <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
> > > and the examination of the results here:
> > > <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>
> > >
> > > If you have a very large dag and its tasks have been scheduled, the
> > > scheduler will keep examining its tasks for queueing even if it has
> > > reached the maximum number of active tasks for that particular dag.
> > > Once that fails, it will move on to examine the scheduled tasks of
> > > the next dag or dag_run in line.
> > >
> > > This is inefficient and causes the throughput of the scheduler and
> > > the workers to drop significantly. If there are available slots in
> > > the pool and the max parallelism hasn't been reached yet, the
> > > scheduler should stop processing a dag that has already reached its
> > > max capacity of active tasks.
> > >
> > > In addition, the number of scheduled tasks picked for examination
> > > should be capped at the number of max active tasks if that's lower
> > > than the query limit. If the active limit is 10 and we already have
> > > 5 running, then we can queue at most 5 tasks; in that case, we
> > > shouldn't examine more than that.
> > >
> > > There is already a patch with the changes mentioned above. IMO,
> > > these changes should be enabled/disabled with a config flag and not
> > > by default, because not everyone has the same needs as us.
> > > In our testing, adding a limit on the tasks retrieved from the db
> > > requires more processing on the query, which actually makes things
> > > worse when you have multiple small dags.
> > >
> > > Here is a simple test case that makes the benefits of the
> > > improvements noticeable:
> > >
> > > - we have 3 dags with thousands of tasks each
> > > - for simplicity, let's have 1 dag_run per dag
> > > - triggering them takes some time and, due to that, the FIFO order
> > >   of the tasks is very clear
> > >   - e.g. 1000 tasks from dag1 were scheduled first and then 200
> > >     tasks from dag2, etc.
> > > - the executor has parallelism=100 and slots_available=100, which
> > >   means that it can run up to 100 tasks concurrently
> > > - max_active_tasks_per_dag is 4, which means that we can have up to
> > >   4 tasks running per dag
> > >   - for 3 dags, it means that we can run up to 12 tasks at the same
> > >     time (4 tasks from each dag)
> > > - max tis per query is set to 32, meaning that we can examine up to
> > >   32 scheduled tasks if there are available pool slots
> > >
> > > If we were to run the scheduler loop repeatedly until it queues 12
> > > tasks and test the part that examines the scheduled tasks and queues
> > > them, then:
> > >
> > > - with the query limit
> > >   - 1 iteration, total time 0.05
> > >   - during the iteration
> > >     - we have parallelism 100, available slots 100 and query limit
> > >       32, which means that it will examine up to 32 scheduled tasks
> > >     - it can queue up to 100 tasks
> > >     - examines 12 tasks (instead of 32)
> > >       - 4 tasks from dag1, reached max for the dag
> > >       - 4 tasks from dag2, reached max for the dag
> > >       - and 4 tasks from dag3, reached max for the dag
> > >     - queues 4 from dag1, reaches max for the dag and moves on
> > >     - queues 4 from dag2, reaches max for the dag and moves on
> > >     - queues 4 from dag3, reaches max for the dag and moves on
> > >     - stops queueing because we have reached the maximum per dag,
> > >       although there are slots for more tasks
> > >     - iteration finishes
> > > - without
> > >   - 3 iterations, total time 0.29
> > >   - during iteration 1
> > >     - examines 32 tasks, all from dag1 (due to FIFO)
> > >     - queues 4 from dag1 and tries to queue the other 28 but fails
> > >   - during iteration 2
> > >     - examines the next 32 tasks from dag1; it can't queue any of
> > >       them because it has reached the max for dag1, since the
> > >       previous 4 are still running
> > >     - examines 32 tasks from dag2
> > >     - queues 4 from dag2 and tries to queue the other 28 but fails
> > >   - during iteration 3
> > >     - examines the next 32 tasks from dag1, the same tasks that were
> > >       examined in iteration 2; it can't queue any of them because it
> > >       has reached the max for dag1 and the first 4 are still running
> > >     - examines 32 tasks from dag2; can't queue any of them because
> > >       it has reached the max for dag2 as well
> > >     - examines 32 tasks from dag3
> > >     - queues 4 from dag3 and tries to queue the other 28 but fails
> > >
> > > I used very low values for all the configs so that I can make the
> > > point clear and easy to understand. If we increase them, then this
> > > patch also makes the task selection more fair and the resource
> > > distribution more even.
> > >
> > > I would appreciate it if anyone familiar with the scheduler's code
> > > can confirm this and also provide any feedback.
> > >
> > > Additionally, I have one question regarding the query limit. Should
> > > it be per dag_run or per dag? I've noticed that
> > > max_active_tasks_per_dag has been changed to provide a value per
> > > dag_run, but the docs haven't been updated.
> > >
> > > Thank you!
> > >
> > > Regards,
> > > Christos Bisias
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
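For what it's worth, the capped, per-dag-fair selection described in the walkthrough above can be sketched in a few lines. The function and its shape are invented for illustration; this is not the patch's actual implementation:

```python
# Sketch of the single-iteration scenario from the walkthrough: 3 dags,
# max_active_tasks_per_dag=4, query limit 32. When the selection is
# capped per dag, one pass picks exactly 12 tasks (4 per dag) instead of
# burning the whole 32-task window on dag1's FIFO backlog.
# Illustrative only, not Airflow's implementation.

def select_capped(scheduled, max_active_per_dag, query_limit):
    """FIFO selection that never takes more than each dag's remaining capacity."""
    taken, per_dag = [], {}
    for task_id, dag_id in scheduled:
        if len(taken) >= query_limit:
            break  # overall examination window exhausted
        if per_dag.get(dag_id, 0) >= max_active_per_dag:
            continue  # skip tasks from dags already at capacity
        per_dag[dag_id] = per_dag.get(dag_id, 0) + 1
        taken.append((task_id, dag_id))
    return taken

# FIFO order as in the example: all of dag1's tasks were scheduled first.
scheduled = (
    [(f"a{i}", "dag1") for i in range(1000)]
    + [(f"b{i}", "dag2") for i in range(200)]
    + [(f"c{i}", "dag3") for i in range(200)]
)
picked = select_capped(scheduled, max_active_per_dag=4, query_limit=32)
print(len(picked))  # 12: 4 from each dag, in a single iteration
```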
