ashb commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r3241425228


##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -32,6 +33,28 @@
     from airflow.executors.workloads.types import WorkloadState
 
 
+class WorkloadType(str, Enum):
+    """Central registry of executor workload types."""
+
+    EXECUTE_TASK = "ExecuteTask"
+    EXECUTE_CALLBACK = "ExecuteCallback"
+
+
+# Central executor priority registry: tuple is ordered from highest priority 
to lowest.
+#
+# Adding a new workload type is a three-place change that must stay in sync:
+#   1. ``WorkloadType`` — declare the enum member.
+#   2. ``_workload_type_priority_order`` — insert it at the right priority 
slot.
+#   3. ``airflow.executors.workloads.QueueableWorkload`` — extend the 
discriminated union
+#      so ``queue_workload`` can accept the new schema.
+_workload_type_priority_order = (
+    WorkloadType.EXECUTE_CALLBACK,
+    WorkloadType.EXECUTE_TASK,
+)
+
+WORKLOAD_TYPE_PRIORITY: dict[str, int] = {name: idx for idx, name in 
enumerate(_workload_type_priority_order)}

Review Comment:
   ```suggestion
   WORKLOAD_TYPE_PRIORITY: dict[WorkloadType, int] = {name: idx for idx, name 
in enumerate(_workload_type_priority_order)}
   ```
   



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -254,58 +306,45 @@ def log_task_event(self, *, event: str, extra: str, 
ti_key: WorkloadKey):
             return
         self._task_event_logs.append(Log(event=event, task_instance=ti_key, 
extra=extra))
 
-    def queue_workload(self, workload: ExecutorWorkload, session: Session) -> 
None:
-        if isinstance(workload, workloads.ExecuteTask):
-            ti = workload.ti
-            self.queued_tasks[ti.key] = workload
-        elif isinstance(workload, workloads.ExecuteCallback):
-            if not self.supports_callbacks:
-                raise NotImplementedError(
-                    f"{type(self).__name__} does not support ExecuteCallback 
workloads. "
-                    f"Set supports_callbacks = True and implement callback 
handling in _process_workloads(). "
-                    f"See LocalExecutor or CeleryExecutor for reference 
implementation."
-                )
-            self.queued_callbacks[workload.callback.id] = workload
-        else:
-            raise ValueError(
-                f"Un-handled workload type {type(workload).__name__!r} in 
{type(self).__name__}. "
-                f"Workload must be one of: ExecuteTask, ExecuteCallback."
+    def queue_workload(self, workload: QueueableWorkload, session: Session) -> 
None:
+        if workload.type not in self.supported_workload_types:
+            raise NotImplementedError(
+                f"{type(self).__name__} does not support {workload.type!r} 
workloads. "
+                f"Add {workload.type!r} to supported_workload_types and 
implement handling "
+                f"in _process_workloads()."

Review Comment:
   `supports_callbacks = True` class var no longer enables callback dispatch
   
   Any executor plugin that declared `supports_callbacks = True` as a class 
variable will now raise `NotImplementedError` when a callback is queued to it, 
even though `executor.supports_callbacks` still returns `True` (the class var 
shadows the new property via Python's MRO, so no deprecation warning fires 
either — the breakage is completely silent until a callback arrives).
   
   `queue_workload` checks `supported_workload_types`, not 
`supports_callbacks`, so the old class var has no effect on dispatch anymore. 
There's no migration path for plugin authors: they need to know to add 
`supported_workload_types = frozenset({WorkloadType.EXECUTE_TASK, 
WorkloadType.EXECUTE_CALLBACK})` to their class, but the deprecation warning 
they'd normally see to prompt that migration never fires.
   
   One fix: in `__init_subclass__` (or in `queue_workload` before the check), 
detect when a subclass has `supports_callbacks = True` as a plain class 
attribute and synthesize the appropriate `supported_workload_types` entry, 
while emitting the deprecation warning.



##########
providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -103,7 +104,10 @@ def _task_event_logs(self, value):
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
         """Return queued tasks from celery and kubernetes executor."""
-        return self.celery_executor.queued_tasks | 
self.kubernetes_executor.queued_tasks  # type: ignore[return-value]
+        queued_tasks = self.celery_executor.queued_tasks.copy()

Review Comment:
   `CeleryKubernetesExecutor.queued_tasks` calls the deprecated 
`BaseExecutor.queued_tasks` property on both child executors, emitting 
`RemovedInAirflow4Warning` on every access. Since this file is already being 
updated in this PR, please migrate these call sites to use the new API (guarded 
with `AIRFLOW_V_3_3_PLUS` for back-compat with Airflow <3.3).
   



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -216,8 +218,7 @@ def __init__(self, parallelism: int = PARALLELISM, 
team_name: str | None = None)
 
         self.parallelism: int = parallelism
         self.team_name: str | None = team_name
-        self.queued_tasks: dict[TaskInstanceKey, workloads.ExecuteTask] = {}
-        self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {}
+        self.executor_queues: dict[str, dict[WorkloadKey, QueueableWorkload]] 
= defaultdict(dict)

Review Comment:
   Could this be a flat `dict[WorkloadKey, QueueableWorkload]` instead of a 
dict-of-dicts?
   
   `_get_workloads_to_schedule` immediately flattens all sub-dicts into a 
single list and then sorts by `(WORKLOAD_TYPE_PRIORITY, sort_key)` — so 
priority ordering (callbacks before tasks, higher-weight tasks first) is 
entirely in the sort step, not in the dict structure. A flat dict would produce 
identical scheduling behaviour.
   
   The only load-bearing use of the sub-dict grouping is the deprecated 
`queued_tasks`/`queued_callbacks` compat properties — which are on their way 
out. Every type-keyed deletion in providers (`del 
self.executor_queues[WorkloadType.EXECUTE_TASK][key]`) could be a plain `del 
flat_dict[key]` since `WorkloadKey` is unique across types.
   
   A flat dict would simplify `CeleryKubernetesExecutor.queued_tasks` (no 
sub-dict merging), make provider-side deletion uniform, and remove the 
`defaultdict(dict)` nesting.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to