Yes, that's exactly the idea. It would work as described. Furthermore—and I've looked into this—we can internally wrap the callable as a SyncCallback or AsyncCallback. This would also support the current way for defining and passing callbacks in a DAG or task. Of course, only if you enable this feature.
Ferruzzi, Dennis <[email protected]> schrieb am Fr., 20. März 2026, 20:27: > As mentioned, I helped with the general idea and think it's the right > direction; it's something I saw as the logical next step once the Deadline > callbacks were sorted out. In the new implementation, the user would > define the on_*_callback as either a SyncCallback or an AsyncCallback, and > by that decision they determine where the callback runs, just like a > deadline callback right? > > > with DAG( > dag_id="example_callback", > on_success_callback=SyncCallback(dag_success_alert), > default_args={"on_execute_callback": > AsyncCallback(task_execute_callback)}, > ): > task1 = EmptyOperator(task_id="task1", > on_failure_callback=[SyncCallback(task_failure_alert)]) > > > > > the greater good. > The greater goood > ________________________________ > From: Jens Scheffler <[email protected]> > Sent: Friday, March 20, 2026 11:53 AM > To: [email protected] <[email protected]> > Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be > moved to Worker or Triggerer? > > CAUTION: This email originated from outside of the organization. Do not > click links or open attachments unless you can confirm the sender and know > the content is safe. > > > > AVERTISSEMENT: Ce courrier électronique provient d’un expéditeur externe. > Ne cliquez sur aucun lien et n’ouvrez aucune pièce jointe si vous ne pouvez > pas confirmer l’identité de l’expéditeur et si vous n’êtes pas certain que > le contenu ne présente aucun risque. > > > > Hi Sebastian, > > thanks for kicking off the discussion. I am 100% for this. I ever > wondered why it was implemented differently. > > I just fear a bit of complexity will come, but it will be for the > greater good. > > Jens > > On 20.03.26 14:40, Sebastian Daum wrote: > > Hello community, > > > > I'd like to discuss moving Dag-level and task-level callbacks from the > Dag > > Processor to either the Worker or Triggerer. > > > > > > Background > > > > Airflow's new ExecutorCallback framework, introduced with Deadline Alerts > > [1], provides a flexible approach for running callbacks. Deadline > Callbacks > > can now run within the Triggerer (for async code) or on a Worker as > > scheduled callbacks (for synchronous code). > > > > This new Callback Framework opens up the possibility of moving all Dag > > callbacks and task callbacks from the Dag Processor to the Worker and > > Triggerer. This change would give callbacks the same level of isolation > and > > support as standard task workloads while freeing the Dag Processor from > > callback execution responsibilities. > > > > This topic has appeared in previous devlist discussions, and there's an > > open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag > Processor > > separation) explored running task callbacks in a Worker [3], though it > also > > highlighted potential downsides, particularly for the KubernetesExecutor > > where spinning up a new pod for callback execution creates overhead > > compared to using the existing Dag Processor. > > > > > > Current Implementation > > > > Currently, both the Scheduler and Triggerer create CallbackRequests > > (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them > to > > the database using DbCallbackRequest/DagProcessorCallback. > > > > The DagFileProcessorManager fetches stored DbCallbackRequests, > deserializes > > them, and sends them to the file_queue along with the callback request. A > > DagFileProcessorProcess then picks them up and executes them. > > > > > > Proposed Changes > > > > Here are the key steps needed to implement this change: > > > > 1. Add new Airflow configuration: Add a new configuration option (e.g., > > use_worker_callbacks or similar) that allows users to opt into the new > > callback execution behavior while maintaining the existing Dag Processor > > approach as the default. > > > > 2. Callback Wrapping (Task-SDK): Wrap Dag and Task Callables in > > SyncCallable or AsyncCallable if not already provided with the new type. > > This maintains backwards compatibility and preserves the current Dag > > authoring experience. > > > > 3. Serialization Updates: Adjust Dag serialization to include Dag/Task > > on_*_callback references. Like Deadline Callbacks, we would serialize > only > > the reference to the callable, not the callable itself. > > > > 4. Callback Consolidation: Combine the different DagProcessorCallback > > implementations with ExecutorCallbacks and TriggererCallbacks. > > > > 5. Improved Typing: Enhance typing and class separation within the > Callback > > data structure. Currently, we differentiate between CallbackType: > > Triggerer, Executor, and DAG Processor. It makes sense to implement > better > > type discrimination for DagCallback, TaskCallback, and potentially more > > specific types like DagSuccessCallback, DagErrorCallback, > > TaskSuccessCallback, EmailCallback, etc. This might warrant adding a new > > table column and reconsidering the CallbackType field structure. > > > > 6. Component Updates: Modify the Scheduler and Triggerer to send > > ExecutorCallbacks or TriggererCallbacks and store them in the database > > instead of sending CallbackRequest via DatabaseSink. > > > > 7. Integration: Leverage the existing logic for running Deadline > Callbacks. > > However, if we decide to run callbacks on workers, we'll need to > determine > > how to handle and prioritize callbacks versus standard task workloads, as > > well as different callback types (Deadline Alerts, Task Level Callbacks > vs. > > Dag Level Callbacks) in the scheduler. This connects to ongoing > discussions > > about 'Deadline Callback queuing and priority' [4]. > > > > 8. Deprecation: Deprecate the existing > > DagCallbackRequest/TaskCallbackRequest/CallbackSink process. > > > > I'd appreciate your thoughts on this proposal, particularly around: > > > > - Any concerns about the migration path > > > > - The prioritization strategy for callbacks vs. standard tasks > > > > > > Thanks for your consideration! > > > > Sebastian > > > > > > P.S. Many thanks to ferruzzi and shivaam for the discussions and > feedback. > > > > > > -------------------------------------------------------- > > > > [1] ExecutorCallback Framework > > > > Executor Synchronous callback workload PR #61153 > > <https://github.com/apache/airflow/pull/61153> > > > > [2] Move dag-level callbacks to worker > > > > Issue #44354 <https://github.com/apache/airflow/issues/44354> > > > > [3] Dag Processor separation > > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation > > > > [4] AIP-86 Deadline Callback queuing and priority > > > > https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > >
