o-nikolas commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2790718525


##########
airflow-core/src/airflow/executors/workloads/task.py:
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task workload schemas for executor communication."""
+
+from __future__ import annotations
+
+import uuid
+from pathlib import Path
+from typing import TYPE_CHECKING, Literal
+
+from pydantic import BaseModel, Field
+
+from airflow.executors.workloads.base import BaseDagBundleWorkload, BundleInfo
+
+if TYPE_CHECKING:
+    from airflow.api_fastapi.auth.tokens import JWTGenerator
+    from airflow.models.taskinstance import TaskInstance as TIModel
+    from airflow.models.taskinstancekey import TaskInstanceKey
+
+
+class TaskInstanceDTO(BaseModel):
+    """Schema for TaskInstance with minimal required fields needed for 
Executors and Task SDK."""
+
+    id: uuid.UUID
+    dag_version_id: uuid.UUID
+    task_id: str
+    dag_id: str
+    run_id: str
+    try_number: int
+    map_index: int = -1
+
+    pool_slots: int
+    queue: str
+    priority_weight: int
+    executor_config: dict | None = Field(default=None, exclude=True)
+
+    parent_context_carrier: dict | None = None
+    context_carrier: dict | None = None
+
+    @property
+    def key(self) -> TaskInstanceKey:
+        """Return the TaskInstanceKey for this task instance."""
+        from airflow.models.taskinstancekey import TaskInstanceKey
+
+        return TaskInstanceKey(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            run_id=self.run_id,
+            try_number=self.try_number,
+            map_index=self.map_index,
+        )
+
+
+class ExecuteTask(BaseDagBundleWorkload):
+    """Execute the given Task."""
+
+    ti: TaskInstanceDTO
+    sentry_integration: str = ""
+
+    type: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")
+
+    @classmethod
+    def make(

Review Comment:
   This is all copy/paste right? No changes to how Airflow tasks are 
constructed?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py:
##########
@@ -109,8 +110,8 @@ def queued_tasks(self, value) -> None:
         """Not implemented for hybrid executors."""
 
     @property
-    def running(self) -> set[TaskInstanceKey]:

Review Comment:
   This executor is also deprecated.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -979,6 +985,70 @@ def _critical_section_enqueue_task_instances(self, 
session: Session) -> int:
 
         return len(queued_tis)
 
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run 
on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the 
appropriate executor.
+
+        :param session: The database session
+        """
+        num_occupied_slots = sum(executor.slots_occupied for executor in 
self.job.executors)
+        max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+
+        if max_callbacks <= 0:
+            self.log.debug("No available slots for callbacks; all executors at 
capacity")
+            return
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(max_callbacks)
+        ).all()
+
+        if not queued_callbacks:
+            return
+
+        # Route callbacks to executors using the generalized routing method
+        executor_to_callbacks = self._executor_to_workloads(queued_callbacks, 
session)
+
+        # Enqueue callbacks for each executor
+        for executor, callbacks in executor_to_callbacks.items():
+            for callback in callbacks:
+                if not isinstance(callback, ExecutorCallback):
+                    # Can't happen since we queried ExecutorCallback, but 
satisfies mypy.
+                    continue
+                dag_run = None
+                if isinstance(callback.data, dict) and "dag_run_id" in 
callback.data:
+                    dag_run_id = callback.data["dag_run_id"]
+                    dag_run = session.get(DagRun, dag_run_id)
+                elif isinstance(callback.data, dict) and "dag_id" in 
callback.data:
+                    # Fallback: try to find the latest dag_run for the dag_id
+                    dag_id = callback.data["dag_id"]
+                    dag_run = session.scalars(
+                        select(DagRun)
+                        .where(DagRun.dag_id == dag_id)
+                        .order_by(DagRun.execution_date.desc())
+                        .limit(1)
+                    ).first()
+
+                if dag_run is None:
+                    self.log.warning("Could not find DagRun for callback %s", 
callback.id)
+                    continue
+
+                workload = workloads.ExecuteCallback.make(
+                    callback=callback,
+                    dag_run=dag_run,
+                    generator=executor.jwt_generator,
+                )
+
+                executor.queue_workload(workload, session=session)
+                callback.state = CallbackState.RUNNING

