I also opened a PR which shows the implementation: 
https://github.com/apache/airflow/pull/59087

The most challenging part was implementing the asend method of the
CommsDecoder, which makes it possible to retrieve Airflow connections from
async code.
The method already existed but was not implemented, which suggests this use
case had already been considered.

Kr,
David

-----Original Message-----
From: Jens Scheffler <[email protected]>
Sent: 04 December 2025 21:12
To: [email protected]
Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks and 
performance in Airflow 3 by supporting native async PythonOperator

I have requested access to the Google doc to read more details. I am
interested and, like Daniel, would like to know what the difference is or
would be.

Progress tracking in particular might be important. Yes, task mapping is
very expensive if you want to download 17k XML files, but when running
async and you are at 5000 files and resume, would you know what was already
complete, or would it start from scratch every time?

I think such micro-batching is cool, but some state tracking is important.
If that state lives in the DB, it might in turn overload the DB or add a
great many transactions.
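
To make the resume question concrete, here is a minimal sketch of per-batch
checkpointing. It is not part of the proposal or the PR: fake_download, the
JSON checkpoint file and the batch size are all hypothetical stand-ins, and a
real task would have to decide where such state may live.

```python
import asyncio
import json
from pathlib import Path


def load_done(checkpoint: Path) -> set[str]:
    """Return the names already recorded as complete (empty on first run)."""
    if not checkpoint.exists():
        return set()
    return set(json.loads(checkpoint.read_text()))


async def fake_download(name: str) -> str:
    # Hypothetical stand-in for a real async transfer; not an Airflow API.
    await asyncio.sleep(0)
    return name


async def process(names: list[str], checkpoint: Path, batch_size: int = 100) -> list[str]:
    """Download only names missing from the checkpoint; persist after each batch."""
    done = load_done(checkpoint)
    todo = [n for n in names if n not in done]
    fetched: list[str] = []
    for start in range(0, len(todo), batch_size):
        batch = todo[start:start + batch_size]
        fetched += await asyncio.gather(*(fake_download(n) for n in batch))
        done.update(batch)
        # One small write per batch rather than one per file.
        checkpoint.write_text(json.dumps(sorted(done)))
    return fetched
```

Writing once per batch keeps the number of state updates proportional to the
number of batches, not the number of files, which is one way to limit the
transaction load mentioned above.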

The triggerer, though, I think is still cool for long-running tasks where you
just wait for a response, e.g. you triggered another remote job or you started
a Pod that runs for an hour. We sometimes have Pods running for 10h, and then
it is important to be able to roll out new SW to workers; with triggerers we
can de-couple this.

So, unless I am missing details, I would judge such micro-batching as a
third execution option that most probably would not replace the others.

Also, knowing from my own experience, writing async code is more complex and
error-prone, so if you required all normal code to be async you might scare
users away. Proper review is needed to ensure all IO is async (including DB
calls!).

On 12/4/25 18:08, Daniel Standish via dev wrote:
> Here's what I'm hearing from this
>
> 1. not using task mapping, but just looping instead, can be much more
> efficient.
> Yes, of course it can.
>
> 2. there are ways in which triggerer / deferrable operators are not
> fully complete, or do not fully have feature parity with regular
> operators (such as the custom xcom backend example).
> I believe it. But this could certainly be worked on.
>
> Question for you:
>
> How is your proposal different / better than say, just calling
> `asyncio.run(...)` in a python task?
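
For reference, the baseline being asked about might look roughly like the
sketch below. The names fetch, fetch_all and python_task are hypothetical
placeholders; no Airflow API is used, and a real task would call an async hook
where the sleep is.

```python
import asyncio


async def fetch(item: int) -> int:
    # Placeholder for real async I/O (e.g. an async hook call); hypothetical.
    await asyncio.sleep(0)
    return item * 2


async def fetch_all(items: list[int]) -> list[int]:
    # Run all fetches concurrently on the current event loop.
    return list(await asyncio.gather(*(fetch(i) for i in items)))


def python_task(items: list[int]) -> list[int]:
    """A synchronous callable, as a plain Python task would run it."""
    # The task itself owns the event loop: created here, closed on return.
    return asyncio.run(fetch_all(items))
```

Here the task author manages the event loop by hand, which is the part the
proposal wants the operator to take over.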
>
>
> On Thu, Dec 4, 2025 at 8:38 AM Blain David <[email protected]> wrote:
>
>> As I already discussed with Jarek in the past, and with Hussein
>> during the Airflow Summit, at a certain point we ran into
>> performance issues when using a lot of deferred operators.
>>
>> Allowing PythonOperators (and thus also @task decorated methods) to
>> natively execute async Python code in Airflow solved our performance issues.
>> You could ask whether that is really necessary and what the added
>> value is, and at first you would indeed think it doesn’t make
>> sense at all to do so.
>> But please bear with me and let me explain why we did it
>> that way and how it solved our performance issues; it will become
>> crystal clear 😉
>> Below is the article I wrote, which is also publicly available on
>> Google Docs, where it is easier to read than on the devlist:
>> https://docs.google.com/document/d/1pNdQUB0gH-r2X1N_g774IOUEurowwQZ5OJ7yiY89qok
>>
>> Here is my article:
>>
>> Rethinking deferrable operators, async hooks and performance in
>> Airflow 3
>>
>> At our company, we strive to avoid custom code in Airflow as much as
>> possible to improve maintainability.
>> For years this meant favouring dedicated Airflow operators over
>> Python operators.
>> However, in Airflow 3, as the number of deferred operators in our
>> DAGs continued to grow, we began facing severe performance issues
>> with deferrable operators, which forced us to re-evaluate that approach.
>>
>> Initially we expected deferrable operators to improve performance for
>> I/O-related tasks—such as REST API calls—because triggerers follow an
>> async producer/consumer pattern. But in practice we discovered the opposite.
>>
>> Why Deferrable Operators Became the Bottleneck
>>
>> Deferrable operators and sensors delegate async work to triggerers.
>> This is perfectly fine for lightweight tasks such as polling or
>> waiting for messages on a queue.
>>
>> But in our case:
>>
>>
>>    *   MSGraphAsyncOperator performs long-running async operations.
>>    *   HttpOperator in deferrable mode can perform long-running HTTP
>> interactions, especially if pagination is involved.
>>    *   There is no native deferrable SFTPOperator, so if we want to use the
>> SFTPHookAsync, we must use the PythonOperator, which doesn’t natively
>> support async code (not that big of a challenge).
>>    *   Both MSGraphAsyncOperator and HttpOperator can return large payloads.
>>    *   Triggerers must store yielded events directly in the Airflow
>> metadata database.
>>
>> Triggerers are not designed for sustained high-load async execution
>> or large data transfers. Unlike Celery workers, triggerers scale
>> poorly and quickly become the bottleneck.
>>
>> Yielded events from triggers are stored directly in the Airflow
>> metadata database because, unlike workers, triggers cannot leverage a
>> custom XCom backend to offload large payloads, which can lead to
>> increased database load and potential performance bottlenecks.
>>
>> Dynamic task mapping with deferrable operators amplifies the problem
>> even further, which AIP‑88 partially addresses.
>> Triggerers also cannot run on the Edge Executor, as they are
>> still tightly coupled to the Airflow metadata database (possibly
>> addressed in AIP‑92).
>>
>> Rethinking the approach: Async hooks + Python tasks
>>
>> These limitations led us to reconsider calling async hooks directly
>> from Python @task decorated functions or PythonOperators, thus
>> avoiding deferrable operators, and with them triggerers, entirely.
>> Operators are wrappers around hooks. Well‑written operators should
>> contain little logic and delegate the real work to the hooks,
>> so why not call the hooks directly?
>> This idea is also in line with what Bolke already presented<
>> https://airflowsummit.org/slides/2023/ab1-1400-Operators.pdf> in 2023.
>>
>> Advantages of this approach include:
>>
>>
>>    *   No dynamic task mapping needed when iterating: just loop in Python,
>> unless you really need to track each individual step, which comes
>> at a cost.
>>    *   Massive reduction in scheduler load.
>>    *   No triggerers involved.
>>    *   Async code can run on Edge Workers.
>>    *   Celery workers scale far better than triggerers. By moving
>> from deferred operators (and thus triggerers) to async operators on
>> Celery workers, our performance issues on the triggerer disappeared and
>> run times became much shorter, probably because the trigger mechanism
>> also puts more load on the scheduler.
>>    *   Sync or async makes no difference in performance unless you
>> have to execute the same async function multiple times; that is where
>> async shines compared to sync, especially for I/O-related operations.
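
The last point can be illustrated with a small timing sketch. It assumes a
simulated 50 ms I/O wait (io_call is hypothetical, not an Airflow hook) and
compares awaiting the calls one after another with running them together.

```python
import asyncio
import time


async def io_call(delay: float = 0.05) -> None:
    await asyncio.sleep(delay)  # simulated network wait


async def one_after_another(n: int) -> float:
    """Await each call in turn; total time grows with n."""
    start = time.perf_counter()
    for _ in range(n):
        await io_call()
    return time.perf_counter() - start


async def all_at_once(n: int) -> float:
    """Start all calls together; the waits overlap."""
    start = time.perf_counter()
    await asyncio.gather(*(io_call() for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("sequential:", asyncio.run(one_after_another(10)))
    print("concurrent:", asyncio.run(all_at_once(10)))
```

A single call costs the same either way; the gain only appears once several
waits can overlap.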
>>
>> Concrete Example: Async SFTP Downloads
>>
>> Below is an example comparing approaches for downloading ~17,000 XML files
>> and storing them in our data warehouse.
>> A single Celery worker can orchestrate many concurrent downloads
>> using asyncio.
>> A semaphore (here used internally by the AsyncSFTPConnectionPool)
>> protects the SFTP server from being overloaded.
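
The semaphore idea can be sketched with plain asyncio as follows. This is not
the code from the Google doc or the PR: download, download_all, the stats dict
and the limit of 8 are hypothetical, and the sleep stands in for an SFTP
transfer.

```python
import asyncio


async def download(name: str, limit: asyncio.Semaphore, stats: dict) -> str:
    async with limit:  # at most max_concurrent coroutines are inside at once
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.001)  # stand-in for the SFTP transfer
        stats["active"] -= 1
    return name


async def download_all(names: list[str], max_concurrent: int = 8):
    # Create the semaphore inside the running loop that will use it.
    limit = asyncio.Semaphore(max_concurrent)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(download(n, limit, stats) for n in names))
    return list(results), stats["peak"]


if __name__ == "__main__":
    files, peak = asyncio.run(download_all([f"file_{i}.xml" for i in range(100)]))
    print(len(files), "files downloaded, peak concurrency", peak)
```

All downloads are scheduled up front, but the semaphore caps how many are in
flight, so the server never sees more than max_concurrent sessions at a time.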
>> Benchmark results:
>>
>> Approach                                           Environment    Time
>> Mapped SFTPOperator                                production     3h 25m 55s
>> PythonOperator + SFTPHook                          local laptop   1h 21m 09s
>> Async Python task + SFTPHookAsync (without pool)   local laptop   8m 29s
>> Async Python task + AsyncSFTPConnectionPool        production     3m 32s
>>
>> As you can see, DagRun time went down from more than 3 hours
>> to only three and a half minutes, which is huge!
>>
>> The Google doc contains two code snippets showing how it is done
>> sync and async, which I will not repeat here.
>>
>> Conclusion
>>
>> Using async hooks inside async Python tasks provides better
>> performance, scalability, and flexibility, and avoids reliance on triggerers 
>> entirely.
>> This hybrid approach—'async where it matters, operators where they
>> make sense'—may represent the future of high‑performance Airflow data
>> processing workloads.
>>
>> What did I change in Airflow?
>>
>> Not that much, I only:
>>
>>
>>    *   Introduced an async PythonOperator so you don’t have to handle the
>> event loop yourself. That is not that special in itself, but it is also
>> natively supported on async @task decorated Python functions, which reads
>> nicely.
>>    *   Made some improvements to the SFTPHookAsync to take full advantage
>> of async.
>>    *   Introduced an SFTPHookPool so multiple asyncio tasks can re-use
>> connection instances to gain even more performance. In this case it
>> meant a reduction of 5 minutes in processing time, so we went from 8 to 3
>> minutes.
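
How a pool lets asyncio tasks share connections can be sketched as below. This
is an illustration under assumptions, not the pool from the PR: FakeConnection
and ConnectionPool are hypothetical, and the lazy-open strategy is only one
possible design.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical stand-in for an open SFTP connection."""

    opened = 0  # counts how many connections were ever created

    def __init__(self) -> None:
        FakeConnection.opened += 1

    async def get(self, name: str) -> str:
        await asyncio.sleep(0)  # simulated transfer
        return f"contents of {name}"


class ConnectionPool:
    def __init__(self, size: int) -> None:
        self._size = size
        self._idle: asyncio.Queue = asyncio.Queue()
        self._created = 0

    @asynccontextmanager
    async def acquire(self):
        # Open lazily up to the limit, then wait for a connection to be returned.
        if self._idle.empty() and self._created < self._size:
            self._created += 1
            conn = FakeConnection()
        else:
            conn = await self._idle.get()
        try:
            yield conn
        finally:
            self._idle.put_nowait(conn)  # hand the connection to the next task


async def download_all(names: list[str], pool_size: int = 4) -> list[str]:
    pool = ConnectionPool(pool_size)  # created inside the running event loop

    async def one(name: str) -> str:
        async with pool.acquire() as conn:
            return await conn.get(name)

    return list(await asyncio.gather(*(one(n) for n in names)))
```

Fifty downloads with a pool of four open at most four connections, so the
connection setup cost is paid four times instead of fifty.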
>>
>>
>>
