To add more detail about the "last scheduling decision" for TI....

Right now there's a field on dag run called last_scheduling_decision. It's
used when figuring out which dag runs to "schedule", which means: to figure
out which of their TIs to set to the scheduled state.

After they are set to the scheduled state, we get to the context of this
thread, _executable_task_instances_to_queued.

In this context we no longer use `last_scheduling_decision`.

We could add another field to dag run, e.g. last_queueing_decision. Then,
for whatever TIs we see, figure out the dag runs they belong to and update
that field. The TIs of more recently considered dag runs would then sort
after those of less recently considered ones, which should help with the
starvation problems.
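
To make that concrete, here's a rough sketch of the kind of thing I mean,
using toy stand-in models (the last_queueing_decision column, the model
definitions and the helper names are hypothetical, not the real Airflow
models or the real query in _executable_task_instances_to_queued):

    from datetime import datetime, timezone
    from typing import Optional

    from sqlalchemy import DateTime, ForeignKey, String, nulls_first, select, update
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


    class Base(DeclarativeBase):
        pass


    class DagRun(Base):
        __tablename__ = "dag_run"

        id: Mapped[int] = mapped_column(primary_key=True)
        # Hypothetical new column, analogous to last_scheduling_decision.
        last_queueing_decision: Mapped[Optional[datetime]] = mapped_column(
            DateTime(timezone=True), nullable=True
        )


    class TaskInstance(Base):
        __tablename__ = "task_instance"

        id: Mapped[int] = mapped_column(primary_key=True)
        dag_run_id: Mapped[int] = mapped_column(ForeignKey("dag_run.id"))
        state: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)


    def scheduled_tis_to_examine(session: Session, max_tis: int) -> list[TaskInstance]:
        # TIs whose dag runs we have never considered for queueing (NULL) come
        # first, then the ones considered longest ago; recently considered dag
        # runs sort to the back, so they can't starve out everything else.
        query = (
            select(TaskInstance)
            .join(DagRun, TaskInstance.dag_run_id == DagRun.id)
            .where(TaskInstance.state == "scheduled")
            .order_by(nulls_first(DagRun.last_queueing_decision.asc()))
            .limit(max_tis)
        )
        return list(session.scalars(query))


    def record_queueing_decision(session: Session, examined_tis: list[TaskInstance]) -> None:
        # Whatever TIs we saw, stamp the dag runs they belong to.
        run_ids = {ti.dag_run_id for ti in examined_tis}
        if run_ids:
            session.execute(
                update(DagRun)
                .where(DagRun.id.in_(run_ids))
                .values(last_queueing_decision=datetime.now(timezone.utc))
            )

The real query would of course keep the existing pool / priority ordering and
the other filters; this only shows where the new column would slot into the
ORDER BY and when it would get written.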

I was not suggesting putting it on TI, just because that seems a bit low
level, probably isn't necessary, and would mean too much db churn, but it's
possible.

In any case, I look forward to seeing the updated query.


On Fri, Aug 8, 2025 at 4:13 AM Ash Berlin-Taylor <[email protected]> wrote:

> Please please please: Give us DB benchmark figures. Nothing else matters
> if this performance hits DBs too hard.
>
> What about Daniel’s idea in the slack about adding a
> “last_scheduling_decision” or similar to TaskInstance too, and order by
> that so that we don’t just repeatedly hit the same TI? On the surface, that
> seems like a much more maintainable solution with 90-99% of the same net
> effect (that TI's that can’t get queued don’t starve out everything else)
> without a complex query and possible DB load, not to mention possible big
> differences between mysql and Postgres.
>
> > On 8 Aug 2025, at 11:34, asquator <[email protected]> wrote:
> >
> > #53492 status update:
> >
> > We optimized the query significantly by abandoning the nesting of window
> functions and joining on TI.id in the outer query. It's still a heavier
> query than we had before for fetching tasks (with lighter python work), so
> benchmarking the DB is required, but it's somewhat difficult because there
> don't seem to be unified, agreed upon workloads for benchmarking the
> scheduler. Running dynamic benchmarks with multiple deployments as
> suggested by @ashb is challenging too due to high resource requirements. I
> remember there was an AIP-59 for similar cases, but I'm not sure if it's
> a fit here. We're open to any suggestions as to how to advance.
> >
> > Regarding #54103, I think it's difficult to maintain and extend just
> because it solves just one out of four possible starvation cases. If it's
> merged, the TI per DAG issue is solved forever, but TI per pool starvation
> (issue #45636) will still be present. What this PR does is compute a
> lateral join on DAG runs and ensuring the query never fetches more TIs for
> a DAG run than it can run. It's roughly equivalent to one of the window
> functions in #53492. If we want to also solve pool starvation, we'll have
> to add another lateral join. It's all the same except performance, but
> let's be creative - we can reduce the performance overhead of #53492 to
> that  of #54103 by simply computing only one window per scheduler
> iteration, and it can be done in a round-robin fashion (for windows), such
> that every #concurrency_limits = 4 iterations we break starvation for at
> least one of the limits. This way we have the same performance but solve
> all the cases, at least once in 4 iterations. If we see that windows don't
> cause a big overhead, we can run two of them in every iteration. This can
> be another configuration called scheduler.optimism_level that defines how
> many window functions we include, while handling other limits
> optimistically. This requires lots of coding and testing, but the idea is
> clear.
> >
> > I say we should handle all the starvation issues in a comprehensive,
> encompassing logic change for the scheduler.
> >
> >
> > On Thursday, August 7th, 2025 at 4:43 AM, Christos Bisias <
> [email protected]> wrote:
> >
> >>> You're talking about https://github.com/apache/airflow/pull/53492/
> right?
> >>
> >>
> >> Yes.
> >>
> >>> Where is the PR from @Christos?
> >>
> >>
> >> https://github.com/apache/airflow/pull/54103
> >>
> >>
> >>
> >> On Wed, Aug 6, 2025 at 23:51 Daniel Standish
> >> [email protected] wrote:
> >>
> >>>> IMO, the approach on the patch isn't easily maintainable. Most of the
> >>>> calculations are performed by SQL in a huge query.
> >>>> It would be my preference to have many smaller queries and do part of
> the
> >>>> calculations in python. This will be easier to understand, maintain
> and
> >>>> debug in the future. Also, it will be easier to unit test.
> >>>
> >>> You're talking about https://github.com/apache/airflow/pull/53492/
> right?
> >>> I agree. I share the skepticism that it must be one big ugly query. At
> a
> >>> minimum it needs a lot more work and refinement. Not something that
> should
> >>> be merged in the current state, even as experimental.
> >>>
> >>> Where is the PR from @Christos?
> >>>
> >>> On Wed, Aug 6, 2025 at 12:54 PM Jens Scheffler
> <[email protected]
> >>>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I was (until now) not able to re-read all the Slack discussion and
> >>>> would like to do that at the latest over the weekend. Like Jarek, I
> >>>> also fear that the optimization makes the Scheduler rather hard to
> >>>> maintain. We also had some points where we _thought_ we could
> >>>> contribute some optimizations, especially for Mapped Tasks, and then
> >>>> considered the complexity of Mapped Task Groups, where the Depth-First
> >>>> Strategy would defeat all our drafted optimizations. So also in our
> >>>> current approach we are cutting the Dags down into manageable pieces.
> >>>>
> >>>> So far (I believe, but anybody correct me if I am wrong) the scaling
> >>>> was always documented only with options, no real upper boundary (other
> >>>> than soft limits) existing in the code. So the delivered product never
> >>>> confirmed fixed upper limits. It might be good also to consider
> >>>> documenting where we know there are natural or structural boundaries.
> >>>> So I hope that I can read more details in the next days.
> >>>>
> >>>> Jens
> >>>>
> >>>> On 06.08.25 10:31, Jarek Potiuk wrote:
> >>>>
> >>>>>> My main issue and the topic of this thread, has been that the
> >>>>>> scheduler
> >>>>>> does unnecessary work that leads to decreased throughput. My
> solution
> >>>>>> has
> >>>>>> been to limit the results of the query to the dag cap of active
> tasks
> >>>>>> that
> >>>>>> the user has defined.
> >>>>>
> >>>>> Yes. I understand that. There are situations that cause this
> >>>>> "unnecessary
> >>>>> work" to be excessive and lead to lower performance and more memory
> >>>>> usage.
> >>>>> This is quite "normal". No system in the world is optimized for all
> >>>>> kinds
> >>>>> of scenarios and sometimes you need to make trade-offs - for example
> >>>>> lower
> >>>>> performance and maintainability (and support for MySQL and Postgres
> as
> >>>>> Ash
> >>>>> pointed out in some other threads) which we have to make. There are
> >>>>> various
> >>>>> optimisation goals we can chase: optimal performance and no wasted
> >>>>> resources in certain situations and configurations is one of (many)
> >>>>> goals
> >>>>> we might have. Other goals might include: easier maintainability,
> >>>>> better
> >>>>> community collaboration, simplicity, less code to maintain,
> >>>>> testability,
> >>>>> also (what I mentioned before) sometimes deliberate not handling
> >>>>> certain
> >>>>> scenarios and introducing friction might be deliberate decision we
> >>>>> can
> >>>>> take in order to push our users in the direction we want them to go.
> >>>>> Yes.
> >>>>> As community and maintainers we do not have to always "follow" our
> >>>>> users
> >>>>> behaviour - we can (and we often do) educate our users and show them
> >>>>> better
> >>>>> ways of doing things.
> >>>>>
> >>>>> For example we had a LONG discussion whether to introduce caching of
> >>>>> Variable values during Dag parsing - because we knew our users are
> >>>>> often
> >>>>> using Variables in top-level code of their Dags and this leads to a
> lot
> >>>>> of
> >>>>> waste and high CPU and I/O usage by Dag processor. We finally
> >>>>> implemented
> >>>>> it as an experimental feature, but it was not at all certain we will
> -
> >>>>> we
> >>>>> had to carefully consider what we are trading in exchange for that
> >>>>> performance - and whether it's worth it.
> >>>>>
> >>>>> Same here - I understand: there are some cases (arguably rather
> niche -
> >>>>> with very large Dags) where scheduler does unnecessary processing and
> >>>>> performance could be improved. Now - we need to understand what
> >>>>> trade-offs
> >>>>> we need to make as maintainers and community (including our users) if
> >>>>> we
> >>>>> want to address it. We need to know what complexity is involved,
> >>>>> whether
> >>>>> it
> >>>>> will work with Postgres/MySQL and SQlite, whether we will be able to
> >>>>> continue debugging and testing it. And whether we want to drive away
> >>>>> our
> >>>>> users from the modularisation strategy (smaller Dags) that we think
> >>>>> makes
> >>>>> more sense than bigger Dags. We have to think about what happens
> next.
> >>>>> If
> >>>>> we make "huge Dags" first-class-citizens, will it mean that we will
> >>>>> have
> >>>>> to
> >>>>> redesign our UI to support them? What should we do when someone opens
> >>>>> up
> >>>>> an
> >>>>> issue "I have this 1000000 task Dag and I cannot open Airflow UI - it
> >>>>> crashes hard and makes my Airflow instance unusable - please fix it
> >>>>> ASAP".
> >>>>> I certainly would like to avoid such a situation to stress our friend
> >>>>> maintainers who work on UI - so also they should have a say on how
> >>>>> feasible
> >>>>> it is to make it "easy" to have "huge Dags" for them.
> >>>>>
> >>>>> All those factors should be taken into account when you make a
> >>>>> "product"
> >>>>> decision. Performance gains for particular cases is just one of many
> >>>>> factors to consider - and often not the most important ones.
> >>>>>
> >>>>> J.
> >>>>>
> >>>>> On Wed, Aug 6, 2025 at 7:34 AM Christos Bisias [email protected]
> >>>>> wrote:
> >>>>>
> >>>>>> We also have a dag with dynamic task mapping that can grow
> immensely.
> >>>>>>
> >>>>>> I've been looking at https://github.com/apache/airflow/pull/53492.
> >>>>>>
> >>>>>> My main issue and the topic of this thread, has been that the
> >>>>>> scheduler
> >>>>>> does unnecessary work that leads to decreased throughput. My
> solution
> >>>>>> has
> >>>>>> been to limit the results of the query to the dag cap of active
> tasks
> >>>>>> that
> >>>>>> the user has defined.
> >>>>>>
> >>>>>> The patch is more focused on the available pool slots. I get the
> idea
> >>>>>> that
> >>>>>> if we can only examine and queue as many tasks as available slots,
> >>>>>> then
> >>>>>> we
> >>>>>> will be efficiently utilizing the available slots to the max, the
> >>>>>> throughput will increase and my issue will be solved as well.
> >>>>>>
> >>>>>> IMO, the approach on the patch isn't easily maintainable. Most of
> the
> >>>>>> calculations are performed by SQL in a huge query.
> >>>>>>
> >>>>>> It would be my preference to have many smaller queries and do part
> of
> >>>>>> the
> >>>>>> calculations in python. This will be easier to understand, maintain
> >>>>>> and
> >>>>>> debug in the future. Also, it will be easier to unit test.
> >>>>>>
> >>>>>> On Tue, Aug 5, 2025 at 10:20 PM Jarek Potiuk [email protected]
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Just a comment here - I am also not opposed as well if
> optimizations
> >>>>>>> will
> >>>>>>> be implemented without impacting the more "regular" cases. And -
> >>>>>>> important -
> >>>>>>> without adding huge complexity.
> >>>>>>>
> >>>>>>> The SQL queries I saw in recent PRs and discussions look both
> "smart"
> >>>>>>> and
> >>>>>>> "scary" at the same time. Optimizations like that tend to lead to
> >>>>>>> obfuscated, difficult to understand and reason code and "smart"
> >>>>>>> solutions -
> >>>>>>> sometimes "too smart". And when it ends up with one or two people
> >>>>>>> only
> >>>>>>> being able to debug and fix problems connected with those, things
> >>>>>>> become
> >>>>>>> a
> >>>>>>> little hairy. So whatever we do there, it must be not only
> >>>>>>> "smart"
> >>>>>>> but
> >>>>>>> also easy to read and well tested - so that anyone can run the
> tests
> >>>>>>> easily
> >>>>>>> and reproduce potential failure cases.
> >>>>>>>
> >>>>>>> And yes I know I am writing this as someone who - for years was the
> >>>>>>> only
> >>>>>>> one to understand our complex CI setup. But I think over the last
> two
> >>>>>>> years
> >>>>>>> we are definitely going into a simpler, easier to understand setup
> and
> >>>>>>> we
> >>>>>>> have more people on board who know how to deal with it and I think
> >>>>>>> that
> >>>>>>> is
> >>>>>>> a very good direction we are taking :). And I am sure that when I
> go
> >>>>>>> for
> >>>>>>> my
> >>>>>>> planned 3 weeks holidays before the summit, everything will work as
> >>>>>>> smoothly as when I am here - at least.
> >>>>>>>
> >>>>>>> Also I think there is quite a difference (when it comes to
> >>>>>>> scheduling)
> >>>>>>> when
> >>>>>>> you have mapped tasks versus "regular tasks". I think Airflow even
> >>>>>>> currently behaves rather differently in those two different cases,
> >>>>>>> and
> >>>>>>> also
> >>>>>>> it has a well-thought and optimized UI experience to handle
> thousands
> >>>>>>> of
> >>>>>>> them. Also the work of David Blain on Lazy Expandable Task Mapping
> >>>>>>> will
> >>>>>>> push the boundaries of what is possible there as well:
> >>>>>>> https://github.com/apache/airflow/pull/51391. Even if we solve
> >>>>>>> scheduling
> >>>>>>> optimization - the UI and ability to monitor such huge Dags is
> still
> >>>>>>> likely
> >>>>>>> not something our UI was designed for.
> >>>>>>>
> >>>>>>> And I am fully on board with "splitting to even smaller pieces" and
> >>>>>>> "modularizing" things - and "modularizing and splitting big Dags
> into
> >>>>>>> smaller Dags" feels like precisely what should be done. And I think
> >>>>>>> it
> >>>>>>> would be a nice idea to try it and follow and see if you can't
> >>>>>>> achieve
> >>>>>>> the
> >>>>>>> same results without adding complexity.
> >>>>>>>
> >>>>>>> J.
> >>>>>>>
> >>>>>>> On Tue, Aug 5, 2025 at 8:47 PM Ash Berlin-Taylor [email protected]
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Yeah dynamic task mapping is a good case where you could easily
> >>>>>>>> end up with thousands of tasks in a dag.
> >>>>>>>>
> >>>>>>>> As I like to say, Airflow is a broad church and if we can
> >>>>>>>> reasonably support diverse workloads without impacting others
> >>>>>>>> (either the workloads or our ability to support and maintain
> >>>>>>>> them, etc.) then I’m all for it.
> >>>>>>>>
> >>>>>>>> In addition to your two items I’d like to add
> >>>>>>>>
> >>>>>>>> 3. That it doesn’t increase the db’s CPU disproportionally to the
> >>>>>>>> increased task throughput
> >>>>>>>>
> >>>>>>>>> On 5 Aug 2025, at 19:14, asquator [email protected]
> >>>>>>>>> wrote:
> >>>>>>>>> I'm glad this issue finally got enough attention and we can move
> >>>>>>>>> it
> >>>>>>>>> forward.
> >>>>>>>>> I took a look at @Christos's patch and it makes sense overall,
> it's
> >>>>>>>>> fine
> >>>>>>>>> for the specific problem they experienced with max_active_tasks
> >>>>>>>>> limit.
> >>>>>>>>> For those unfamiliar with the core problem, the bug has plenty
> of
> >>>>>>>>> variations where starvation happens due to different concurrency
> >>>>>>>>> limitations being nearly satiated, which creates the opportunity
> for
> >>>>>>>>> the
> >>>>>>>>> scheduler to pull many tasks and schedule none of them.
> >>>>>>>>> To reproduce this bug, you need two conditions:
> >>>>>>>>> 1. Many tasks (>> max_tis) belonging to one "pool", where "pool"
> is
> >>>>>>>>> some
> >>>>>>>>> concurrency limitation of Airflow. Note that originally the bug
> was
> >>>>>>>>> discovered in context of task pools (see
> >>>>>>>>> https://github.com/apache/airflow/issues/45636).
> >>>>>>>>> 2. The tasks are short enough (or the parallelism is large
> enough)
> >>>>>>>>> for
> >>>>>>>>> the tasks from the nearly starved pool to free some slots in
> every
> >>>>>>>>> scheduler's iteration.
> >>>>>>>>> When we discovered a bug that starved our less prioritized pool,
> >>>>>>>>> even
> >>>>>>>>> when the most prioritized pool was almost full (thanks to
> >>>>>>>>> @nevcohen),
> >>>>>>>>> we
> >>>>>>>>> wanted to implement a similar patch @Christos suggested above,
> but
> >>>>>>>>> for
> >>>>>>>>> pools. But then we realized this issue can arise due to limits
> >>>>>>>>> different
> >>>>>>>>> from task pools, including:
> >>>>>>>>> max_active_tasks
> >>>>>>>>> max_active_tis_per_dag
> >>>>>>>>> max_active_tis_per_dagrun
> >>>>>>>>>
> >>>>>>>>> So we were able to predict the forthcoming bug reports for
> different
> >>>>>>>>> kinds of starvation, and we started working on the most general
> >>>>>>>>> solution
> >>>>>>>>> which is the topic of this discussion.
> >>>>>>>>> I want to also answer @potiuk regarding "why you need such large
> >>>>>>>>> DAGs",
> >>>>>>>>> but I will be brief.
> >>>>>>>>> Airflow is an advanced tool for scheduling large data operations,
> >>>>>>>>> and
> >>>>>>>>> over the years it has pushed to production many features that
> lead
> >>>>>>>>> to
> >>>>>>>>> organizations writing DAGs that contain thousands of tasks. Most
> >>>>>>>>> prominent
> >>>>>>>>> one is dynamic task mapping. This feature made us realize we can
> >>>>>>>>> implement
> >>>>>>>>> a batching work queue pattern and create a task for every unit we
> >>>>>>>>> have
> >>>>>>>>> to
> >>>>>>>>> process, say it's a file in a specific folder, a path in the
> >>>>>>>>> filesystem,
> >>>>>>>>> a
> >>>>>>>>> pointer to some data stored in object storage, etc. We like to
> think
> >>>>>>>>> in
> >>>>>>>>> terms of splitting the work into many tasks. Is it good? I don't
> >>>>>>>>> know,
> >>>>>>>>> but
> >>>>>>>>> Airflow has already stepped onto this path, and we have to make
> it
> >>>>>>>>> technologically possible (if we can).
> >>>>>>>>> Nevertheless, even if such DAGs are considered too big and
> >>>>>>>>> splitting
> >>>>>>>>> them is a good idea (though you still have nothing to do with
> mapped
> >>>>>>>>> tasks
> >>>>>>>>> - we create tens of thousands of them sometimes and expect them
> to
> >>>>>>>>> be
> >>>>>>>>> processed in parallel), this issue does not only address the
> >>>>>>>>> described
> >>>>>>>>> case, but many others, including prioritized pools, mapped tasks
> or
> >>>>>>>>> max_active_runs starvation on large backfills.
> >>>>>>>>> The only part that's missing now is measuring query time (static
> >>>>>>>>> benchmarks) and measuring overall scheduling metrics in
> production
> >>>>>>>>> workloads (dynamic benchmarks).
> >>>>>>>>> We're working hard on this crucial part now.
> >>>>>>>>>
> >>>>>>>>> We'd be happy to have any assistance from the community with regard
> >>>>>>>>> to
> >>>>>>>>> the
> >>>>>>>>> dynamic benchmarks, because every workload is different and it's
> >>>>>>>>> pretty
> >>>>>>>>> difficult to simulate the general case in such a
> hard-to-reproduce
> >>>>>>>>> issue.
> >>>>>>>>> We have to make sure that:
> >>>>>>>>> 1. In a busy workload, the new logic boosts the scheduler's
> >>>>>>>>> throughput.
> >>>>>>>>> 2. In a light workload, the nested windowing doesn't
> significantly
> >>>>>>>>> slow
> >>>>>>>>> down the computation.
> >>>>>>>>>
> >>>>>>>>>> On Monday, August 4th, 2025 at 9:00 PM, Christos Bisias <
> >>>>>>>>>> [email protected]> wrote:
> >>>>>>>>>> I created a draft PR for anyone interested to take a look at the
> >>>>>>>>>> code
> >>>>>>>>>> https://github.com/apache/airflow/pull/54103
> >>>>>>>>>>
> >>>>>>>>>> I was able to demonstrate the issue in the unit test with much
> >>>>>>>>>> fewer
> >>>>>>>>>> tasks.
> >>>>>>>>>> All we need is the tasks brought back by the db query to belong
> to
> >>>>>>>>>> the
> >>>>>>>>>> same
> >>>>>>>>>> dag_run or dag. This can happen when the first SCHEDULED tasks
> in
> >>>>>>>>>> line
> >>>>>>>>>> to
> >>>>>>>>>> be examined are at least as many as the number of the tis per
> >>>>>>>>>> query.
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Aug 4, 2025 at 8:37 PM Daniel Standish
> >>>>>>>>>> [email protected] wrote:
> >>>>>>>>>>
> >>>>>>>>>>>> The configurability was my recommendation for
> >>>>>>>>>>>> https://github.com/apache/airflow/pull/53492
> >>>>>>>>>>>> Given the fact that this change is at the heart of Airflow I
> >>>>>>>>>>>> think
> >>>>>>>>>>>> the
> >>>>>>>>>>>> changes should be experimental where users can switch between
> >>>>>>>>>>>> different
> >>>>>>>>>>>> strategies/modes of the scheduler.
> >>>>>>>>>>>> If and when we have enough data to support that specific
> option
> >>>>>>>>>>>> is
> >>>>>>>>>>>> always
> >>>>>>>>>>>> better we can make decisions accordingly.
> >>>>>>>>>>>> Yeah I guess looking at #53492
> >>>>>>>>>>>> https://github.com/apache/airflow/pull/53492 it does seem too
> >>>>>>>>>>>> risky
> >>>>>>>>>>>> to
> >>>>>>>>>>>> just change the behavior in airflow without releasing it
> first as
> >>>>>>>>>>>> experimental.
> >>>>>>>>>>>
> >>>>>>>>>>> I doubt we can get sufficient real world testing without doing
> >>>>>>>>>>> that.
> >>>>>>>>>>> So if this is introduced, I think it should just be introduced
> as
> >>>>>>>>>>> experimental optimization. And the intention would be that
> >>>>>>>>>>> ultimately
> >>>>>>>>>>> there will only be one scheduling mode, and this is just a way
> to
> >>>>>>>>>>> test
> >>>>>>>>>>> this
> >>>>>>>>>>> out more widely. Not that we are intending to have two
> scheduling
> >>>>>>>>>>> code
> >>>>>>>>>>> paths on a permanent basis.
> >>>>>>>>>>>
> >>>>>>>>>>> WDYT
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Aug 4, 2025 at 12:50 AM Christos Bisias
> >>>>>>>>>>> [email protected]
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>>> So my question to you is: is it impossible, or just demanding
> >>>>>>>>>>>>> or
> >>>>>>>>>>>>> difficult
> >>>>>>>>>>>>> to split your Dags into smaller dags connected with asset
> aware
> >>>>>>>>>>>>> scheduling?
> >>>>>>>>>>>>> Jarek, I'm going to discuss this with the team and I will get
> >>>>>>>>>>>>> you
> >>>>>>>>>>>>> an
> >>>>>>>>>>>>> answer
> >>>>>>>>>>>>> on that.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I've shared this again on the thread
> >>>
> >>>
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
> >>>
> >>>>>>>>>>>> I haven't created a PR because this is just a POC and it's
> also
> >>>>>>>>>>>> setting a
> >>>>>>>>>>>> limit per dag. I would like to get feedback on whether it's
> >>>>>>>>>>>> better
> >>>>>>>>>>>> to
> >>>>>>>>>>>> make
> >>>>>>>>>>>> it per dag or per dag_run.
> >>>>>>>>>>>> I can create a draft PR if that's helpful and makes it easier
> to
> >>>>>>>>>>>> add
> >>>>>>>>>>>> comments.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let me try to explain the issue better. From a high level
> >>>>>>>>>>>> overview,
> >>>>>>>>>>>> the
> >>>>>>>>>>>> scheduler
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. moves tasks to SCHEDULED
> >>>>>>>>>>>> 2. runs a query to fetch SCHEDULED tasks from the db
> >>>>>>>>>>>> 3. examines the tasks
> >>>>>>>>>>>> 4. moves tasks to QUEUED
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm focusing on step 2 and afterwards. The current code
> doesn't
> >>>>>>>>>>>> take
> >>>>>>>>>>>> into
> >>>>>>>>>>>> account the max_active_tasks_per_dag. When it runs the query
> it
> >>>>>>>>>>>> fetches
> >>>>>>>>>>>> up to max_tis which is determined here
> >>>>>>>>>>>> <
> >>>
> >>>
> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L697-L705
> >>>
> >>>>>>>>>>>> .
> >>>>>>>>>>>>
> >>>>>>>>>>>> For example,
> >>>>>>>>>>>>
> >>>>>>>>>>>> - if the query number is 32
> >>>>>>>>>>>> - all 32 tasks in line belong to the same dag, dag1
> >>>>>>>>>>>> - we are not concerned how the scheduler picks them
> >>>>>>>>>>>> - dag1 has max_active_tasks set to 5
> >>>>>>>>>>>>
> >>>>>>>>>>>> The current code will
> >>>>>>>>>>>>
> >>>>>>>>>>>> - get 32 tasks from dag1
> >>>>>>>>>>>> - start examining them one by one
> >>>>>>>>>>>> - once 5 are moved to QUEUED, it won't stop, it will keep
> >>>>>>>>>>>> examining
> >>>>>>>>>>>> the other 27 but won't be able to queue them because it has
> >>>>>>>>>>>> reached
> >>>>>>>>>>>> the
> >>>>>>>>>>>> limit
> >>>>>>>>>>>>
> >>>>>>>>>>>> In the next loop, although we have reached the maximum number
> of
> >>>>>>>>>>>> tasks
> >>>>>>>>>>>> for
> >>>>>>>>>>>> dag1, the query will fetch again 32 tasks from dag1 to examine
> >>>>>>>>>>>> them
> >>>>>>>>>>>> and
> >>>>>>>>>>>> try to queue them.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The issue is that it gets more tasks than it can queue from
> the
> >>>>>>>>>>>> db
> >>>>>>>>>>>> and
> >>>>>>>>>>>> then
> >>>>>>>>>>>> examines them all.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This all leads to unnecessary processing that builds up and
> the
> >>>>>>>>>>>> more
> >>>>>>>>>>>> load
> >>>>>>>>>>>> there is on the system, the more the throughput drops for the
> >>>>>>>>>>>> scheduler
> >>>>>>>>>>>> and
> >>>>>>>>>>>> the workers.
> >>>>>>>>>>>>
> >>>>>>>>>>>> What I'm proposing is to adjust the query in step 2, to check
> >>>>>>>>>>>> the
> >>>>>>>>>>>> max_active_tasks_per_dag
> >>>>>>>>>>>>
> >>>>>>>>>>>>> run a query to fetch SCHEDULED tasks from the db
> >>>>>>>>>>>>> If a dag has already reached the maximum number of tasks in
> >>>>>>>>>>>>> active
> >>>>>>>>>>>>> states,
> >>>>>>>>>>>>> it will be skipped by the query.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Don't we already stop examining at that point? I guess there's
> >>>>>>>>>>>> two
> >>>>>>>>>>>> things
> >>>>>>>>>>>>
> >>>>>>>>>>>>> you might be referring to. One is, which TIs come out of the
> db
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>> into
> >>>>>>>>>>>>> python, and the other is, what we do in python. Just might be
> >>>>>>>>>>>>> helpful
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> be clear about the specific enhancements & changes you are
> >>>>>>>>>>>>> making.
> >>>>>>>>>>>>> I think that if we adjust the query and fetch the right
> number
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>> tasks,
> >>>>>>>>>>>>> then we won't have to make changes to what is done in python.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Aug 4, 2025 at 8:01 AM Daniel Standish
> >>>>>>>>>>>> [email protected] wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> @Christos Bisias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If you have a very large dag, and its tasks have been
> >>>>>>>>>>>>> scheduled,
> >>>>>>>>>>>>> then
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> scheduler will keep examining the tasks for queueing, even
> if
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>> has
> >>>>>>>>>>>>>> reached the maximum number of active tasks for that
> particular
> >>>>>>>>>>>>>> dag.
> >>>>>>>>>>>>>> Once
> >>>>>>>>>>>>>> that fails, then it will move on to examine the scheduled
> >>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> next
> >>>>>>>>>>>>>> dag or dag_run in line.
> >>>>>>>>>>>>>> Can you make this a little more precise? There's some
> >>>>>>>>>>>>>> protection
> >>>>>>>>>>>>>> against
> >>>>>>>>>>>>>> "starvation" i.e. dag runs recently considered should go to
> the
> >>>>>>>>>>>>>> back
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> line next time.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Maybe you could clarify why / how that's not working / not
> >>>>>>>>>>>>> optimal
> >>>>>>>>>>>>> /
> >>>>>>>>>>>>> how
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> improve.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If there are available slots in the pool and
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> the max parallelism hasn't been reached yet, then the
> >>>>>>>>>>>>>> scheduler
> >>>>>>>>>>>>>> should
> >>>>>>>>>>>>>> stop
> >>>>>>>>>>>>>> processing a dag that has already reached its max capacity
> of
> >>>>>>>>>>>>>> active
> >>>>>>>>>>>>>> tasks.
> >>>>>>>>>>>>>> If a dag run (or dag) is already at max capacity, it doesn't
> >>>>>>>>>>>>>> really
> >>>>>>>>>>>>>> matter
> >>>>>>>>>>>>>> if there are slots available or parallelism isn't reached --
> >>>>>>>>>>>>>> shouldn't
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>> stop anyway?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In addition, the number of scheduled tasks picked for
> >>>>>>>>>>>>> examining,
> >>>>>>>>>>>>> should
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> capped at the number of max active tasks if that's lower
> than
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> query
> >>>>>>>>>>>>>> limit. If the active limit is 10 and we already have 5
> >>>>>>>>>>>>>> running,
> >>>>>>>>>>>>>> then
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>> queue at most 5 tasks. In that case, we shouldn't examine
> more
> >>>>>>>>>>>>>> than
> >>>>>>>>>>>>>> that.
> >>>>>>>>>>>>>> Don't we already stop examining at that point? I guess
> there's
> >>>>>>>>>>>>>> two
> >>>>>>>>>>>>>> things
> >>>>>>>>>>>>>> you might be referring to. One is, which TIs come out of
> the db
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>> into
> >>>>>>>>>>>>>> python, and the other is, what we do in python. Just might
> be
> >>>>>>>>>>>>>> helpful
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>> be clear about the specific enhancements & changes you are
> >>>>>>>>>>>>>> making.
> >>>>>>>>>>>>>> There is already a patch with the changes mentioned above.
> IMO,
> >>>>>>>>>>>>>> these
> >>>>>>>>>>>>>> changes should be enabled/disabled with a config flag and
> not
> >>>>>>>>>>>>>> by
> >>>>>>>>>>>>>> default
> >>>>>>>>>>>>>> because not everyone has the same needs as us. In our
> testing,
> >>>>>>>>>>>>>> adding a
> >>>>>>>>>>>>>> limit on the tasks retrieved from the db requires more
> >>>>>>>>>>>>>> processing
> >>>>>>>>>>>>>> on
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> query which actually makes things worse when you have
> multiple
> >>>>>>>>>>>>>> small
> >>>>>>>>>>>>>> dags.
> >>>>>>>>>>>>>> I would like to see a stronger case made for
> configurability.
> >>>>>>>>>>>>>> Why
> >>>>>>>>>>>>>> make
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>> configurable? If the performance is always better, it should
> >>>>>>>>>>>>>> not
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>> made
> >>>>>>>>>>>>>> configurable. Unless it's merely released as an opt-in
> >>>>>>>>>>>>>> experimental
> >>>>>>>>>>>>>> feature. If it is worse in some profiles, let's be clear
> about
> >>>>>>>>>>>>>> that.
> >>>>>>>>>>>>>> I did not read anything after `Here is a simple test case
> that makes the benefits of the improvements noticeable` because it seemed
> >>>>>>>>>>>>> rather long-winded detail about a test
> >>>>>>>>>>>>>> case. A higher level summary might be helpful to your
> audience.
> >>>>>>>>>>>>>> Is
> >>>>>>>>>>>>>> there
> >>>>>>>>>>>>>> a PR with your optimization. You wrote "there is a patch"
> but
> >>>>>>>>>>>>>> did
> >>>>>>>>>>>>>> not,
> >>>>>>>>>>>>>> unless I miss something, share it. I would take a look if
> you
> >>>>>>>>>>>>>> share
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>> though.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish <
> >>>>>>>>>>>>> [email protected]> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Yes Ui is another part of this.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> At some point the grid and graph views completely stop
> making
> >>>>>>>>>>>>>> sense
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>> that volume, and another type of view would be required both
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>> usability
> >>>>>>>>>>>>>> and performance
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler
> >>>>>>>>>>>>>> [email protected]
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> We also have a current demand to have a workflow to execute
> >>>>>>>>>>>>>>> 10k
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> 100k
> >>>>>>>>>>>>>>> tasks. Together with @AutomationDev85 we are working on a
> >>>>>>>>>>>>>>> local
> >>>>>>>>>>>>>>> solution
> >>>>>>>>>>>>>>> because we also saw problems in the Scheduler that are not
> >>>>>>>>>>>>>>> linearly
> >>>>>>>>>>>>>>> scaling. And for sure not easy to be fixed. But from our
> >>>>>>>>>>>>>>> investigation
> >>>>>>>>>>>>>>> also there are other problems to be considered like UI will
> >>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>> potentially have problems.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I am a bit sceptic that PR 49160 completely fixes the
> >>>>>>>>>>>>>>> problems
> >>>>>>>>>>>>>>> mentioned
> >>>>>>>>>>>>>>> here and made some comments. I do not want to stop
> enthusiasm
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> fix
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> improve things but the Scheduler is quite complex and
> changed
> >>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>> made with care.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Actually I like the patch
> >>>
> >>>
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
> >>>
> >>>>>>>>>>>>>>> as it just adds some limit preventing scheduler to focus on
> >>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>> run. But complexity is a bit big for a "patch" :-D
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'd also propose atm the way that Jarek described and
> >>>>>>>>>>>>>>> split-up
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> Dag
> >>>>>>>>>>>>>>> into multiple parts (divide and conquer) for the moment.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Otherwise if there is a concrete demand on such large
> Dags...
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>> maybe
> >>>>>>>>>>>>>>> need rather a broader initiative if we want to ensure 10k,
> >>>>>>>>>>>>>>> 100k,
> >>>>>>>>>>>>>>> 1M?
> >>>>>>>>>>>>>>> tasks are supported per Dag. Because depending on the
> >>>>>>>>>>>>>>> magnitude
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>> strive for different approaches are needed.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jens
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 03.08.25 16:33, Daniel Standish wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Definitely an area of the scheduler with some opportunity
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> performance
> >>>>>>>>>>>>>>>> improvement.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I would just mention that, you should also attempt to
> >>>>>>>>>>>>>>>> include
> >>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>> performance testing at load / scale because, window
> >>>>>>>>>>>>>>>> functions
> >>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>> going
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> be more expensive.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> What happens when you have many dags, many historical dag
> >>>>>>>>>>>>>>>> runs &
> >>>>>>>>>>>>>>>> TIs,
> >>>>>>>>>>>>>>>> lots
> >>>>>>>>>>>>>>>> of stuff running concurrently. You need to be mindful of
> the
> >>>>>>>>>>>>>>>> overall
> >>>>>>>>>>>>>>>> impact of such a change, and not look only at the time
> spent
> >>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>> scheduling
> >>>>>>>>>>>>>>>> this particular dag.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I did not look at the PRs yet, maybe you've covered this,
> >>>>>>>>>>>>>>>> but,
> >>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>> important.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias<
> >>>>>>>>>>>>>>>> [email protected]>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I'm going to review the PR code and test it more
> thoroughly
> >>>>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>> leaving
> >>>>>>>>>>>>>>>>> a comment.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This is my code for reference
> >>>
> >>>
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
> >>>
> >>>>>>>>>>>>>>>>> The current version is setting a limit per dag, across
> all
> >>>>>>>>>>>>>>>>> dag_runs.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Please correct me if I'm wrong, but the PR looks like
> it's
> >>>>>>>>>>>>>>>>> changing
> >>>>>>>>>>>>>>>>> the way
> >>>>>>>>>>>>>>>>> that tasks are prioritized to avoid starvation. If that's
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> case,
> >>>>>>>>>>>>>>>>> I'm not
> >>>>>>>>>>>>>>>>> sure that this is the same issue. My proposal is that, if
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>> reached
> >>>>>>>>>>>>>>>>> the max resources assigned to a dag, then stop processing
> >>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> move on to the next one. I'm not changing how or which
> >>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>> picked.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 3:23 PM asquator<
> [email protected]
> >>>>>>>>>>>>>>>>> .invalid>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thank you for the feedback.
> >>>>>>>>>>>>>>>>>> Please, describe the case with failing limit checks in
> the
> >>>>>>>>>>>>>>>>>> PR
> >>>>>>>>>>>>>>>>>> (DAG's
> >>>>>>>>>>>>>>>>>> parameters and it's tasks' parameters and what fails to
> be
> >>>>>>>>>>>>>>>>>> checked)
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> we'll try to fix it ASAP before you can test it again.
> >>>>>>>>>>>>>>>>>> Let's
> >>>>>>>>>>>>>>>>>> continue
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> PR-related discussion in the PR itself.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias
> <
> >>>>>>>>>>>>>>>>>> [email protected]> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thank you for bringing this PR to my attention.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I haven't studied the code but I ran a quick test on
> the
> >>>>>>>>>>>>>>>>>>> branch
> >>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> completely ignores the limit on scheduled tasks per dag
> >>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>> dag_run.
> >>>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>> grabbed 70 tasks from the first dag and then moved all
> 70
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> QUEUED
> >>>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>> any further checks.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> This is how I tested it
> >>>
> >>>
> https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
> >>>
> >>>>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 1:44 PM
> >>>>>>>>>>>>>>>>>>> [email protected]
> >>>>>>>>>>>>>>>>>>> .invalid
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> This is a known issue stemming from the optimistic
> >>>>>>>>>>>>>>>>>>>> scheduling
> >>>>>>>>>>>>>>>>>>>> strategy
> >>>>>>>>>>>>>>>>>>>> used in Airflow. We do address this in the
> >>>>>>>>>>>>>>>>>>>> above-mentioned
> >>>>>>>>>>>>>>>>>>>> PR. I
> >>>>>>>>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> note that there are many cases where this problem may
> >>>>>>>>>>>>>>>>>>>> appear—it
> >>>>>>>>>>>>>>>>>>>> was
> >>>>>>>>>>>>>>>>>>>> originally detected with pools, but we are striving to
> >>>>>>>>>>>>>>>>>>>> fix
> >>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>> cases,
> >>>>>>>>>>>>>>>>>>>> such as the one described here with
> >>>>>>>>>>>>>>>>>>>> max_active_tis_per_dag,
> >>>>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>> switching to
> >>>>>>>>>>>>>>>>>>>> pessimistic scheduling with SQL window functions.
> While
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> current
> >>>>>>>>>>>>>>>>>>>> strategy simply pulls the max_tis tasks and drops the
> >>>>>>>>>>>>>>>>>>>> ones
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> meet
> >>>>>>>>>>>>>>>>>>>> the constraints, the new strategy will pull only the
> >>>>>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>> actually ready to be scheduled and that comply with
> all
> >>>>>>>>>>>>>>>>>>>> concurrency
> >>>>>>>>>>>>>>>>>>>> limits.
> >>>>>>>>>>>>>>>>>>>> It would be very helpful for pushing this change to
> >>>>>>>>>>>>>>>>>>>> production
> >>>>>>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> could assist us in alpha-testing it.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> See also:
> >>>>>>>>>>>>>>>>>>>> https://github.com/apache/airflow/discussions/49160
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Sent with Proton Mail secure email.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif
> >>>>>>>>>>>>>>>>>>>> [email protected]
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> i think most of your issues will be addressed by
> >>>>>>>>>>>>>>>>>>>>> https://github.com/apache/airflow/pull/53492
> >>>>>>>>>>>>>>>>>>>>> The PR code can be tested with Breeze so you can set
> it
> >>>>>>>>>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>> see
> >>>>>>>>>>>>>>>>>>>>> if it
> >>>>>>>>>>>>>>>>>>>>> solves the problem this will also help with
> confirming
> >>>>>>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> right
> >>>>>>>>>>>>>>>>>>>>> fix.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias
> >>>>>>>>>>>>>>>>>>>>> [email protected]
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The scheduler is very efficient when running a large
> >>>>>>>>>>>>>>>>>>>>>> amount
> >>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> dags
> >>>>>>>>>>>>>>>>>>>>>> with up
> >>>>>>>>>>>>>>>>>>>>>> to 1000 tasks each. But in our case, we have dags
> with
> >>>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>> many
> >>>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>> 10.000
> >>>>>>>>>>>>>>>>>>>>>> tasks. And in that scenario the scheduler and worker
> >>>>>>>>>>>>>>>>>>>>>> throughput
> >>>>>>>>>>>>>>>>>>>>>> drops
> >>>>>>>>>>>>>>>>>>>>>> significantly. Even if you have 1 such large dag
> with
> >>>>>>>>>>>>>>>>>>>>>> scheduled
> >>>>>>>>>>>>>>>>>>>>>> tasks,
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> performance hit becomes noticeable.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> We did some digging and we found that the issue
> comes
> >>>>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> scheduler's
> >>>>>>>>>>>>>>>>>>>>>> _executable_task_instances_to_queued
> >>>>>>>>>>>>>>>>>>>>>> <
> >>>
> >>>
> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647
> >>>
> >>>>>>>>>>>>>>>>>>>>>> method.
> >>>>>>>>>>>>>>>>>>>>>> In particular with the db query here
> >>>>>>>>>>>>>>>>>>>>>> <
> >>>
> >>>
> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375
> >>>
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> examining the results here
> >>>>>>>>>>>>>>>>>>>>>> <
> >>>
> >>>
> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425
> >>>
> >>>>>>>>>>>>>>>>>>>>>> .
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> If you have a very large dag, and its tasks have
> been
> >>>>>>>>>>>>>>>>>>>>>> scheduled,
> >>>>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> scheduler will keep examining the tasks for
> queueing,
> >>>>>>>>>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>>>>>>> reached the maximum number of active tasks for that
> >>>>>>>>>>>>>>>>>>>>>> particular
> >>>>>>>>>>>>>>>>>>>>>> dag.
> >>>>>>>>>>>>>>>>>>>>>> Once
> >>>>>>>>>>>>>>>>>>>>>> that fails, then it will move on to examine the
> >>>>>>>>>>>>>>>>>>>>>> scheduled
> >>>>>>>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>>>>>>>> dag or dag_run in line.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> This is inefficient and causes the throughput of the
> >>>>>>>>>>>>>>>>>>>>>> scheduler
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> workers to drop significantly. If there are
> available
> >>>>>>>>>>>>>>>>>>>>>> slots
> >>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> pool and
> >>>>>>>>>>>>>>>>>>>>>> the max parallelism hasn't been reached yet, then
> the
> >>>>>>>>>>>>>>>>>>>>>> scheduler
> >>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>> stop
> >>>>>>>>>>>>>>>>>>>>>> processing a dag that has already reached its max
> >>>>>>>>>>>>>>>>>>>>>> capacity
> >>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> active
> >>>>>>>>>>>>>>>>>>>>>> tasks.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> In addition, the number of scheduled tasks picked
> for
> >>>>>>>>>>>>>>>>>>>>>> examining,
> >>>>>>>>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>>>>>> capped at the number of max active tasks if that's
> >>>>>>>>>>>>>>>>>>>>>> lower
> >>>>>>>>>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> query
> >>>>>>>>>>>>>>>>>>>>>> limit. If the active limit is 10 and we already
> have 5
> >>>>>>>>>>>>>>>>>>>>>> running,
> >>>>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>> we can
> >>>>>>>>>>>>>>>>>>>>>> queue at most 5 tasks. In that case, we shouldn't
> >>>>>>>>>>>>>>>>>>>>>> examine
> >>>>>>>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>>>>> that.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> There is already a patch with the changes mentioned
> >>>>>>>>>>>>>>>>>>>>>> above.
> >>>>>>>>>>>>>>>>>>>>>> IMO,
> >>>>>>>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>>> changes should be enabled/disabled with a config
> flag
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>> default
> >>>>>>>>>>>>>>>>>>>>>> because not everyone has the same needs as us. In
> our
> >>>>>>>>>>>>>>>>>>>>>> testing,
> >>>>>>>>>>>>>>>>>>>>>> adding a
> >>>>>>>>>>>>>>>>>>>>>> limit on the tasks retrieved from the db requires
> more
> >>>>>>>>>>>>>>>>>>>>>> processing
> >>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> query which actually makes things worse when you
> have
> >>>>>>>>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>>>> small
> >>>>>>>>>>>>>>>>>>>>>> dags.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Here is a simple test case that makes the benefits
> of
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> improvements
> >>>>>>>>>>>>>>>>>>>>>> noticeable
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> - we have 3 dags with thousands of tasks each
> >>>>>>>>>>>>>>>>>>>>>> - for simplicity let's have 1 dag_run per dag
> >>>>>>>>>>>>>>>>>>>>>> - triggering them takes some time and due to that,
> the
> >>>>>>>>>>>>>>>>>>>>>> FIFO
> >>>>>>>>>>>>>>>>>>>>>> order
> >>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> tasks is very clear
> >>>>>>>>>>>>>>>>>>>>>> - e.g. 1000 tasks from dag1 were scheduled first and
> >>>>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>> 200
> >>>>>>>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>>>> from dag2 etc.
> >>>>>>>>>>>>>>>>>>>>>> - the executor has parallelism=100 and
> >>>>>>>>>>>>>>>>>>>>>> slots_available=100
> >>>>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>> means
> >>>>>>>>>>>>>>>>>>>>>> that it can run up to 100 tasks concurrently
> >>>>>>>>>>>>>>>>>>>>>> - max_active_tasks_per_dag is 4 which means that we
> >>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> 4
> >>>>>>>>>>>>>>>>>>>>>> tasks running per dag.
> >>>>>>>>>>>>>>>>>>>>>> - For 3 dags, it means that we can run up to 12
> tasks
> >>>>>>>>>>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>>> time (4 tasks from each dag)
> >>>>>>>>>>>>>>>>>>>>>> - max tis per query are set to 32, meaning that we
> can
> >>>>>>>>>>>>>>>>>>>>>> examine
> >>>>>>>>>>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>>>>> to 32
> >>>>>>>>>>>>>>>>>>>>>> scheduled tasks if there are available pool slots
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> If we were to run the scheduler loop repeatedly
> until
> >>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> queues
> >>>>>>>>>>>>>>>>>>>>>> 12
> >>>>>>>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>>>> and test the part that examines the scheduled tasks
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> queues
> >>>>>>>>>>>>>>>>>>>>>> them,
> >>>>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> - with the query limit
> >>>>>>>>>>>>>>>>>>>>>> - 1 iteration, total time 0.05
> >>>>>>>>>>>>>>>>>>>>>> - During the iteration
> >>>>>>>>>>>>>>>>>>>>>> - we have parallelism 100, available slots 100 and
> >>>>>>>>>>>>>>>>>>>>>> query
> >>>>>>>>>>>>>>>>>>>>>> limit
> >>>>>>>>>>>>>>>>>>>>>> 32
> >>>>>>>>>>>>>>>>>>>>>> which means that it will examine up to 32 scheduled
> >>>>>>>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>>>> - it can queue up to 100 tasks
> >>>>>>>>>>>>>>>>>>>>>> - examines 12 tasks (instead of 32)
> >>>>>>>>>>>>>>>>>>>>>> - 4 tasks from dag1, reached max for the dag
> >>>>>>>>>>>>>>>>>>>>>> - 4 tasks from dag2, reached max for the dag
> >>>>>>>>>>>>>>>>>>>>>> - and 4 tasks from dag3, reached max for the dag
> >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag1, reaches max for the dag and
> >>>>>>>>>>>>>>>>>>>>>> moves
> >>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag2, reaches max for the dag and
> >>>>>>>>>>>>>>>>>>>>>> moves
> >>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag3, reaches max for the dag and
> >>>>>>>>>>>>>>>>>>>>>> moves
> >>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> - stops queueing because we have reached the maximum
> >>>>>>>>>>>>>>>>>>>>>> per
> >>>>>>>>>>>>>>>>>>>>>> dag,
> >>>>>>>>>>>>>>>>>>>>>> although there are slots for more tasks
> >>>>>>>>>>>>>>>>>>>>>> - iteration finishes
> >>>>>>>>>>>>>>>>>>>>>> - without
> >>>>>>>>>>>>>>>>>>>>>> - 3 iterations, total time 0.29
> >>>>>>>>>>>>>>>>>>>>>> - During iteration 1
> >>>>>>>>>>>>>>>>>>>>>> - Examines 32 tasks, all from dag1 (due to FIFO)
> >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag1 and tries to queue the other 28
> >>>>>>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>> fails
> >>>>>>>>>>>>>>>>>>>>>> - During iteration 2
> >>>>>>>>>>>>>>>>>>>>>> - examines the next 32 tasks from dag1
> >>>>>>>>>>>>>>>>>>>>>> - it can't queue any of them because it has reached
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> max
> >>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> dag1, since the previous 4 are still running
> >>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag2
> >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag2 and tries to queue the other 28
> >>>>>>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>> fails
> >>>>>>>>>>>>>>>>>>>>>> - During iteration 3
> >>>>>>>>>>>>>>>>>>>>>> - examines the next 32 tasks from dag1, same tasks
> >>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> were
> >>>>>>>>>>>>>>>>>>>>>> examined in iteration 2
> >>>>>>>>>>>>>>>>>>>>>> - it can't queue any of them because it has reached
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> max
> >>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> dag1 and the first 4 are still running
> >>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag2 , can't queue any of
> >>>>>>>>>>>>>>>>>>>>>> them
> >>>>>>>>>>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>>>>>>> it has reached max for dag2 as well
> >>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag3
> >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag3 and tries to queue the other 28
> >>>>>>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>> fails
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I used very low values for all the configs so that I
> >>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> point
> >>>>>>>>>>>>>>>>>>>>>> clear and easy to understand. If we increase them,
> >>>>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>> patch
> >>>>>>>>>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>>>>>> makes the task selection more fair and the resource
> >>>>>>>>>>>>>>>>>>>>>> distribution
> >>>>>>>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>> even.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I would appreciate it if anyone familiar with the
> >>>>>>>>>>>>>>>>>>>>>> scheduler's
> >>>>>>>>>>>>>>>>>>>>>> code
> >>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>> confirm this and also provide any feedback.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Additionally, I have one question regarding the
> query
> >>>>>>>>>>>>>>>>>>>>>> limit.
> >>>>>>>>>>>>>>>>>>>>>> Should it
> >>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>> per dag_run or per dag? I've noticed that
> >>>>>>>>>>>>>>>>>>>>>> max_active_tasks_per_dag
> >>>>>>>>>>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>>>>>>> been changed to provide a value per dag_run but the
> >>>>>>>>>>>>>>>>>>>>>> docs
> >>>>>>>>>>>>>>>>>>>>>> haven't
> >>>>>>>>>>>>>>>>>>>>>> been
> >>>>>>>>>>>>>>>>>>>>>> updated.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thank you!
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>> Christos Bisias