Review Comment:
   The state management is kind of strange here. For tasks we park them as 
SCHEDULED when they go through the scheduler, and QUEUED when they go through 
the executor, and RUNNING when they are actually on a worker and running. It 
seems like the callback state machine is one step earlier in all cases. Was 
this intentional?



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1041,21 +1111,44 @@ def process_executor_events(
         ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] 
= {}
         event_buffer = executor.get_event_buffer()
         tis_with_right_state: list[TaskInstanceKey] = []
-
-        # Report execution
-        for ti_key, (state, _) in event_buffer.items():
-            # We create map (dag_id, task_id, logical_date) -> in-memory 
try_number
-            ti_primary_key_to_try_number_map[ti_key.primary] = 
ti_key.try_number
-
-            cls.logger().info("Received executor event with state %s for task 
instance %s", state, ti_key)
-            if state in (
-                TaskInstanceState.FAILED,
-                TaskInstanceState.SUCCESS,
-                TaskInstanceState.QUEUED,
-                TaskInstanceState.RUNNING,
-                TaskInstanceState.RESTARTING,
-            ):
-                tis_with_right_state.append(ti_key)
+        callback_keys_with_events: list[str] = []
+
+        # Report execution - handle both task and callback events
+        for key, (state, _) in event_buffer.items():
+            if isinstance(key, TaskInstanceKey):
+                ti_primary_key_to_try_number_map[key.primary] = key.try_number
+                cls.logger().info("Received executor event with state %s for 
task instance %s", state, key)
+                if state in (
+                    TaskInstanceState.FAILED,
+                    TaskInstanceState.SUCCESS,
+                    TaskInstanceState.QUEUED,
+                    TaskInstanceState.RUNNING,
+                    TaskInstanceState.RESTARTING,
+                ):
+                    tis_with_right_state.append(key)
+            else:
+                # Callback event (key is string UUID)
+                cls.logger().info("Received executor event with state %s for 
callback %s", state, key)
+                if state in (TaskInstanceState.FAILED, 
TaskInstanceState.SUCCESS):
+                    callback_keys_with_events.append(key)
+
+        # Handle callback completion events
+        for callback_id in callback_keys_with_events:
+            state, info = event_buffer.pop(callback_id)
+            callback = session.get(Callback, callback_id)
+            if callback:

Review Comment:
   Why would we not have callback if we received an event for it?



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -352,32 +353,37 @@ def _get_team_names_for_dag_ids(
             # Return dict with all None values to ensure graceful degradation
             return {}
 
-    def _get_task_team_name(self, task_instance: TaskInstance, session: 
Session) -> str | None:
+    def _get_workload_team_name(self, workload: SchedulerWorkload, session: 
Session) -> str | None:
         """
-        Resolve team name for a task instance using the DAG > Bundle > Team 
relationship chain.
+        Resolve team name for a workload using the DAG > Bundle > Team 
relationship chain.
 
-        TaskInstance > DagModel (via dag_id) > DagBundleModel (via 
bundle_name) > Team
+        SchedulerWorkload > DagModel (via dag_id) > DagBundleModel (via 
bundle_name) > Team
 
-        :param task_instance: The TaskInstance to resolve team name for
+        :param workload: The SchedulerWorkload to resolve team name for
         :param session: Database session for queries
         :return: Team name if found or None
         """
+        dag_id = workload.get_dag_id()
+        if dag_id is None:

Review Comment:
   Can this only happen in the case of a Callback? Or is this something that 
can happen to a task now?



##########
providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -100,11 +101,15 @@ def _task_event_logs(self):
     def _task_event_logs(self, value):
         """Not implemented for hybrid executors."""
 
-    @property

Review Comment:
   This executor is deprecated (see the note 
[here](https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/celery_kubernetes_executor.html)
 for example), we don't have to make updates to this executor to support these 
new features.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -352,32 +353,37 @@ def _get_team_names_for_dag_ids(
             # Return dict with all None values to ensure graceful degradation
             return {}
 
-    def _get_task_team_name(self, task_instance: TaskInstance, session: 
Session) -> str | None:
+    def _get_workload_team_name(self, workload: SchedulerWorkload, session: 
Session) -> str | None:
         """
-        Resolve team name for a task instance using the DAG > Bundle > Team 
relationship chain.
+        Resolve team name for a workload using the DAG > Bundle > Team 
relationship chain.
 
-        TaskInstance > DagModel (via dag_id) > DagBundleModel (via 
bundle_name) > Team
+        SchedulerWorkload > DagModel (via dag_id) > DagBundleModel (via 
bundle_name) > Team
 
-        :param task_instance: The TaskInstance to resolve team name for
+        :param workload: The SchedulerWorkload to resolve team name for
         :param session: Database session for queries
         :return: Team name if found or None
         """
+        dag_id = workload.get_dag_id()
+        if dag_id is None:
+            self.log.debug("Workload %s has no dag_id, cannot resolve team 
name", workload)

Review Comment:
   Should this be a warning or error message? Or even an exception? I'm not 
sure what this means if it doesn't exist.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -979,6 +985,70 @@ def _critical_section_enqueue_task_instances(self, 
session: Session) -> int:
 
         return len(queued_tis)
 
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run 
on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the 
appropriate executor.
+
+        :param session: The database session
+        """
+        num_occupied_slots = sum(executor.slots_occupied for executor in 
self.job.executors)
+        max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+
+        if max_callbacks <= 0:
+            self.log.debug("No available slots for callbacks; all executors at 
capacity")
+            return
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(max_callbacks)
+        ).all()
+
+        if not queued_callbacks:
+            return
+
+        # Route callbacks to executors using the generalized routing method
+        executor_to_callbacks = self._executor_to_workloads(queued_callbacks, 
session)
+
+        # Enqueue callbacks for each executor
+        for executor, callbacks in executor_to_callbacks.items():
+            for callback in callbacks:
+                if not isinstance(callback, ExecutorCallback):
+                    # Can't happen since we queried ExecutorCallback, but 
satisfies mypy.
+                    continue
+                dag_run = None

Review Comment:
   Why do we do so much work to determine the dag_run? This feels like it's a 
missing input to the callback workload data object when it's created? At that 
time we should know the task or dag that created the callback. It feels strange 
to me that we're trying to figure it out after the fact. But I'm probably just 
struggling to understand the architecture here.



##########
airflow-core/src/airflow/executors/workloads/callback.py:
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Callback workload schemas for executor communication."""
+
+from __future__ import annotations
+
+from enum import Enum
+from importlib import import_module
+from pathlib import Path
+from typing import TYPE_CHECKING, Literal
+from uuid import UUID
+
+import structlog
+from pydantic import BaseModel, Field, field_validator
+
+from airflow.executors.workloads.base import BaseDagBundleWorkload, BundleInfo
+
+if TYPE_CHECKING:
+    from airflow.api_fastapi.auth.tokens import JWTGenerator
+    from airflow.models import DagRun
+    from airflow.models.callback import Callback as CallbackModel, CallbackKey
+
+log = structlog.get_logger(__name__)
+
+
+class CallbackFetchMethod(str, Enum):
+    """Methods used to fetch callback at runtime."""
+
+    # For future use once Dag Processor callbacks 
(on_success_callback/on_failure_callback) get moved to executors
+    DAG_ATTRIBUTE = "dag_attribute"
+
+    # For deadline callbacks since they import callbacks through the import 
path
+    IMPORT_PATH = "import_path"
+
+
+class CallbackDTO(BaseModel):
+    """Schema for Callback with minimal required fields needed for Executors 
and Task SDK."""
+
+    id: str  # A uuid.UUID stored as a string
+    fetch_method: CallbackFetchMethod
+    data: dict
+
+    @field_validator("id", mode="before")
+    @classmethod
+    def validate_id(cls, v):
+        """Convert UUID to str if needed."""
+        if isinstance(v, UUID):
+            return str(v)
+        return v
+
+    @property
+    def key(self) -> CallbackKey:
+        """Return callback ID as key (CallbackKey = str)."""
+        return self.id
+
+
+class ExecuteCallback(BaseDagBundleWorkload):
+    """Execute the given Callback."""
+
+    callback: CallbackDTO
+
+    type: Literal["ExecuteCallback"] = Field(init=False, 
default="ExecuteCallback")
+
+    @classmethod
+    def make(
+        cls,
+        callback: CallbackModel,
+        dag_run: DagRun,
+        dag_rel_path: Path | None = None,
+        generator: JWTGenerator | None = None,
+        bundle_info: BundleInfo | None = None,
+    ) -> ExecuteCallback:
+        """Create an ExecuteCallback workload from a Callback ORM model."""
+        if not bundle_info:
+            bundle_info = BundleInfo(
+                name=dag_run.dag_model.bundle_name,
+                version=dag_run.bundle_version,
+            )
+        fname = f"executor_callbacks/{callback.id}"  # TODO: better log file 
template
+
+        return cls(
+            callback=CallbackDTO.model_validate(callback, 
from_attributes=True),
+            dag_rel_path=dag_rel_path or 
Path(dag_run.dag_model.relative_fileloc or ""),
+            token=cls.generate_token(str(callback.id), generator),
+            log_path=fname,
+            bundle_info=bundle_info,
+        )
+
+
+def execute_callback_workload(
+    callback: CallbackDTO,
+    log,
+) -> tuple[bool, str | None]:
+    """
+    Execute a callback function by importing and calling it, returning the 
success state.
+
+    Supports two patterns:
+    1. Functions - called directly with kwargs
+    2. Classes that return callable instances (like BaseNotifier) - 
instantiated then called with context
+
+    Example:
+        # Function callback
+        callback.data = {"path": "my_module.alert_func", "kwargs": {"msg": 
"Alert!", "context": {...}}}
+        execute_callback_workload(callback, log)  # Calls 
alert_func(msg="Alert!", context={...})
+
+        # Notifier callback
+        callback.data = {"path": 
"airflow.providers.slack...SlackWebhookNotifier", "kwargs": {"text": "Alert!", 
"context": {...}}}
+        execute_callback_workload(callback, log)  # 
SlackWebhookNotifier(text=..., context=...) then calls instance(context)
+
+    :param callback: The Callback schema containing path and kwargs
+    :param log: Logger instance for recording execution
+    :return: Tuple of (success: bool, error_message: str | None)
+    """
+    callback_path = callback.data.get("path")
+    callback_kwargs = callback.data.get("kwargs", {})
+
+    if not callback_path:
+        return False, "Callback path not found in data."
+
+    try:
+        # Import the callback callable
+        # Expected format: "module.path.to.function_or_class"
+        module_path, function_name = callback_path.rsplit(".", 1)
+        module = import_module(module_path)
+        callback_callable = getattr(module, function_name)
+
+        log.debug("Executing callback %s(%s)...", callback_path, 
callback_kwargs)
+
+        # If the callback is a callabale, call it.  If it is a class, 
instantiate it.
+        result = callback_callable(**callback_kwargs)
+
+        # If the callback is a class then it is now instantiated and callable, 
call it.
+        if callable(result):
+            context = callback_kwargs.get("context", {})
+            log.debug("Calling result with context for %s", callback_path)
+            result = result(context)
+
+        log.info("Callback %s executed successfully.", callback_path)
+        return True, None

Review Comment:
   The doc string says
   > calling it, returning the success state.
   
   Is `True` here basically casted to some success state later up the call 
stack?



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -495,24 +534,26 @@ def running_state(self, key: TaskInstanceKey, info=None) 
-> None:
         """
         self.change_state(key, TaskInstanceState.RUNNING, info, 
remove_running=False)
 
-    def get_event_buffer(self, dag_ids=None) -> dict[TaskInstanceKey, 
EventBufferValueType]:
+    def get_event_buffer(self, dag_ids=None) -> dict[WorkloadKey, 
EventBufferValueType]:
         """
         Return and flush the event buffer.
 
         In case dag_ids is specified it will only return and flush events
         for the given dag_ids. Otherwise, it returns and flushes all events.
+        Note: Callback events (with string keys) are always included 
regardless of dag_ids filter.
 
         :param dag_ids: the dag_ids to return events for; returns all if given 
``None``.
         :return: a dict of events
         """
-        cleared_events: dict[TaskInstanceKey, EventBufferValueType] = {}
+        cleared_events: dict[WorkloadKey, EventBufferValueType] = {}
         if dag_ids is None:
             cleared_events = self.event_buffer
             self.event_buffer = {}
         else:
-            for ti_key in list(self.event_buffer.keys()):
-                if ti_key.dag_id in dag_ids:
-                    cleared_events[ti_key] = self.event_buffer.pop(ti_key)
+            for key in list(self.event_buffer.keys()):
+                # Include if it's a callback (string key) or if it's a task in 
the specified dags
+                if isinstance(key, str) or key.dag_id in dag_ids:

Review Comment:
   I think in the future we'll likely want a more reliable check than just str 
(since other workload types may also have string keys but require different 
behaviour).
   



##########
airflow-core/src/airflow/executors/workloads/callback.py:
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Callback workload schemas for executor communication."""
+
+from __future__ import annotations
+
+from enum import Enum
+from importlib import import_module
+from pathlib import Path
+from typing import TYPE_CHECKING, Literal
+from uuid import UUID
+
+import structlog
+from pydantic import BaseModel, Field, field_validator
+
+from airflow.executors.workloads.base import BaseDagBundleWorkload, BundleInfo
+
+if TYPE_CHECKING:
+    from airflow.api_fastapi.auth.tokens import JWTGenerator
+    from airflow.models import DagRun
+    from airflow.models.callback import Callback as CallbackModel, CallbackKey
+
+log = structlog.get_logger(__name__)
+
+
+class CallbackFetchMethod(str, Enum):
+    """Methods used to fetch callback at runtime."""
+
+    # For future use once Dag Processor callbacks 
(on_success_callback/on_failure_callback) get moved to executors
+    DAG_ATTRIBUTE = "dag_attribute"
+
+    # For deadline callbacks since they import callbacks through the import 
path
+    IMPORT_PATH = "import_path"
+
+
+class CallbackDTO(BaseModel):
+    """Schema for Callback with minimal required fields needed for Executors 
and Task SDK."""
+
+    id: str  # A uuid.UUID stored as a string
+    fetch_method: CallbackFetchMethod
+    data: dict
+
+    @field_validator("id", mode="before")
+    @classmethod
+    def validate_id(cls, v):
+        """Convert UUID to str if needed."""
+        if isinstance(v, UUID):
+            return str(v)
+        return v
+
+    @property
+    def key(self) -> CallbackKey:
+        """Return callback ID as key (CallbackKey = str)."""
+        return self.id
+
+
+class ExecuteCallback(BaseDagBundleWorkload):
+    """Execute the given Callback."""
+
+    callback: CallbackDTO
+
+    type: Literal["ExecuteCallback"] = Field(init=False, 
default="ExecuteCallback")
+
+    @classmethod
+    def make(
+        cls,
+        callback: CallbackModel,
+        dag_run: DagRun,
+        dag_rel_path: Path | None = None,
+        generator: JWTGenerator | None = None,
+        bundle_info: BundleInfo | None = None,
+    ) -> ExecuteCallback:
+        """Create an ExecuteCallback workload from a Callback ORM model."""
+        if not bundle_info:
+            bundle_info = BundleInfo(
+                name=dag_run.dag_model.bundle_name,
+                version=dag_run.bundle_version,
+            )
+        fname = f"executor_callbacks/{callback.id}"  # TODO: better log file 
template

Review Comment:
   Did we want to leave this todo undone? Or was that meant to be finished 
before merging?



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""ORM models and Pydantic schemas for BaseWorkload."""
+
+from __future__ import annotations
+
+import os
+from abc import ABC
+from typing import TYPE_CHECKING
+
+from pydantic import BaseModel
+
+if TYPE_CHECKING:
+    from airflow.api_fastapi.auth.tokens import JWTGenerator
+
+
+class BaseWorkload:
+    """
+    Mixin for ORM models that can be scheduled as workloads.
+
+    This mixin defines the interface that scheduler workloads (TaskInstance,
+    ExecutorCallback, etc.) must implement to provide routing information to 
the scheduler.
+
+    Subclasses must override:
+    - get_dag_id() -> str | None
+    - get_executor_name() -> str | None
+    """
+
+    def get_dag_id(self) -> str | None:
+        """
+        Return the DAG ID for scheduler routing.
+
+        Must be implemented by subclasses.
+        """
+        raise NotImplementedError(f"{self.__class__.__name__} must implement 
get_dag_id()")
+
+    def get_executor_name(self) -> str | None:
+        """
+        Return the executor name for scheduler routing.
+
+        Must be implemented by subclasses.
+        """
+        raise NotImplementedError(f"{self.__class__.__name__} must implement 
get_executor_name()")
+
+
+class BundleInfo(BaseModel):
+    """Schema for telling task which bundle to run with."""
+
+    name: str
+    version: str | None = None
+
+
+class BaseWorkloadSchema(BaseModel):
+    """Base Pydantic schema for executor workload DTOs."""
+
+    token: str  # The identity token for this workload.

Review Comment:
   Put the doc in the name?
   ```suggestion
       identity_token: str
   ```



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -945,21 +1042,44 @@ def process_executor_events(
         ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] 
= {}
         event_buffer = executor.get_event_buffer()
         tis_with_right_state: list[TaskInstanceKey] = []
-
-        # Report execution
-        for ti_key, (state, _) in event_buffer.items():
-            # We create map (dag_id, task_id, logical_date) -> in-memory 
try_number
-            ti_primary_key_to_try_number_map[ti_key.primary] = 
ti_key.try_number
-
-            cls.logger().info("Received executor event with state %s for task 
instance %s", state, ti_key)
-            if state in (
-                TaskInstanceState.FAILED,
-                TaskInstanceState.SUCCESS,
-                TaskInstanceState.QUEUED,
-                TaskInstanceState.RUNNING,
-                TaskInstanceState.RESTARTING,
-            ):
-                tis_with_right_state.append(ti_key)
+        callback_keys_with_events: list[str] = []
+
+        # Report execution - handle both task and callback events
+        for key, (state, _) in event_buffer.items():
+            if isinstance(key, TaskInstanceKey):
+                ti_primary_key_to_try_number_map[key.primary] = key.try_number
+                cls.logger().info("Received executor event with state %s for 
task instance %s", state, key)
+                if state in (
+                    TaskInstanceState.FAILED,
+                    TaskInstanceState.SUCCESS,
+                    TaskInstanceState.QUEUED,
+                    TaskInstanceState.RUNNING,
+                    TaskInstanceState.RESTARTING,
+                ):
+                    tis_with_right_state.append(key)
+            else:
+                # Callback event (key is string UUID)
+                cls.logger().info("Received executor event with state %s for 
callback %s", state, key)
+                if state in (TaskInstanceState.FAILED, 
TaskInstanceState.SUCCESS):
+                    callback_keys_with_events.append(key)
+
+        # Handle callback completion events
+        for callback_id in callback_keys_with_events:
+            state, info = event_buffer.pop(callback_id)
+            callback = session.get(Callback, callback_id)
+            if callback:
+                # Note: We receive TaskInstanceState from executor 
(SUCCESS/FAILED) but convert to CallbackState here.
+                # This is intentional - executor layer uses generic completion 
states, scheduler converts to proper types.
+                if state == TaskInstanceState.SUCCESS:

Review Comment:
   Still looks like this is using TI state for callbacks? Even though we now 
have full a full callback state machine from the rest of your changes here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to