Cool, just checking.   Yeah, I like the idea and would also like to make sure 
when we say we're putting a config option in there, that there is some clearly 
documented deprecation path so that the new behavior will eventually be the 
default.  Naming is hard, but I wonder if a better config flag might be a bool 
named `use_legacy_callbacks`?  But either way, that's an implementation detail 
we don't necessarily need to hash out here.

- ferruzzi
________________________________
From: Sebastian Daum <[email protected]>
Sent: Friday, March 20, 2026 2:07 PM
To: [email protected] <[email protected]>
Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be moved 
to Worker or Triggerer?

CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.



AVERTISSEMENT: Ce courrier électronique provient d’un expéditeur externe. Ne 
cliquez sur aucun lien et n’ouvrez aucune pièce jointe si vous ne pouvez pas 
confirmer l’identité de l’expéditeur et si vous n’êtes pas certain que le 
contenu ne présente aucun risque.



Yes, that's exactly the idea. It would work as described. Furthermore—and
I've looked into this—we can internally wrap the callable as a SyncCallback
or AsyncCallback. This would also support the current way for defining and
passing callbacks in a DAG or task. Of course, only if you enable this
feature.

Ferruzzi, Dennis <[email protected]> schrieb am Fr., 20. März 2026, 20:27:

> As mentioned, I helped with the general idea and think it's the right
> direction; it's something I saw as the logical next step once the Deadline
> callbacks were sorted out.  In the new implementation, the user would
> define the on_*_callback as either a SyncCallback or an AsyncCallback, and
> by that decision they determine where the callback runs, just like a
> deadline callback right?
>
>
> with DAG(
>     dag_id="example_callback",
>     on_success_callback=SyncCallback(dag_success_alert),
>     default_args={"on_execute_callback":
> AsyncCallback(task_execute_callback)},
> ):
>     task1 = EmptyOperator(task_id="task1",
> on_failure_callback=[SyncCallback(task_failure_alert)])
>
>
>
> > the greater good.
> The greater goood
> ________________________________
> From: Jens Scheffler <[email protected]>
> Sent: Friday, March 20, 2026 11:53 AM
> To: [email protected] <[email protected]>
> Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be
> moved to Worker or Triggerer?
>
> CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> AVERTISSEMENT: Ce courrier électronique provient d’un expéditeur externe.
> Ne cliquez sur aucun lien et n’ouvrez aucune pièce jointe si vous ne pouvez
> pas confirmer l’identité de l’expéditeur et si vous n’êtes pas certain que
> le contenu ne présente aucun risque.
>
>
>
> Hi Sebastian,
>
> thanks for kicking off the discussion. I am 100% for this. I ever
> wondered why it was implemented differently.
>
> I just fear a bit of complexity will come, but it will be for the
> greater good.
>
> Jens
>
> On 20.03.26 14:40, Sebastian Daum wrote:
> > Hello community,
> >
> > I'd like to discuss moving Dag-level and task-level callbacks from the
> Dag
> > Processor to either the Worker or Triggerer.
> >
> >
> > Background
> >
> > Airflow's new ExecutorCallback framework, introduced with Deadline Alerts
> > [1], provides a flexible approach for running callbacks. Deadline
> Callbacks
> > can now run within the Triggerer (for async code) or on a Worker as
> > scheduled callbacks (for synchronous code).
> >
> > This new Callback Framework opens up the possibility of moving all Dag
> > callbacks and task callbacks from the Dag Processor to the Worker and
> > Triggerer. This change would give callbacks the same level of isolation
> and
> > support as standard task workloads while freeing the Dag Processor from
> > callback execution responsibilities.
> >
> > This topic has appeared in previous devlist discussions, and there's an
> > open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag
> Processor
> > separation) explored running task callbacks in a Worker [3], though it
> also
> > highlighted potential downsides, particularly for the KubernetesExecutor
> > where spinning up a new pod for callback execution creates overhead
> > compared to using the existing Dag Processor.
> >
> >
> > Current Implementation
> >
> > Currently, both the Scheduler and Triggerer create CallbackRequests
> > (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them
> to
> > the database using DbCallbackRequest/DagProcessorCallback.
> >
> > The DagFileProcessorManager fetches stored DbCallbackRequests,
> deserializes
> > them, and sends them to the file_queue along with the callback request. A
> > DagFileProcessorProcess then picks them up and executes them.
> >
> >
> > Proposed Changes
> >
> > Here are the key steps needed to implement this change:
> >
> > 1. Add new Airflow configuration: Add a new configuration option (e.g.,
> > use_worker_callbacks or similar) that allows users to opt into the new
> > callback execution behavior while maintaining the existing Dag Processor
> > approach as the default.
> >
> > 2. Callback Wrapping (Task-SDK): Wrap Dag and Task Callables in
> > SyncCallable or AsyncCallable if not already provided with the new type.
> > This maintains backwards compatibility and preserves the current Dag
> > authoring experience.
> >
> > 3. Serialization Updates: Adjust Dag serialization to include Dag/Task
> > on_*_callback references. Like Deadline Callbacks, we would serialize
> only
> > the reference to the callable, not the callable itself.
> >
> > 4. Callback Consolidation: Combine the different DagProcessorCallback
> > implementations with ExecutorCallbacks and TriggererCallbacks.
> >
> > 5. Improved Typing: Enhance typing and class separation within the
> Callback
> > data structure. Currently, we differentiate between CallbackType:
> > Triggerer, Executor, and DAG Processor. It makes sense to implement
> better
> > type discrimination for DagCallback, TaskCallback, and potentially more
> > specific types like DagSuccessCallback, DagErrorCallback,
> > TaskSuccessCallback, EmailCallback, etc. This might warrant adding a new
> > table column and reconsidering the CallbackType field structure.
> >
> > 6. Component Updates: Modify the Scheduler and Triggerer to send
> > ExecutorCallbacks or TriggererCallbacks and store them in the database
> > instead of sending CallbackRequest via DatabaseSink.
> >
> > 7. Integration: Leverage the existing logic for running Deadline
> Callbacks.
> > However, if we decide to run callbacks on workers, we'll need to
> determine
> > how to handle and prioritize callbacks versus standard task workloads, as
> > well as different callback types (Deadline Alerts, Task Level Callbacks
> vs.
> > Dag Level Callbacks) in the scheduler. This connects to ongoing
> discussions
> > about 'Deadline Callback queuing and priority' [4].
> >
> > 8. Deprecation: Deprecate the existing
> > DagCallbackRequest/TaskCallbackRequest/CallbackSink process.
> >
> > I'd appreciate your thoughts on this proposal, particularly around:
> >
> > - Any concerns about the migration path
> >
> > - The prioritization strategy for callbacks vs. standard tasks
> >
> >
> > Thanks for your consideration!
> >
> > Sebastian
> >
> >
> > P.S. Many thanks to ferruzzi and shivaam for the discussions and
> feedback.
> >
> >
> > --------------------------------------------------------
> >
> > [1] ExecutorCallback Framework
> >
> > Executor Synchronous callback workload PR #61153
> > <https://github.com/apache/airflow/pull/61153>
> >
> > [2] Move dag-level callbacks to worker
> >
> > Issue #44354 <https://github.com/apache/airflow/issues/44354>
> >
> > [3] Dag Processor separation
> >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
> >
> > [4] AIP-86 Deadline Callback queuing and priority
> >
> > https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>

Reply via email to