Hello community, I'd like to discuss moving Dag-level and task-level callbacks from the Dag Processor to either the Worker or Triggerer.

Background

Airflow's new ExecutorCallback framework, introduced with Deadline Alerts [1], provides a flexible approach for running callbacks. Deadline Callbacks can now run within the Triggerer (for async code) or on a Worker as scheduled callbacks (for synchronous code). This new Callback Framework opens up the possibility of moving all Dag callbacks and task callbacks from the Dag Processor to the Worker and Triggerer. This change would give callbacks the same level of isolation and support as standard task workloads while freeing the Dag Processor from callback execution responsibilities.

This topic has appeared in previous devlist discussions, and there's an open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag Processor separation) explored running task callbacks in a Worker [3], though it also highlighted potential downsides, particularly for the KubernetesExecutor, where spinning up a new pod for callback execution creates overhead compared to using the existing Dag Processor.

Current Implementation

Currently, both the Scheduler and Triggerer create CallbackRequests (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them to the database using DbCallbackRequest/DagProcessorCallback. The DagFileProcessorManager fetches stored DbCallbackRequests, deserializes them, and sends them to the file_queue along with the callback request. A DagFileProcessorProcess then picks them up and executes them.

Proposed Changes

Here are the key steps needed to implement this change:

1. Add new Airflow configuration: Add a new configuration option (e.g., use_worker_callbacks or similar) that allows users to opt into the new callback execution behavior while maintaining the existing Dag Processor approach as the default.

2. Callback Wrapping (Task-SDK): Wrap Dag and Task Callables in SyncCallable or AsyncCallable if not already provided with the new type. This maintains backwards compatibility and preserves the current Dag authoring experience.

3. Serialization Updates: Adjust Dag serialization to include Dag/Task on_*_callback references. Like Deadline Callbacks, we would serialize only the reference to the callable, not the callable itself.

4. Callback Consolidation: Combine the different DagProcessorCallback implementations with ExecutorCallbacks and TriggererCallbacks.

5. Improved Typing: Enhance typing and class separation within the Callback data structure. Currently, we differentiate between CallbackType: Triggerer, Executor, and DAG Processor. It makes sense to implement better type discrimination for DagCallback, TaskCallback, and potentially more specific types like DagSuccessCallback, DagErrorCallback, TaskSuccessCallback, EmailCallback, etc. This might warrant adding a new table column and reconsidering the CallbackType field structure.

6. Component Updates: Modify the Scheduler and Triggerer to send ExecutorCallbacks or TriggererCallbacks and store them in the database instead of sending CallbackRequest via DatabaseSink.

7. Integration: Leverage the existing logic for running Deadline Callbacks. However, if we decide to run callbacks on workers, we'll need to determine how to handle and prioritize callbacks versus standard task workloads, as well as different callback types (Deadline Alerts, Task Level Callbacks vs. Dag Level Callbacks) in the scheduler. This connects to ongoing discussions about 'Deadline Callback queuing and priority' [4].

8. Deprecation: Deprecate the existing DagCallbackRequest/TaskCallbackRequest/CallbackSink process.

I'd appreciate your thoughts on this proposal, particularly around:

- Any concerns about the migration path
- The prioritization strategy for callbacks vs. standard tasks

Thanks for your consideration!

Sebastian

P.S. Many thanks to ferruzzi and shivaam for the discussions and feedback.
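
P.P.S. To make steps 2 and 3 of the proposal a bit more concrete, here is a rough sketch of what "wrap if not already wrapped" and "serialize only the reference" could look like. It is illustrative only: CallbackRef, SyncCallableRef, AsyncCallableRef, wrap_callback and run_callback are hypothetical stand-ins written for this mail, not the actual SyncCallable/AsyncCallable classes or any existing Airflow API, and the stdlib functions stand in for user callbacks that would normally live in a Dag file.

```python
import asyncio
import importlib
import inspect
import json
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class CallbackRef:
    """Hypothetical stand-in: stores an import path, never the callable itself."""

    path: str
    kwargs: dict[str, Any] = field(default_factory=dict)

    def resolve(self) -> Callable[..., Any]:
        # Re-import the callable wherever the callback eventually runs.
        module_name, _, attr = self.path.rpartition(".")
        return getattr(importlib.import_module(module_name), attr)


class SyncCallableRef(CallbackRef):
    """Assumed to be routed to a Worker (synchronous code)."""


class AsyncCallableRef(CallbackRef):
    """Assumed to be routed to the Triggerer (async code)."""


def wrap_callback(cb: Any) -> CallbackRef:
    """Wrap a plain callable; leave already-wrapped callbacks untouched."""
    if isinstance(cb, CallbackRef):
        return cb
    qualname = getattr(cb, "__qualname__", "")
    if not qualname or "<" in qualname:
        # Lambdas and nested functions cannot be re-imported by path.
        raise ValueError(f"callback {cb!r} is not importable by reference")
    path = f"{cb.__module__}.{qualname}"
    if inspect.iscoroutinefunction(cb):
        return AsyncCallableRef(path)
    return SyncCallableRef(path)


def run_callback(ref: CallbackRef, *args: Any) -> Any:
    """Resolve the reference and execute it the way its type requires."""
    fn = ref.resolve()
    if isinstance(ref, AsyncCallableRef):
        return asyncio.run(fn(*args, **ref.kwargs))
    return fn(*args, **ref.kwargs)


if __name__ == "__main__":
    sync_ref = wrap_callback(json.dumps)      # plain function
    async_ref = wrap_callback(asyncio.sleep)  # coroutine function
    print(type(sync_ref).__name__, sync_ref.path)
    print(type(async_ref).__name__, async_ref.path)
```

One consequence of serializing by reference, visible in the sketch, is that lambdas and locally defined functions cannot be used as callbacks unless they are importable from a module, which may matter for the migration path.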
--------------------------------------------------------
[1] ExecutorCallback Framework: Executor Synchronous callback workload, PR #61153 <https://github.com/apache/airflow/pull/61153>
[2] Move dag-level callbacks to worker, Issue #44354 <https://github.com/apache/airflow/issues/44354>
[3] Dag Processor separation <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation>
[4] AIP-86 Deadline Callback queuing and priority <https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w>
