Are there any further thoughts on this topic?

Ferruzzi, Dennis <[email protected]> wrote on Fri, 20 Mar 2026, 23:33:

> Cool, just checking.   Yeah, I like the idea and would also like to make
> sure when we say we're putting a config option in there, that there is some
> clearly documented deprecation path so that the new behavior will
> eventually be the default.  Naming is hard, but I wonder if a better config
> flag might be a bool named `use_legacy_callbacks`?  But either way, that's
> an implementation detail we don't necessarily need to hash out here.
>
> - ferruzzi
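To make the deprecation-path idea concrete, here is a minimal sketch of how such a flag could be resolved. The function name, the LEGACY_DEFAULT constant, and the warning text are all hypothetical and are not existing Airflow code; only the flag name `use_legacy_callbacks` comes from the suggestion above.

```python
import warnings
from typing import Optional

# Hypothetical default for the transition period. Flipping this to False
# in a later release is what would make the new behavior the default.
LEGACY_DEFAULT = True


def resolve_use_legacy_callbacks(configured: Optional[bool]) -> bool:
    """Decide whether callbacks still run in the Dag Processor.

    `configured` is the user's explicit setting, or None when unset.
    """
    use_legacy = LEGACY_DEFAULT if configured is None else configured
    if use_legacy:
        # Warn whenever the legacy path is active so users see the
        # deprecation before the default changes.
        warnings.warn(
            "Running callbacks in the Dag Processor is deprecated; "
            "set use_legacy_callbacks=False to opt into the new behavior.",
            DeprecationWarning,
            stacklevel=2,
        )
    return use_legacy
```

With this shape, the flag can be removed entirely once the legacy path is gone, without renaming anything along the way.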
> ________________________________
> From: Sebastian Daum <[email protected]>
> Sent: Friday, March 20, 2026 2:07 PM
> To: [email protected] <[email protected]>
> Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be
> moved to Worker or Triggerer?
>
> CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
> Yes, that's exactly the idea. It would work as described. Furthermore—and
> I've looked into this—we can internally wrap the callable as a SyncCallback
> or AsyncCallback. This would also support the current way for defining and
> passing callbacks in a DAG or task. Of course, only if you enable this
> feature.
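A rough sketch of what that internal wrapping could look like. The two classes below are simplified stand-ins, not the real Airflow classes, and picking the wrapper via `inspect.iscoroutinefunction` is an assumption about how the sync/async decision might be made.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, List, Union


@dataclass(frozen=True)
class SyncCallback:
    """Stand-in for a callback that would run on a Worker."""

    func: Callable[..., Any]


@dataclass(frozen=True)
class AsyncCallback:
    """Stand-in for a callback that would run in the Triggerer."""

    func: Callable[..., Any]


Wrapped = Union[SyncCallback, AsyncCallback]


def wrap_callback(cb: Any) -> Wrapped:
    """Wrap a bare callable; leave already-wrapped callbacks untouched."""
    if isinstance(cb, (SyncCallback, AsyncCallback)):
        return cb
    if inspect.iscoroutinefunction(cb):
        return AsyncCallback(cb)
    return SyncCallback(cb)


def wrap_callbacks(value: Any) -> List[Wrapped]:
    """Normalize None, a single callback, or a list of callbacks."""
    if value is None:
        return []
    items = value if isinstance(value, (list, tuple)) else [value]
    return [wrap_callback(item) for item in items]
```

Because bare callables are accepted and wrapped, existing Dag files would keep working unchanged, which is the backwards-compatibility point made above.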
>
> Ferruzzi, Dennis <[email protected]> wrote on Fri, 20 Mar 2026, 20:27:
>
> > As mentioned, I helped with the general idea and think it's the right
> > direction; it's something I saw as the logical next step once the
> > Deadline callbacks were sorted out.  In the new implementation, the user
> > would define the on_*_callback as either a SyncCallback or an
> > AsyncCallback, and by that decision they determine where the callback
> > runs, just like a deadline callback, right?
> >
> >
> > with DAG(
> >     dag_id="example_callback",
> >     on_success_callback=SyncCallback(dag_success_alert),
> >     default_args={
> >         "on_execute_callback": AsyncCallback(task_execute_callback),
> >     },
> > ):
> >     task1 = EmptyOperator(
> >         task_id="task1",
> >         on_failure_callback=[SyncCallback(task_failure_alert)],
> >     )
> >
> >
> >
> > > the greater good.
> > The greater goood
> > ________________________________
> > From: Jens Scheffler <[email protected]>
> > Sent: Friday, March 20, 2026 11:53 AM
> > To: [email protected] <[email protected]>
> > Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be
> > moved to Worker or Triggerer?
> >
> > Hi Sebastian,
> >
> > thanks for kicking off the discussion. I am 100% for this. I always
> > wondered why it was implemented differently.
> >
> > I just fear a bit of complexity will come, but it will be for the
> > greater good.
> >
> > Jens
> >
> > On 20.03.26 14:40, Sebastian Daum wrote:
> > > Hello community,
> > >
> > > > I'd like to discuss moving Dag-level and task-level callbacks from the
> > > > Dag Processor to either the Worker or Triggerer.
> > >
> > >
> > > Background
> > >
> > > > Airflow's new ExecutorCallback framework, introduced with Deadline
> > > > Alerts [1], provides a flexible approach for running callbacks.
> > > > Deadline Callbacks can now run within the Triggerer (for async code)
> > > > or on a Worker as scheduled callbacks (for synchronous code).
> > > >
> > > > This new Callback Framework opens up the possibility of moving all Dag
> > > > callbacks and task callbacks from the Dag Processor to the Worker and
> > > > Triggerer. This change would give callbacks the same level of isolation
> > > > and support as standard task workloads while freeing the Dag Processor
> > > > from callback execution responsibilities.
> > > >
> > > > This topic has appeared in previous devlist discussions, and there's an
> > > > open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag
> > > > Processor separation) explored running task callbacks in a Worker [3],
> > > > though it also highlighted potential downsides, particularly for the
> > > > KubernetesExecutor where spinning up a new pod for callback execution
> > > > creates overhead compared to using the existing Dag Processor.
> > >
> > >
> > > Current Implementation
> > >
> > > > Currently, both the Scheduler and Triggerer create CallbackRequests
> > > > (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them
> > > > to the database using DbCallbackRequest/DagProcessorCallback.
> > > >
> > > > The DagFileProcessorManager fetches stored DbCallbackRequests,
> > > > deserializes them, and sends them to the file_queue along with the
> > > > callback request. A DagFileProcessorProcess then picks them up and
> > > > executes them.
> > >
> > >
> > > Proposed Changes
> > >
> > > Here are the key steps needed to implement this change:
> > >
> > > > 1. Add new Airflow configuration: Add a new configuration option (e.g.,
> > > > use_worker_callbacks or similar) that allows users to opt into the new
> > > > callback execution behavior while maintaining the existing Dag
> > > > Processor approach as the default.
> > >
> > > > 2. Callback Wrapping (Task-SDK): Wrap Dag and Task callables in
> > > > SyncCallback or AsyncCallback if not already provided with the new
> > > > type. This maintains backwards compatibility and preserves the current
> > > > Dag authoring experience.
> > >
> > > > 3. Serialization Updates: Adjust Dag serialization to include Dag/Task
> > > > on_*_callback references. Like Deadline Callbacks, we would serialize
> > > > only the reference to the callable, not the callable itself.
> > >
> > > 4. Callback Consolidation: Combine the different DagProcessorCallback
> > > implementations with ExecutorCallbacks and TriggererCallbacks.
> > >
> > > > 5. Improved Typing: Enhance typing and class separation within the
> > > > Callback data structure. Currently, we differentiate between
> > > > CallbackType: Triggerer, Executor, and DAG Processor. It makes sense to
> > > > implement better type discrimination for DagCallback, TaskCallback, and
> > > > potentially more specific types like DagSuccessCallback,
> > > > DagErrorCallback, TaskSuccessCallback, EmailCallback, etc. This might
> > > > warrant adding a new table column and reconsidering the CallbackType
> > > > field structure.
> > >
> > > 6. Component Updates: Modify the Scheduler and Triggerer to send
> > > ExecutorCallbacks or TriggererCallbacks and store them in the database
> > > instead of sending CallbackRequest via DatabaseSink.
> > >
> > > > 7. Integration: Leverage the existing logic for running Deadline
> > > > Callbacks. However, if we decide to run callbacks on workers, we'll
> > > > need to determine how to handle and prioritize callbacks versus
> > > > standard task workloads, as well as different callback types (Deadline
> > > > Alerts, Task Level Callbacks vs. Dag Level Callbacks) in the scheduler.
> > > > This connects to ongoing discussions about 'Deadline Callback queuing
> > > > and priority' [4].
> > >
> > > 8. Deprecation: Deprecate the existing
> > > DagCallbackRequest/TaskCallbackRequest/CallbackSink process.
> > >
> > > I'd appreciate your thoughts on this proposal, particularly around:
> > >
> > > - Any concerns about the migration path
> > >
> > > - The prioritization strategy for callbacks vs. standard tasks
> > >
> > >
> > > Thanks for your consideration!
> > >
> > > Sebastian
> > >
> > >
> > > > P.S. Many thanks to ferruzzi and shivaam for the discussions and
> > > > feedback.
> > >
> > >
> > > --------------------------------------------------------
> > >
> > > [1] ExecutorCallback Framework
> > >
> > > Executor Synchronous callback workload PR #61153
> > > <https://github.com/apache/airflow/pull/61153>
> > >
> > > [2] Move dag-level callbacks to worker
> > >
> > > Issue #44354 <https://github.com/apache/airflow/issues/44354>
> > >
> > > [3] Dag Processor separation
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
> > >
> > > [4] AIP-86 Deadline Callback queuing and priority
> > >
> > > https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w
> > >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: [email protected]
> > For additional commands, e-mail: [email protected]
> >
> >
>
