Thanks, David, for the proposal.

I think it would make more sense to publish an AIP on this matter rather than a private (non-public) Google doc. I like the idea, and it makes sense for certain use cases. The PR you created as a code example does not look huge, but it still adds complexity that needs to be maintained.

So the AIP document should certainly discuss for which use cases this is beneficial and which async hooks need to be available to leverage it. It makes sense for the proposed kind of micro-batches, which really accelerate things (roughly 60x in your benchmark, from over 3 hours down to about 3.5 minutes), but with the trade-off that in case of a failure you need to restart from the beginning. Then again, with 3 minutes compared to 3 hours, it is probably easier to restart the micro-batch from the beginning than to persist all intermediate state.

Nevertheless, the other AIP being discussed in parallel, about persistence in triggerers, might also be of interest, at least for occasionally storing sync points.

Anyway, I'm hoping for more feedback from other maintainers. On the technical side, that feedback might be easier to collect in an AIP on Confluence than on the devlist. At least it's better there than in a private Google doc.

Jens

On 12/5/25 08:47, Blain David wrote:
Hello Jens,

Thanks for your time and answer.  I just granted you and Zhe-You access to the 
document.

In the article I explained why we did the iteration ourselves within the async
@task decorated function: it was much faster than using dynamic task mapping.
That's not to say you cannot use dynamic task mapping with the async
PythonOperator; it works just as with any other operator. It just doesn't make
sense, as it won't give you any performance benefit, because the mapped task
instances don't share the same event loop (at least when using the
CeleryExecutor).

For example, on big streams of data you could use dynamic task mapping to
split the stream into multiple chunks, and each mapped task would then process
its chunk within the async operator, a bit like partitioning, if you like.
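A minimal sketch of that partitioning idea follows. It uses plain asyncio so it runs without Airflow. `chunk`, `fetch` and `process_chunk` are hypothetical names. In a DAG, the list returned by `chunk()` would presumably be fed to a mapped task via `.expand()`, and each mapped task instance would then run its own event loop over its chunk:

```python
import asyncio
from typing import List


def chunk(items: List[str], size: int) -> List[List[str]]:
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


async def fetch(name: str) -> str:
    # Hypothetical stand-in for an async hook call (e.g. one file download).
    await asyncio.sleep(0.01)
    return f"done:{name}"


async def process_chunk(names: List[str]) -> List[str]:
    # All coroutines of one chunk share this task's single event loop.
    return list(await asyncio.gather(*(fetch(n) for n in names)))


if __name__ == "__main__":
    files = [f"file_{i}.xml" for i in range(10)]
    # In Airflow, each partition would become one mapped task instance.
    for part in chunk(files, 4):
        print(asyncio.run(process_chunk(part)))
```

Concurrency only happens inside a chunk. Across chunks, each mapped task has its own loop, which matches the point above about mapped instances not sharing an event loop.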

In the example I used for the article, once again, we don't care about the
download state of each individual FTP file; we just want to know whether the
directory was downloaded successfully or not. Of course, we added some logging
statements to show which file was downloaded.
I also know Jarek wants individual state tracking, but that's not the solution
I presented here. For micro-batching we have our IterableOperator: instead of
doing partial/expand, we do partial/iterate. This actually does the same as
the for loop in the article's example, but manages it for you in a
multi-threaded way, for sync as well as async tasks. There, async tasks will
benefit from the multi-threading, as they share the same event loop and
everything runs within the same Celery worker, but that's another solution.

Still, with dynamic task mapping or the IterableOperator you wouldn't be able
to use the SFTPClientPool (previously named AsyncSFTPConnectionPool, as in the
article), so you wouldn't get the performance gain the pool gives you.
That's why, in this example, we do the looping ourselves.
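To illustrate why a pool matters, here is a minimal sketch of connection reuse in plain asyncio. `SimpleConnectionPool` and `FakeConnection` are hypothetical names, not the SFTPClientPool from the PR. Twenty downloads share three connections instead of opening twenty, and the queue of idle connections doubles as a concurrency limit:

```python
import asyncio
from typing import Awaitable, Callable, List


class FakeConnection:
    """Hypothetical stand-in for an async SFTP client connection."""

    opened = 0  # how many connections were ever created

    def __init__(self) -> None:
        FakeConnection.opened += 1

    async def download(self, path: str) -> str:
        await asyncio.sleep(0.01)  # simulated network I/O
        return path


class SimpleConnectionPool:
    """Hypothetical pool: `size` connections created once and shared."""

    def __init__(self, size: int, factory: Callable[[], FakeConnection]) -> None:
        self._idle: "asyncio.Queue[FakeConnection]" = asyncio.Queue()
        for _ in range(size):
            self._idle.put_nowait(factory())

    async def run(self, fn: Callable[[FakeConnection], Awaitable[str]]) -> str:
        conn = await self._idle.get()  # waits while all connections are busy
        try:
            return await fn(conn)
        finally:
            self._idle.put_nowait(conn)  # hand the connection to the next waiter


async def download_all(paths: List[str], pool_size: int) -> List[str]:
    # Create the pool inside the running loop (matters on Python < 3.10).
    pool = SimpleConnectionPool(pool_size, FakeConnection)
    return list(await asyncio.gather(
        *(pool.run(lambda conn, p=p: conn.download(p)) for p in paths)
    ))
```

With 20 paths and `pool_size=3`, only three `FakeConnection` objects are created, and no more than three downloads are in flight at once. The `p=p` default argument pins each path to its own lambda.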

And I completely agree about triggerers: we also use them a lot, and they are
indeed great for long-running tasks with a lot of waiting time (dead time),
where you're just monitoring a state. That's the purpose of triggerers!
But some operators misuse triggers because they are the "only" way to run
async code that returns a lot of data, which has to come back to the operator
so it can be exposed as an XCom. There you'll see the trigger table in your
Airflow database explode fast, as each yielded response is stored in the
database: a trigger can't make use of a custom XCom backend like operators do.
So in practice, each result yielded by the trigger will first end up in your
trigger table before being stored as an XCom.

Also, I'm not suggesting we replace anything here; I'm just proposing an
alternative in Airflow so you're not purely tied to deferrable operators. At
the Summit we also noticed, during a lot of presentations, that people tend to
use PythonOperators (or @task) with hooks more than the operators themselves.
In my opinion that makes sense, as you get more flexibility: you still benefit
from the Airflow integration through hooks, but you aren't tied to the
operator implementation, which might offer only limited operations. Look at
the SFTPOperator, for example, and you'll understand immediately. That's why I
propose allowing PythonOperators to run async code natively: that way you can
interact directly with async hooks and you don't need a triggerer to do so.
Triggers are great for polling and listening, but not for processing huge
amounts of data; that's where Celery workers shine. Allowing PythonOperators
to run async code natively lets you do exactly that on your Celery workers.

As for your last example: in our case DB calls are still sync since, to my
knowledge, we don't have any DB hook based on DbApiHook that supports async
operations, or do we? Also, the DB operation can be seen as another sync task,
so it doesn't need to run in the same async task: you just pass the XCom
returned from the async task to the sync task. But having async DB hooks could
be cool. I have also thought about it, but it would depend on whether the
driver supports it. Still, it's something I would like to test in the near
future.

I hope this answered most of your questions, Jens 😉

-----Original Message-----
From: Jens Scheffler <[email protected]>
Sent: 04 December 2025 21:12
To: [email protected]
Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks and 
performance in Airflow 3 by supporting native async PythonOperator

I requested access to the Google doc to read more details. I'm interested,
and, like Daniel, I'd like to know what the difference is or would be.

This matters all the more because progress tracking might be important. Yes, task mapping is
very expensive if you want to download 17k XML files. But when running async
and you are at 5,000 files, would you know what was complete if you resumed,
or would it start from scratch every time?

I think such micro-batching is cool, but some state tracking is important.
If that state is kept in the DB, it might also overload the DB or add a great
many transactions.
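One hedged way to get resumability without a DB write per file is sketched below. It assumes a JSON checkpoint file, though any durable store would do. The sketch skips items finished in a previous attempt and persists progress once per batch. `run_resumable`, `load_done`, `save_done` and `download` are hypothetical names, not part of Airflow or of the proposed PR:

```python
import asyncio
import json
from pathlib import Path
from typing import Iterable, List, Set


def load_done(checkpoint: Path) -> Set[str]:
    """Return the set of items completed in a previous attempt."""
    if checkpoint.exists():
        return set(json.loads(checkpoint.read_text()))
    return set()


def save_done(checkpoint: Path, done: Set[str]) -> None:
    # Write to a temp file, then rename, so a crash never leaves half a file.
    tmp = checkpoint.with_suffix(".tmp")
    tmp.write_text(json.dumps(sorted(done)))
    tmp.replace(checkpoint)


async def download(name: str) -> None:
    await asyncio.sleep(0)  # hypothetical stand-in for the real async download


async def run_resumable(items: Iterable[str], checkpoint: Path,
                        batch: int = 100) -> List[str]:
    done = load_done(checkpoint)
    todo = [i for i in items if i not in done]
    processed: List[str] = []
    for start in range(0, len(todo), batch):
        part = todo[start:start + batch]
        await asyncio.gather(*(download(n) for n in part))
        done.update(part)
        processed.extend(part)
        save_done(checkpoint, done)  # one write per batch, not per file
    return processed
```

The batch size trades write volume against how much work is repeated after a failure. For 17k files and a batch of 100, that is about 170 checkpoint writes instead of 17k.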

I still think the triggerer is great for long-running tasks where you just
wait for a response, e.g. you triggered another job remotely or you started a
Pod that runs for an hour. We sometimes have Pods running for 10 hours, and
then it is important to be able to roll out new software to the workers; with
triggerers we can decouple this.

So, maybe (without knowing all the details), I would judge such
micro-batching as a third execution option, but it most probably would not
replace the others.

Also, from my own experience, writing async code is more complex and
error-prone, so if you required all regular code to be async you might scare
users away. A proper review is needed to ensure all I/O is async (DB calls
too!).
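The DB-call caveat is easy to demonstrate. Below is a minimal standard-library sketch; sqlite3 stands in for a sync DB driver, and the function names are hypothetical. It shows that a blocking call inside a coroutine freezes the whole event loop, while `asyncio.to_thread` (Python 3.9+) keeps the loop responsive:

```python
import asyncio
import sqlite3
import time
from typing import Tuple


def blocking_query(value: int) -> int:
    """A sync DB call; sqlite3 is only a stand-in for a real driver."""
    # Connect inside the function: by default a sqlite3 connection may only
    # be used from the thread that created it.
    conn = sqlite3.connect(":memory:")
    try:
        time.sleep(0.2)  # simulate a slow round trip
        return conn.execute("SELECT ?", (value,)).fetchone()[0]
    finally:
        conn.close()


async def ticker(stop: asyncio.Event) -> int:
    """Counts loop iterations; it only advances while the loop is free."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def run(offload: bool) -> Tuple[int, int]:
    stop = asyncio.Event()
    counter = asyncio.create_task(ticker(stop))
    await asyncio.sleep(0)  # let the ticker start
    if offload:
        result = await asyncio.to_thread(blocking_query, 42)  # loop stays free
    else:
        result = blocking_query(42)  # blocks the event loop for 0.2 s
    stop.set()
    return result, await counter
```

With `offload=False` the ticker gets stuck after its first tick. With `offload=True` it keeps ticking (roughly 20 times) while the query runs in a worker thread.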

On 12/4/25 18:08, Daniel Standish via dev wrote:
Here's what I'm hearing from this

1. Not using task mapping, but just looping instead, can be much more
efficient.
Yes, of course it can.

2. There are ways in which the triggerer / deferrable operators are not
fully complete, or do not fully have feature parity with regular
operators (such as the custom XCom backend example). I believe it. But
this could certainly be worked on.

Question for you:

How is your proposal different / better than say, just calling
`asyncio.run(...)` in a python task?
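For reference, the baseline Daniel describes would look roughly like the sketch below. A sync task body creates and tears down its own event loop with `asyncio.run()`. `download_directory` and `fetch` are hypothetical names, and the `@task` decorator is omitted so the sketch runs without Airflow. Note that `asyncio.run()` raises `RuntimeError` if it is called while an event loop is already running in the same thread:

```python
import asyncio
from typing import List


# In a DAG this function would be decorated with Airflow's @task.
def download_directory(paths: List[str]) -> List[str]:
    async def fetch(path: str) -> str:
        await asyncio.sleep(0.01)  # hypothetical stand-in for an async hook call
        return path

    async def main() -> List[str]:
        # All downloads run concurrently on one event loop.
        return list(await asyncio.gather(*(fetch(p) for p in paths)))

    # asyncio.run creates a fresh event loop, runs main() and closes the loop.
    return asyncio.run(main())
```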


On Thu, Dec 4, 2025 at 8:38 AM Blain David <[email protected]> wrote:

As I already discussed with Jarek in the past, and with Hussein during the
Airflow Summit, at a certain point we encountered performance issues when
using a lot of deferred operators.

Allowing PythonOperators (and thus also @task decorated functions) to
natively execute async Python code in Airflow solved our performance issues.
And yes, you could ask whether that's really necessary and what the added
value is. At first you would indeed think it doesn't make sense at all to do
so, right?
But please bear with me and hear me out on why we did it that way and how it
solved our performance issues, and it will become crystal clear 😉
Below is the article I wrote, which is also publicly available
here<https://docs.google.com/document/d/1pNdQUB0gH-r2X1N_g774IOUEurowwQZ5OJ7yiY89qok>
on Google Docs, where it is easier to read than on the devlist.

Here is my article:

Rethinking deferrable operators, async hooks and performance in Airflow 3

At our company, we strive to avoid custom code in Airflow as much as
possible to improve maintainability.
For years this meant favouring dedicated Airflow operators over
Python operators.
However, in Airflow 3, as the number of deferred operators in our
DAGs continued to grow, we began facing severe performance issues
with deferrable operators, which forced us to re-evaluate that approach.

Initially we expected deferrable operators to improve performance for
I/O-related tasks—such as REST API calls—because triggerers follow an
async producer/consumer pattern. But in practice we discovered the opposite.

Why Deferrable Operators Became the Bottleneck

Deferrable operators and sensors delegate async work to triggerers.
This is perfectly fine for lightweight tasks such as polling or
waiting for messages on a queue.

But in our case:


    *   MSGraphAsyncOperator performs long-running async operations.
    *   HttpOperator in deferrable mode can perform long-running HTTP
interactions, especially if pagination is involved.
    *   There is no native deferrable SFTPOperator, so if we want to use the
SFTPHookAsync, we must use the PythonOperator, which doesn't natively
support async code (not that big of a challenge).
    *   Both the MSGraphAsyncOperator and the deferrable HttpOperator can
return large payloads.
    *   Triggerers must store yielded events directly into the Airflow
metadata database.

Triggerers are not designed for sustained high-load async execution
or large data transfers. Unlike Celery workers, triggerers scale
poorly and quickly become the bottleneck.

Yielded events from triggers are stored directly in the Airflow
metadata database because, unlike workers, triggers cannot leverage a
custom XCom backend to offload large payloads, which can lead to
increased database load and potential performance bottlenecks.

Dynamic task mapping with deferrable operators amplifies the problem
even further, which AIP‑88 partially solves.
Triggerers also cannot run on the Edge Executor, as they are still
tightly coupled with the Airflow metadata database (possibly addressed
in AIP‑92).

Rethinking the approach: Async hooks + Python tasks

These limitations led us to reconsider calling async hooks directly
from @task decorated Python functions or PythonOperators, thus
avoiding deferrable operators, and therefore triggerers, entirely.
Operators are wrappers around hooks. Well‑written operators should
contain little logic and delegate all the work to the hooks, which do
the real work, so why not call the hooks directly?
This idea is also somewhat in line with what Bolke already
presented<https://airflowsummit.org/slides/2023/ab1-1400-Operators.pdf>
in 2023.

Advantages of this approach include:


    *   No dynamic task mapping needed when iterating: just loop in Python,
unless you really need to track each individual step, but that comes
at a cost.
    *   Massive reduction in scheduler load.
    *   No triggerers involved.
    *   Async code can run on Edge Workers.
    *   Celery workers scale far better than triggerers. By moving
from deferred operators (and thus triggerers) to async operators on
Celery workers, our triggerer performance issues were gone and
run times were much shorter, probably because the trigger mechanism
also puts more load on the scheduler.
    *   Sync or async makes no difference in performance unless you
have to execute the same async function many times concurrently; that's
when async shines compared to sync, especially for I/O-bound operations.
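The last point can be illustrated with a small standard-library sketch. It assumes a simulated 100 ms I/O latency per call; `call_api` is a hypothetical stand-in for an async hook method. Awaiting ten calls one after another takes about one second, while running them concurrently on one event loop takes about 0.1 s. For a single call, the two approaches are equivalent:

```python
import asyncio
import time
from typing import Coroutine, List, Tuple


async def call_api(i: int) -> int:
    await asyncio.sleep(0.1)  # simulated I/O latency of one request
    return i


async def sequential(n: int) -> List[int]:
    # Each call waits for the previous one: total ~ n * 0.1 s.
    return [await call_api(i) for i in range(n)]


async def concurrent(n: int) -> List[int]:
    # All calls wait on the same event loop at once: total ~ 0.1 s.
    return list(await asyncio.gather(*(call_api(i) for i in range(n))))


def timed(coro: Coroutine) -> Tuple[List[int], float]:
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start
```

Usage: `timed(sequential(10))` versus `timed(concurrent(10))`. Both return `[0, ..., 9]`, but the first takes roughly ten times longer.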

Concrete Example: Async SFTP Downloads

Below is an example comparing the download of ~17,000 XML files and
storing them in our data warehouse.
A single Celery worker can orchestrate many concurrent downloads
using asyncio.
A semaphore (here used internally by the AsyncSFTPConnectionPool)
protects the SFTP server from being overloaded.
Benchmark results:

Approach                                          Environment   Time
Mapped SFTPOperator                               production    3h 25m 55s
PythonOperator + SFTPHook                         local laptop  1h 21m 09s
Async Python task + SFTPHookAsync (without pool)  local laptop  8m 29s
Async Python task + AsyncSFTPConnectionPool       production    3m 32s

As you can all see, the DagRun time went down from more than 3 hours
to only three and a half minutes, which is huge!
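To put the benchmark table in numbers, the small helper below converts the durations to seconds and computes each speedup relative to the mapped SFTPOperator run. Two rows were measured on a laptop and two in production, so the ratios are indicative only:

```python
import re

# Durations copied from the benchmark table.
TIMINGS = {
    "Mapped SFTPOperator": "3h 25m 55s",
    "PythonOperator + SFTPHook": "1h 21m 09s",
    "Async Python task + SFTPHookAsync (without pool)": "8m 29s",
    "Async Python task + AsyncSFTPConnectionPool": "3m 32s",
}


def to_seconds(duration: str) -> int:
    """Parse strings like '3h 25m 55s' or '8m 29s' into seconds."""
    units = {"h": 3600, "m": 60, "s": 1}
    return sum(int(n) * units[u] for n, u in re.findall(r"(\d+)([hms])", duration))


baseline = to_seconds(TIMINGS["Mapped SFTPOperator"])
for name, duration in TIMINGS.items():
    secs = to_seconds(duration)
    print(f"{name}: {secs} s, {baseline / secs:.1f}x faster than baseline")
```

The fastest run comes out at roughly 58x faster than the mapped baseline (12,355 s vs. 212 s).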

The Google doc contains two code snippets showing how it's done sync and
async, which I will not include here.
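As a rough stand-in (not the author's code), here is a sketch of the semaphore pattern described above, using only asyncio. `download` is a hypothetical placeholder for an SFTPHookAsync call. `ConcurrencyProbe` only records the peak number of simultaneous downloads, to show that the limit holds:

```python
import asyncio
from typing import List, Tuple


class ConcurrencyProbe:
    """Tracks how many downloads run at the same time (for demonstration)."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0


async def download(path: str, sem: asyncio.Semaphore, probe: ConcurrencyProbe) -> str:
    async with sem:  # at most `limit` coroutines get past this point at once
        probe.current += 1
        probe.peak = max(probe.peak, probe.current)
        try:
            await asyncio.sleep(0.01)  # hypothetical stand-in for an SFTP download
            return path
        finally:
            probe.current -= 1


async def download_bounded(paths: List[str], limit: int) -> Tuple[List[str], int]:
    # Create the semaphore inside the running loop.
    sem = asyncio.Semaphore(limit)
    probe = ConcurrencyProbe()
    results = await asyncio.gather(*(download(p, sem, probe) for p in paths))
    return list(results), probe.peak
```

All 50 coroutines are scheduled at once, but the server never sees more than `limit` simultaneous transfers. Raising or lowering `limit` is the knob that protects the SFTP server.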

Conclusion

Using async hooks inside async Python tasks provides better
performance, scalability, and flexibility, and avoids relying on triggerers
entirely.
This hybrid approach ('async where it matters, operators where they
make sense') may represent the future of high‑performance Airflow data
processing workloads.

What did I change in Airflow?

Not that much; I only:


    *   Introduced an async PythonOperator so you don't have to handle the
event loop yourself. Not that special, but it's also natively supported
on async @task decorated Python functions, which reads nicely.
    *   Made some improvements to the SFTPHookAsync to take full advantage
of async.
    *   Introduced an SFTPHookPool so multiple asyncio tasks can reuse
connection instances to gain even more performance. In this case it
meant a reduction of 5 minutes in processing time, from 8 to 3
minutes.
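A rough sketch of the first change follows, assuming the simplest possible design: the operator inspects the callable and, if it is a coroutine function, runs it on an event loop it manages itself. `execute_callable` is a hypothetical helper, and the actual PR may well manage the loop differently:

```python
import asyncio
import inspect
from typing import Any, Callable


def execute_callable(python_callable: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a sync or async callable and return its result (hypothetical helper)."""
    if inspect.iscoroutinefunction(python_callable):
        # Async callable: the operator, not the user, owns the event loop.
        return asyncio.run(python_callable(*args, **kwargs))
    # Sync callable: call it directly, as the PythonOperator does today.
    return python_callable(*args, **kwargs)


def add(a: int, b: int) -> int:
    return a + b


async def add_async(a: int, b: int) -> int:
    await asyncio.sleep(0)  # stands in for awaiting an async hook
    return a + b
```

From the DAG author's point of view, both `add` and `add_async` are then used the same way: the returned value becomes the task's return value, and no `asyncio.run()` boilerplate appears in user code.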



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

