As I already discussed with Jarek in the past, and with Hussein during the 
Airflow Summit, at a certain point we ran into performance issues when 
using a lot of deferred operators.

Allowing PythonOperators (and thus also @task decorated functions) to natively 
execute async Python code in Airflow solved our performance issues.
You could ask whether that’s really necessary and what the added value is. 
At first glance it may indeed seem to make no sense at all, right?
But please bear with me and hear me out on why we did it that way and how it 
solved our performance issues; it will become crystal clear 😉
Below is the article I wrote, which is also publicly available 
here<https://docs.google.com/document/d/1pNdQUB0gH-r2X1N_g774IOUEurowwQZ5OJ7yiY89qok>
 on Google Docs, where it is easier to read than on the devlist.

Here is my article:

Rethinking deferrable operators, async hooks and performance in Airflow 3

At our company, we strive to avoid custom code in Airflow as much as possible 
to improve maintainability.
For years this meant favouring dedicated Airflow operators over Python 
operators.
However, in Airflow 3, as the number of deferred operators in our DAGs 
continued to grow, we began facing severe performance issues with deferrable 
operators, which forced us to re-evaluate that approach.

Initially we expected deferrable operators to improve performance for 
I/O-related tasks—such as REST API calls—because triggerers follow an async 
producer/consumer pattern. But in practice we discovered the opposite.

Why Deferrable Operators Became the Bottleneck

Deferrable operators and sensors delegate async work to triggerers.
This is perfectly fine for lightweight tasks such as polling or waiting for 
messages on a queue.

But in our case:


  *   MSGraphAsyncOperator performs long-running async operations.
  *   HttpOperator in deferrable mode can perform long-running HTTP 
interactions, especially if pagination is involved.
  *   There is no native deferrable SFTPOperator, so if we want to use the 
SFTPHookAsync, we must use the PythonOperator, which doesn’t natively support 
async code (not that big of a challenge).
  *   Both can return large payloads.
  *   Triggerers must store yielded events directly into the Airflow metadata 
database.

Triggerers are not designed for sustained high-load async execution or large 
data transfers. Unlike Celery workers, triggerers scale poorly and quickly 
become the bottleneck.

Yielded events from triggers are stored directly in the Airflow metadata 
database because, unlike workers, triggers cannot leverage a custom XCom 
backend to offload large payloads, which can lead to increased database load 
and potential performance bottlenecks.

Dynamic task mapping with deferrable operators amplifies the problem even 
further, which AIP‑88 partially solves.
Triggerers also cannot run on the Edge Executor, as they are still tightly 
coupled to the Airflow metadata database (possibly addressed in AIP‑92).

Rethinking the approach: Async hooks + Python tasks

These limitations led us to reconsider calling async hooks directly from Python 
@task decorated functions or PythonOperators, thereby avoiding deferrable 
operators, and with them triggerers, entirely.
Operators are wrappers around hooks. Well‑written operators should contain 
little logic and delegate the real work to the hooks, so why not call the 
hooks directly?
This idea is also somewhat in line with what Bolke 
presented<https://airflowsummit.org/slides/2023/ab1-1400-Operators.pdf> in 2023.
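
To make the idea concrete, here is a minimal sketch of what "running an async 
function from a sync task" boils down to. It does not use Airflow at all and is 
not how the async PythonOperator is implemented: run_coroutines, FakeAsyncHook 
and download are hypothetical names made up for this illustration.

```python
import asyncio
import functools
import inspect


def run_coroutines(func):
    """Hypothetical decorator: make an async function callable from sync code."""
    if not inspect.iscoroutinefunction(func):
        # Plain functions are returned untouched.
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # asyncio.run creates an event loop, runs the coroutine to
        # completion, and closes the loop again.
        return asyncio.run(func(*args, **kwargs))

    return wrapper


class FakeAsyncHook:
    """Stand-in for an async hook; NOT a real Airflow class."""

    async def fetch(self, path: str) -> str:
        await asyncio.sleep(0.01)  # simulates network I/O
        return f"content of {path}"


@run_coroutines
async def download(path: str) -> str:
    # The task body calls the hook directly, with no operator in between.
    hook = FakeAsyncHook()
    return await hook.fetch(path)


if __name__ == "__main__":
    print(download("a.xml"))
```

The caller of download never sees the event loop, which is the convenience a 
natively async task is meant to give you.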

Advantages of this approach include:


  *   No dynamic task mapping needed when iterating: just loop in Python, unless 
you really need to track each individual step, but that comes at a cost.
  *   Massive reduction in scheduler load.
  *   No triggerers involved.
  *   Async code can run on Edge Workers.
  *   Celery workers scale far better than triggerers. By moving from deferred 
operators (and thus triggerers) to async operators on Celery workers, our 
performance issues on the triggerer disappeared and run times became much 
shorter, probably because the trigger mechanism also puts more load on the 
scheduler.
  *   For a single call, sync or async makes no difference in performance. 
Async shines when you have to execute the same async function many times, 
especially for I/O-related operations.
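
The last point can be illustrated with a small, self-contained sketch. It uses 
only asyncio from the standard library; fake_io_call is a hypothetical stand-in 
for an I/O-bound hook call, and asyncio.sleep simulates the waiting time.

```python
import asyncio
import time


async def fake_io_call(i: int) -> int:
    """Hypothetical I/O-bound call; the sleep stands in for network latency."""
    await asyncio.sleep(0.05)
    return i * 2


async def run_sequentially(n: int) -> list:
    # Each call is awaited before the next one starts, just like sync code.
    return [await fake_io_call(i) for i in range(n)]


async def run_concurrently(n: int) -> list:
    # All calls are started together; the waits overlap on one event loop.
    return list(await asyncio.gather(*(fake_io_call(i) for i in range(n))))


def timed(coro_factory, n: int):
    """Run a coroutine factory and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_factory(n))
    return result, time.perf_counter() - start


if __name__ == "__main__":
    _, t_seq = timed(run_sequentially, 10)
    _, t_conc = timed(run_concurrently, 10)
    print(f"sequential: {t_seq:.2f}s, concurrent: {t_conc:.2f}s")
```

With a single call both variants take about the same time; the gap only 
appears once many calls can overlap.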

Concrete Example: Async SFTP Downloads

Below is an example comparing approaches for downloading ~17,000 XML files and 
storing them in our data warehouse.
A single Celery worker can orchestrate many concurrent downloads using asyncio.
A semaphore (here used internally by the AsyncSFTPConnectionPool) protects the 
SFTP server from being overloaded.
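
As a rough sketch of the semaphore idea (not the actual AsyncSFTPConnectionPool 
code), the snippet below caps the number of simultaneous downloads. FakeServer, 
download_all and max_concurrency are hypothetical names for this illustration; 
the fake server only records how many downloads ran at the same time.

```python
import asyncio


class FakeServer:
    """Stand-in for an SFTP server that tracks concurrent downloads."""

    def __init__(self):
        self.active = 0
        self.peak = 0

    async def download(self, name: str) -> str:
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # simulates the transfer
        self.active -= 1
        return f"<xml>{name}</xml>"


async def download_all(server: FakeServer, names, max_concurrency: int):
    # The semaphore lets at most max_concurrency downloads run at once;
    # the remaining tasks wait at "async with" until a slot frees up.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(name: str) -> str:
        async with semaphore:
            return await server.download(name)

    return await asyncio.gather(*(bounded(name) for name in names))


if __name__ == "__main__":
    server = FakeServer()
    files = [f"f{i}.xml" for i in range(50)]
    results = asyncio.run(download_all(server, files, max_concurrency=5))
    print(len(results), "files, peak concurrency:", server.peak)
```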
Benchmark results:

Approach                                           Environment    Time
Mapped SFTPOperator                                production     3h 25m 55s
PythonOperator + SFTPHook                          local laptop   1h 21m 09s
Async Python task + SFTPHookAsync (without pool)   local laptop   8m 29s
Async Python task + AsyncSFTPConnectionPool        production     3m 32s

As you can see, the DagRun time went down from more than 3 hours to only three 
and a half minutes, which is huge!

The Google Doc contains two code snippets showing how it’s done, sync and 
async, which I will not repeat here.

Conclusion

Using async hooks inside async Python tasks provides better performance, 
scalability, and flexibility, and avoids reliance on triggerers entirely.
This hybrid approach—'async where it matters, operators where they make 
sense'—may represent the future of high‑performance Airflow data processing 
workloads.

What did I change in Airflow?

Not that much, I only:


  *   Introduced an async PythonOperator so you don’t have to handle the event 
loop yourself. Nothing special, but it is also natively supported on async 
@task decorated Python functions, which reads nicely.
  *   Made some improvements to the SFTPHookAsync to take full advantage of 
async.
  *   Introduced an SFTPHookPool so multiple asyncio tasks can re-use 
connection instances for even more performance. In this case it meant a 
reduction of 5 minutes in processing time, so we went from 8 to 3 minutes.
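
For readers wondering what such a pool looks like in principle, here is a 
minimal sketch built on asyncio.Queue. It is not the SFTPHookPool code: 
SimpleAsyncPool, FakeConnection and fetch_all are hypothetical names, and the 
fake connection only counts how often a connection is opened.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stand-in for an SFTP connection; counts how many were opened."""

    opened = 0

    def __init__(self):
        FakeConnection.opened += 1

    async def get(self, name: str) -> str:
        await asyncio.sleep(0.001)  # simulates the transfer
        return name.upper()


class SimpleAsyncPool:
    """Hypothetical pool; create it inside a running event loop."""

    def __init__(self, size: int):
        self._idle = asyncio.Queue()
        for _ in range(size):
            self._idle.put_nowait(FakeConnection())

    @asynccontextmanager
    async def connection(self):
        # Waits until a connection is idle, so the pool size also caps
        # how many tasks talk to the server at the same time.
        conn = await self._idle.get()
        try:
            yield conn
        finally:
            self._idle.put_nowait(conn)  # hand it back for re-use


async def fetch_all(names, pool_size: int):
    pool = SimpleAsyncPool(pool_size)

    async def fetch(name: str) -> str:
        async with pool.connection() as conn:
            return await conn.get(name)

    return await asyncio.gather(*(fetch(name) for name in names))


if __name__ == "__main__":
    print(asyncio.run(fetch_all(["a", "b", "c", "d"], pool_size=2)))
    print("connections opened:", FakeConnection.opened)
```

The gain comes from paying the connection setup cost once per pooled 
connection instead of once per file.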

