It took me a bit of time - I wanted to go a bit deeper and look closer at the

I also share some of Niko's concerns. Running executor code from the
Triggerer significantly changes the architectural approach and boundaries.
And yes - it's not really tied to multi-team, it's a general issue (in
multi-team the triggerer is per team, so you can be sure that "what is in
the team, stays in the team").

But it does change which system components communicate with each other. For
example, the Triggerer needs access to the Celery queue for Celery
Executor, which is otherwise unnecessary. Similarly, the Triggerer will
need access to start Pods if the K8S executor is used. I'm not even sure
what would happen with the Edge executor. From what I understand, the Edge
Executor actively loops and checks for tasks, purges jobs etc. It pulls
data from the database so I can imagine it simply enqueues the workload and
then the "real" Edge executor takes over (I believe that will be the case).
However, this significantly crosses the current boundaries of which
component does what.

Not to mention the security implications - the JWT token generator in your
solution runs on the triggerer, and that pretty much breaks the (future)
assumption that the Triggerer does not need to know the secret necessary to
generate the token. This has a lot of implications (for multi-team, for
example, but not only) - generally, a lot of the future task isolation work
is based on the premise that user code has no chance to see the secret used
to generate the tokens.
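
To make the token concern concrete, here is a minimal sketch of symmetric
(HS256-style) token signing using only the Python standard library. It is an
illustration only: I am assuming a shared-secret scheme, and the function
names `sign_token` and `verify_token` are made up for this example, not taken
from Airflow. The point is simply that any process able to mint a token must
hold the secret, so any code running in that process could mint tokens too.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_token(claims: dict, secret: bytes) -> str:
    """Hypothetical helper: build an HS256-signed token. Requires the secret."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":"), sort_keys=True).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_token(token: str, secret: bytes) -> bool:
    """Hypothetical helper: recompute the signature and compare in constant time."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return hmac.compare_digest(_b64url(expected).encode("ascii"), signature.encode("utf-8"))
```

Whoever calls `sign_token` needs `secret` in memory, which is exactly what we
would like to keep away from components that run user code.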

I think this is a good temporary solution for you, and I understand the
use case and its merit; it's not a niche case. Pretty much anyone with KPO
and lots of deferred tasks will have this case. But I've actually been
thinking outside the box. I am not sure if I'm right, but this seems to
*really* be a problem with how KPO's XCom passing is done via the sidecar
for this Triggerer -> next() handover. But it does not have to be this way.

Conceptually, I see absolutely no problem saving the XCom where it belongs:
either the DB or the XCom backend. This operation is almost exclusively
I/O-bound, so it can easily be done asynchronously. It could be done in
KPOTrigger via the supervising process (which has DB access) instead of
passing things back for scheduling. If KPOTrigger sees that the Pod is
completed, it could simply ask the supervisor to perform all the work
currently done in KPO's `trigger_reentry`. That would not even cause
any re-entry: KPOTrigger could wait for the supervisor to perform the
action and write to the XCom database or XCom backend, and would then simply
"complete" the task. The Triggerer could even have a separate event loop for
such "finalization", and all of it could be done asynchronously to scale
things.
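
A rough sketch of what I have in mind, using plain asyncio. Everything here is
hypothetical: `FakeSupervisor`, `finalize_worker` and `watch_pod` are made-up
names standing in for the triggerer supervisor, a separate "finalization"
consumer and the trigger body. None of this is existing Airflow API; it only
shows the shape of the handover.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeSupervisor:
    """Hypothetical stand-in for the supervisor process (the one with DB access)."""

    xcom_store: dict = field(default_factory=dict)
    deleted_pods: list = field(default_factory=list)

    async def push_xcom(self, task_id: str, value: object) -> None:
        await asyncio.sleep(0)  # placeholder for I/O to the DB or XCom backend
        self.xcom_store[task_id] = value

    async def delete_pod(self, pod_name: str) -> None:
        await asyncio.sleep(0)  # placeholder for the Kubernetes API call
        self.deleted_pods.append(pod_name)


async def finalize_worker(queue: asyncio.Queue, supervisor: FakeSupervisor, states: dict) -> None:
    """Drain completed pods: store the XCom, remove the pod, mark the task done."""
    while True:
        task_id, pod_name, result = await queue.get()
        try:
            await supervisor.push_xcom(task_id, result)
            await supervisor.delete_pod(pod_name)
            states[task_id] = "success"
        except Exception:
            states[task_id] = "failed"  # no re-entry; failure is recorded in place
        finally:
            queue.task_done()


async def watch_pod(task_id: str, pod_name: str, result: object, queue: asyncio.Queue) -> None:
    """Hypothetical trigger body: wait for the pod, then hand over for finalization."""
    await asyncio.sleep(0)  # placeholder for polling the pod status
    await queue.put((task_id, pod_name, result))


async def run_demo():
    supervisor = FakeSupervisor()
    states: dict = {}
    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(finalize_worker(queue, supervisor, states))
    await asyncio.gather(
        watch_pod("task_a", "pod-a", {"rows": 10}, queue),
        watch_pod("task_b", "pod-b", {"rows": 20}, queue),
    )
    await queue.join()  # wait until every completed pod has been finalized
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return supervisor, states
```

Running `asyncio.run(run_demo())` finalizes both tasks without anything going
back through the scheduler. In a real setup the finalization consumer could be
scaled by running several of them on the same queue.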

There would be no "reentry" workload to run at all, because all completion
could happen in the Triggerer. And I think the Triggerer, similar to the
worker, can communicate with all components. It has access to the DB
(through the triggerer supervisor process) and can also communicate with the
XCom backend (you should be able to perform an XCom push from the Triggerer
supervisor, not necessarily from the event loop process). Aside from probably
much more network traffic for the Triggerer to save/retrieve XComs from
multiple deferred KPOs, I can't see any problem with it (and it can be
nicely scaled out by adding more triggerers).

Or am I missing something obvious?

J.


On Sat, Mar 21, 2026 at 12:17 PM Jens Scheffler <[email protected]> wrote:

> Hey Niko,
>
> thanks as well for the response with reasoning. I understand that you do
> not prefer a coupling for a +0.
>
> Is there any other option (hint: some were listed under "Options we
> considered") that you would propose as a better solution? Or is the +0
> just the "least bad" choice?
>
> Jens
>
> On 20.03.26 02:03, Oliveira, Niko wrote:
> > Hey Jens et al.,
> >
> > That is three datapoints now so there is indeed some demand. Not sure if
> that pushes it over the limit of needing this, but it's more compelling now.
> >
> > To your 2) in your last reply Jens: My concern wasn't Multi-Team
> specifically or anything to do with config. More just that this changes the
> (mostly unwritten) contract that the scheduler wholly owns and interacts
> with executors. Up until this change one could depend on this being true
> and that there is only ever one instance of any executor (per team now,
> since two teams can have the same executor between them). But now your
> changes are instantiating executors and queuing work with them. So all
> future executor/scheduling changes need to keep this in mind. It simply
> increases the surface area of execution/scheduling in Airflow and we now
> have a multi-writer situation which always brings extra complexity. If
> there is enough demand then we should do it. Seems to be a few requesters
> now, which I think brings me to a +0 on this one.
> >
> > Thanks again for the discussion!
> >
> > ________________________________
> > From: André Ahlert<[email protected]>
> > Sent: Thursday, March 19, 2026 1:15 PM
> > To:[email protected] <[email protected]>
> > Subject: RE: [EXT] [DISCUSS] PR: Option to directly submit to worker
> queue from Triggerer
> >
> > CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
> >
> >
> >
> > AVERTISSEMENT: Ce courrier électronique provient d’un expéditeur
> externe. Ne cliquez sur aucun lien et n’ouvrez aucune pièce jointe si vous
> ne pouvez pas confirmer l’identité de l’expéditeur et si vous n’êtes pas
> certain que le contenu ne présente aucun risque.
> >
> >
> >
> > Hi all,
> >
> > This discussion resonates with problems we've seen on a fintech client's
> > setup. Heavy KPO usage in deferred mode, Celery executor, pools with
> > include_deferred=True. When a batch of tasks finishes on the cluster and
> > flips back to SCHEDULED, the pool slots are released and tasks have to
> > recompete through the scheduler for what amounts to seconds of cleanup
> > work. In our case the cloud cost was not the main concern, but it raised a
> > real architectural question: why does a task that already passed all
> > concurrency checks need to go through the entire scheduling loop again just
> > to collect XCom and delete a pod?
> >
> > On the architectural concerns, I think the mini-scheduler comparison from
> > Airflow 2 does not hold. That feature made full scheduling decisions
> > (concurrency, pools, priority). This PR makes none. It re-enqueues a task
> > that already satisfied all those checks. The task already had a slot
> > and was already running. We are just sending it back to finish.
> >
> > The fact that it is opt-in with a safe fallback to SCHEDULED makes this very
> > low risk. If nobody configures direct_queueing_executors, behavior is
> > identical to today. Marking it experimental for a release or two would also
> > be fine.
> >
> > On the use case being narrow: any deployment using deferred mode with
> > Celery at scale will eventually hit this. Issue #57210 shows independent
> > reports of the same pattern. It is inherent to the DEFERRED -> SCHEDULED ->
> > QUEUED state machine under pool contention, not specific to one setup.
> >
> > +1 (non-binding) on getting this into main.
> >
> > Thanks Jens for bringing this one.
> >
> > André Ahlert
> >
> > On Thu, Mar 19, 2026 at 14:24, Jens Scheffler <[email protected]> wrote:
> >
> >> Hi Niko,
> >>
> >> thanks for the feedback.
> >>
> >> (1) I think the use case is not really narrow, @tirkarthi also pointed
> >> to issue https://github.com/apache/airflow/issues/57210 - so this would
> >> be closed as well.
> >>
> >> (2) I aimed to include support for Multi-Team (that even added some
> >> complexity compared to the local patch in 3.1.7). Yes, the config
> >> property is currently global, so if you enable it for
> >> CeleryExecutor it would apply to all Executors in all teams. But if you
> >> wish and see a reason, we can also make the config team-specific
> >> (e.g. only team_a uses CeleryExecutor and team_b does not optimize -
> >> though I am not sure there is a need to separate). Routing and queueing
> >> do respect the correct Executor/queue instance to route to
> >> in the PR.
> >> See
> >>
> >>
> https://github.com/apache/airflow/pull/63489/changes#diff-c30603fe0a3527e23af541ba115c91c85c9c213e6d105af6a48c88a7018a5799R333
> >> and
> >>
> >>
> https://github.com/apache/airflow/pull/63489/changes#diff-c30603fe0a3527e23af541ba115c91c85c9c213e6d105af6a48c88a7018a5799R347
> >>
> >> Jens
> >>
> >> On 18.03.26 23:34, Oliveira, Niko wrote:
> >>> Thanks for the write-up Jens, it helps to have the full context of your
> >> thought process.
> >>> The code changes themselves are small and fairly elegant. But this
> >> breaks the invariant that there is only ever one instance of each
> executor
> >> (per team) and that they live in the scheduler process and the
> scheduler is
> >> the only thing that interacts with them in a scheduling capacity. To me
> >> it's quite a large logical change in Airflow behaviour/operation. When
> >> reasoning about execution and scheduling there is now another source
> that
> >> will always need to be considered, ensuring it works and is tested when
> >> executor related changes are made, etc. This has been fraught for us in
> the
> >> past in ways that were hard to predict beforehand.
> >>> The usecase seems quite narrow and focused on how you folks use
> Airflow.
> >> Are you hearing from any other users who are asking for something like
> >> this? I'm just not sure I see enough evidence that it truly belongs in
> >> apache/airflow main.
> >>> Cheers,
> >>> Niko
> >>>
> >>> ________________________________
> >>> From: Jens Scheffler<[email protected]>
> >>> Sent: Wednesday, March 18, 2026 2:56 PM
> >>> To:[email protected] <[email protected]>
> >>> Subject: [EXT] [DISCUSS] PR: Option to directly submit to worker queue
> >> from Triggerer
> >>>
> >>> Dear Airflow Devs!
> >>>
> >>> TLDR: Because of operational problems in processing workload we propose
> >>> an extension that allows tasks to be re-queued directly from the
> >>> triggerer. The PR raised a request for discussion to ensure there is
> >>> awareness of the change.
> >>>
> >>> Pull Request: Allow direct queueing from triggerer
> >>> <https://github.com/apache/airflow/pull/63489#top> #63489
> >>> https://github.com/apache/airflow/pull/63489
> >>>
> >>> The Use Case/Problem Statement:
> >>>
> >>> We use Airflow for many workflows of scaled long and large Dags in
> >>> running 80% KPO workload. To ensure KPO can run scaled and long w/o
> >>> operation interruptions (worker restart due to re-deployment, Pods with
> >>> workload sometimes running 4-10h) and to be able to scale to thousands
> >>> of running KPO Pods we need to use and leverage deferred mode
> >> excessively.
> >>> In KPO with deferred a task is first scheduled to a (Celery in our
> case)
> >>> worker which prepares the Pod manifest and starts the Pod. From there
> it
> >>> hands-over to triggerer which monitors the Pod running and tails the
> log
> >>> so that a user can watch progress. Once the Pod is completed it returns
> >>> back to a (Celery) worker that finishes-up work, extracts XCom, makes
> >>> error handling and cleans-up the Pod from K8s. This also means that the
> >>> Pod is only finished when the XCom is pulled from side-car, the "base"
> >>> container might be completed and the Pod is only done and deleted when
> >>> the XCom is collected. Until KPO collects XCom the Pod keeps running.
> >>>
> >>> The current method of scheduling in Airflow is that the Scheduler checks
> >>> all concurrency rules (max_active_tasks, max_tis_per_dagrun,
> >>> pools...) in state "scheduled" before a task is queued to be started. On
> >>> the worker, when started, it is directly set to "deferred" and then a
> >>> triggerer picks it up (no re-scheduling or active distribution to a
> >>> triggerer). On the way back, the triggerer today marks the task
> >>> "scheduled", which means the scheduler logic needs to pick up the task
> >>> again, in competition with all other workload, and re-schedule it with all
> >>> concurrency and priority checks like initially to get it to "queued" and
> >>> re-assigned to a (Celery) worker. This implicitly means that when moving
> >>> from state "deferred" to "scheduled" the task loses its allocated pool
> >>> slot and needs to re-allocate it.
> >>>
> >>> In most regular situations this is okay. In our scenario it is a
> >>> problem: We have many Dags competing for the K8s cluster resources and
> >>> the concurrency features of Airflow joined with priority controls
> should
> >>> ensure that important workload runs first. Once there is residual
> >>> capacity less important batches can consume cluster resources. And with
> >>> "consume resources" also refers to Pods sitting on the cluster. They
> >>> free up the cluster space only at point of XCom collected and Pod
> >>> removed. Before they still consume CPU and ephemeral storage
> >>> allocations. We limit the amount of workload being able to be sent to
> >>> K8s by Airflow pools which are the ultimate limit for concurrency on
> >>> different node pools (e.g. nodes with GPU and nodes w/o GPU). Other
> >>> workload often runs on Edge workers or directly as Python in Celery.
> >>>
> >>> With multiple Dags and different priorities we had these two effects:
> >>>
> >>> (1) A lower priority batch is running ~N*100 Pods in deferred. A higher
> >>> priority large batch is started. Pods finishing from the lower priority
> >>> tasks are assumed to drain the cluster, when they end the task
> instances
> >>> are set to "scheduled" and... then stay there until all tasks of the
> >>> higher priority tasks are worked off (assuming the higher priority
> tasks
> >>> are not limited leaving room for the lower priority tasks). So base
> >>> container of the Pods are completed, the XCom side car waits long - we
> >>> have seen even 24h - to be XCom collected to be cleaned.
> >>>
> >>>        (a) Additional side effect if pending long the AutoScaler might
> >>> pick such a node as scaling victim because really idle and after grace
> >>> period kills the Pod - Later when the workload returns to worker the
> Pod
> >>> is showing a HTTP 404 as being gone, XCom is lost... in most cases need
> >>> to run a retry, else it is anyway a delay and additional hours of
> >>> re-execution. If no retry just raising failures to users.
> >>>
> >>>        (b) We had the side effect that newer high priority workload was
> >>> not scheduled by K8s to the (almost idle) Nodes because the previous
> >>> pending Pods allocated still ephemeral storage and not sufficient space
> >>> was on K8s nodes for new workload... so the old Pods blocked the new
> and
> >>> the higher priority task instances blocked the cleanup of the lower
> prio
> >>> instances. A lot of tasks were in a kind of dead-lock.
> >>>
> >>>        (c) As the re-set to state "scheduled" from triggerer also sets
> the
> >>> "scheduled" date of the task instance also the from the same "low
> >>> priority" Dag other pending scheduled task instances are often started
> >>> earlier. So workers pick-up new tasks to start new Pods but a lot of
> old
> >>> Pods are sitting there idle waiting to collect XCom to clean-up
> >>>
> >>> (2) Also sometimes because of operational urgency we use the
> >>> "enable/disable" scheduling flag on Dags in the UI to administratively
> >>> turn-off Dag scheduling to leave space for other Dags... or to drain
> the
> >>> cluster for some operational procedures e.g. to have a safe ground
> >>> before maintenance. But as the Dag needs to be actively scheduled to
> >>> process the return from triggerer. If you turn off scheduling the
> >>> workload in flight is never finishing and is getting stuck like
> >>> described before. Pods are stale on the cluster, nobody picks-up the
> >>> XCom. And the problematic scenario is also there is no way to "clean
> up"
> >>> such tasks to finish these Dags w/o turning on scheduling... but then
> >>> also new tasks are queued and you are just not able to drain the
> >>> cluster. I know we discussed multiple times that we might need a
> "drain"
> >>> mode to let existing Dags finish but not scheduling new Dag runs... but
> >>> such feature is also missing. To say: Scheduling new tasks is tightly
> >>> coupled with the scheduling of cleanup. Not possible to separate.
> >>> Getting to the problems as 1 (a-c) as well in the scenario (2).
> >>>
> >>> We thought a while about which options we would have to contribute to
> >>> improve in general. Assumption and condition is that the initial start
> >>> on the (Celery) Worker is fast, most time is spent (once) on the
> >>> triggerer and the return to worker is actually only made for a few
> >>> seconds to clean-up. And of course we want to minimize latency to (I)
> >>> free the allocated resources and (II) not to have any additional
> >>> artificial delay for the user. Which a bit contradicts with the efforts
> >>> to flip from worker to triggerer and back again.
> >>>
> >>> Options we considered:
> >>>
> >>>     * Proposing a new "state" for a task instance, e.g. "re-schedule"
> that
> >>>       is handled with priority by scheduler.
> >>>       But the scheduler is already a big beast of complexity, adding
> >>>       another loop to handle re-scheduled with all existing complexity
> >>>       might be a large complexity to be added and adding another state
> in
> >>>       the state model also adds a lot of overhead from documentation to
> >> UI...
> >>>     * Finish-up the work on triggerer w/o return to Worker. It is only
> >>>       about cleaning the KPO and...
> >>>       Unfortunately more than just monitoring is very complex to
> implement
> >>>       and especially XCom DB access is not a desired concept and
> triggerer
> >>>       does not have support. We also have some specific triaging and
> error
> >>>       handling automation extended on top of KPO which all in async
> with
> >>>       the limited capabilities of triggerer would be hard to implement.
> >>>       Main blocker in this view is XCom access.
> >>>     * Dynamically increase priority of a task returning from triggerer.
> >>>       We considered "patching" the priority_weight value of the task
> >>>       instance on the triggerer before return to ensure that tasks
> >>>       returning are just elevated in priority. First we made this from
> the
> >>>       side via SQL (UPDATE task_instance SET priority_weight=1000 WHERE
> >>>       state='scheduled' AND next_method='trigger_reentry') but
> actually if
> >>>       the task failed and restarted then it is hard to find and reset
> the
> >>>       priority back... still a retry would need to be reset down... all
> >>>       feels like a workaround.
> >>>     * Implementing a special mode in Scheduler to select tasks with
> >>>       "next_method" being set as signal they are returning from
> triggerer
> >>>       in a special way... assuming they have a Pool slot and exclude
> some
> >>>       of the concurrency checks (As in "scheduled" state the pool slot
> is
> >>>       actually "lost")
> >>>       But this hard to really propose... as this might be even harder
> than
> >>>       the first option as well as the today complex code would get even
> >>>       harder in scheduler to consider exceptions in concurrency... with
> >>>       the risk that such special cases exceed the planned concurrency
> >>>       limits if otherwise the pools are exhausted before already.
> >>>     * Adding a REST API that the triggerer can call on scheduler to
> >>>       cross-post workload.
> >>>       That would need to add a new connection and component bundling, a
> >>>       REST API endpoint would need to be added for schedulers to
> receive
> >>>       these push calls. Probably an alternative but also adds
> >>>       architectural dependability.
> >>>     * The PR we propose to discuss here: If the task skips "scheduled"
> >>>       state and moves to queue directly the pool slot keeps allocated
> >>>       (assuming that Deferred in actively counting into pool and
> >>>       concurrency limits). Code looked not too complex as just the
> >>>       enqueuing logic from Scheduler could be integrated.
> >>>       In this it is considering that such direct queuing is only
> possible
> >>>       if the executor supports queues (not working for
> LocalExecutor!). So
> >>>       the proposed PR made it explicitly opt-in.
> >>>
> >>> Reasons (and pro-arguments) why we propose to have the PR on main:
> >>>
> >>>     * Because of a lot of operational problems recently we tested this
> >>>       and patched it locally into our 3.1.7 triggerer. It has been
> >>>       working smoothly in production for ~1 week
> >>>     * If something goes wrong or Executor is not supported then the
> >>>       existing path setting to "scheduled" is always used as safe
> fallback
> >>>     * It is selective and is an opt-in feature
> >>>     * We dramatically reduced latency from Pod completion to cleanup from
> >>>       sometimes 6-24h to a few seconds
> >>>     * We assume the cleanup as return from Worker is a small effort
> only
> >>>       so no harm even if temporarily over-loading some limits
> >>>     * ...But frankly speaking the concurrency limits and Pools were
> >>>       checked initially at time of start. Limiting cleanup later on
> >>>       concurrency limits is not adding any benefits but just delays and
> >>>       problems. We just want to finish-up work.
> >>>     * But finally actually over-loading is not possible as still the
> Redis
> >>>       queue is in between - so any free Celery Worker will pick the
> task.
> >>>       Even in over-load it will just sit in Celery queue for a moment.
> >>>     * It is a relatively small change
> >>>     * Off-loads scheduler by 50% for all deferred tasks (need to pass
> >>>       scheduler only once)
> >>>     * Due to reduced latency on cleanup more "net workload"
> schedulable on
> >>>       the cluster, higher cloud utilization / less idle time.
> >>>
> >>> Hearing the feedback on the PR and playing devil's advocate, I could
> >>> understand the counter arguments:
> >>>
> >>>     * In Airflow 2 there was a mini-scheduler, there was a hard fight
> in
> >>>       Airflow 3 to get rid of this!
> >>>       Understand. But we do not want to add a "mini scheduler" we just
> >>>       want to use parts of the Executor code to push the task instance
> to
> >>>       queue and skip scheduling. It is NOT the target to make any more
> and
> >>>       schedule anything else.
> >>>     * This would skip all concurrency checks and potentially over-load
> the
> >>>       workers!
> >>>       No. Concurrency rules are checked when the workload is initially
> >>>       started. I know there are parallel bugs we are fighting with to
> >>>       ensure deferred status is counted on all levels into concurrency
> to
> >>>       correctly keep limits. Assuming that you enable counting deferred
> >>>       into pools, a direct re-queue to worker is just keeping the
> level of
> >>>       concurrency not adding more workload... just transferring back.
> And
> >>>       Celery for example has a queue so not really over-loading. It is
> >>>       mainly intended to clean-up workload which is a low effort task.
> >>>     * We plan to cut-off components and untangle package dependencies.
> >>>       After worker the Dag parser and triggerer are next. Linking to
> >>>       Executor defeats these plans!
> >>>       Yes, understood. But also today the setting of the task_instance
> is
> >>>       using direct DB access... and would in such surgery need to be
> cut
> >>>       to the level that the DB access would need to be moved to
> execution
> >>>       API back-end. So the cut for re-queueing would move to execution
> API
> >>>       in future, not triggerer. I think it would be valid to think
> about
> >>>       the options if such distribution is made how that might evolve in
> >>>       future.
> >>>     * This option is risky and we have concerns people have more
> errors.
> >>>       Feature is opt-in, need to be configured. Per default as
> proposed in
> >>>       the PR it is not active. Would be also acceptable to mark this
> >>>       experimental for a while.
> >>>
> >>> Sorry, a bit longer text. Happy to get feedback.
> >>>
> >>> Jens
> >>>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail:[email protected]
> >> For additional commands, e-mail:[email protected]
> >>
> >>
