ashb commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r3241425228
##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -32,6 +33,28 @@
 from airflow.executors.workloads.types import WorkloadState
+class WorkloadType(str, Enum):
+ """Central registry of executor workload types."""
+
+ EXECUTE_TASK = "ExecuteTask"
+ EXECUTE_CALLBACK = "ExecuteCallback"
+
+
+# Central executor priority registry: tuple is ordered from highest priority to lowest.
+#
+# Adding a new workload type is a three-place change that must stay in sync:
+# 1. ``WorkloadType`` — declare the enum member.
+# 2. ``_workload_type_priority_order`` — insert it at the right priority slot.
+# 3. ``airflow.executors.workloads.QueueableWorkload`` — extend the discriminated union
+#    so ``queue_workload`` can accept the new schema.
+_workload_type_priority_order = (
+ WorkloadType.EXECUTE_CALLBACK,
+ WorkloadType.EXECUTE_TASK,
+)
+
+WORKLOAD_TYPE_PRIORITY: dict[str, int] = {name: idx for idx, name in enumerate(_workload_type_priority_order)}
Review Comment:
```suggestion
WORKLOAD_TYPE_PRIORITY: dict[WorkloadType, int] = {name: idx for idx, name in enumerate(_workload_type_priority_order)}
```
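The suggested annotation tightening is source-compatible with any caller that still indexes the map with plain strings, because `str`-mixin enum members hash and compare like their values. A minimal self-contained sketch (the enum is reproduced locally for illustration):

```python
from enum import Enum


class WorkloadType(str, Enum):
    """Mirror of the PR's enum, reproduced so this sketch is self-contained."""

    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


# Ordered from highest priority to lowest, as in the PR.
_workload_type_priority_order = (
    WorkloadType.EXECUTE_CALLBACK,
    WorkloadType.EXECUTE_TASK,
)

# Keyed by the enum member, per the suggestion above.
WORKLOAD_TYPE_PRIORITY: dict[WorkloadType, int] = {
    member: idx for idx, member in enumerate(_workload_type_priority_order)
}

# Because WorkloadType mixes in str, str.__hash__/__eq__ win in the MRO, so
# plain-string lookups keep resolving after the annotation change.
assert WORKLOAD_TYPE_PRIORITY[WorkloadType.EXECUTE_CALLBACK] == 0
assert WORKLOAD_TYPE_PRIORITY["ExecuteTask"] == 1
```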
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -254,58 +306,45 @@ def log_task_event(self, *, event: str, extra: str, ti_key: WorkloadKey):
             return
         self._task_event_logs.append(Log(event=event, task_instance=ti_key, extra=extra))
-    def queue_workload(self, workload: ExecutorWorkload, session: Session) -> None:
-        if isinstance(workload, workloads.ExecuteTask):
-            ti = workload.ti
-            self.queued_tasks[ti.key] = workload
-        elif isinstance(workload, workloads.ExecuteCallback):
-            if not self.supports_callbacks:
-                raise NotImplementedError(
-                    f"{type(self).__name__} does not support ExecuteCallback workloads. "
-                    f"Set supports_callbacks = True and implement callback handling in _process_workloads(). "
-                    f"See LocalExecutor or CeleryExecutor for reference implementation."
-                )
-            self.queued_callbacks[workload.callback.id] = workload
-        else:
-            raise ValueError(
-                f"Un-handled workload type {type(workload).__name__!r} in {type(self).__name__}. "
-                f"Workload must be one of: ExecuteTask, ExecuteCallback."
+    def queue_workload(self, workload: QueueableWorkload, session: Session) -> None:
+        if workload.type not in self.supported_workload_types:
+            raise NotImplementedError(
+                f"{type(self).__name__} does not support {workload.type!r} workloads. "
+                f"Add {workload.type!r} to supported_workload_types and implement handling "
+                f"in _process_workloads()."
Review Comment:
`supports_callbacks = True` class var no longer enables callback dispatch
Any executor plugin that declares `supports_callbacks = True` as a class variable will now raise `NotImplementedError` when a callback is queued to it, even though `executor.supports_callbacks` still returns `True`: the class var shadows the new property in the MRO, so the property's deprecation warning never fires and the breakage stays completely silent until a callback actually arrives.
`queue_workload` checks `supported_workload_types`, not
`supports_callbacks`, so the old class var has no effect on dispatch anymore.
There's no migration path for plugin authors: they need to know to add
`supported_workload_types = frozenset({WorkloadType.EXECUTE_TASK,
WorkloadType.EXECUTE_CALLBACK})` to their class, but the deprecation warning
they'd normally see to prompt that migration never fires.
One fix: in `__init_subclass__` (or in `queue_workload` before the check),
detect when a subclass has `supports_callbacks = True` as a plain class
attribute and synthesize the appropriate `supported_workload_types` entry,
while emitting the deprecation warning.
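A sketch of that `__init_subclass__` shim, under stated assumptions: the real `BaseExecutor` has many more members, `supports_callbacks` there is a property rather than a plain default, and `LegacyPluginExecutor` is a hypothetical third-party plugin used only to exercise the shim.

```python
import warnings
from enum import Enum


class WorkloadType(str, Enum):
    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


class BaseExecutor:
    # New-style opt-in registry (name taken from this PR).
    supported_workload_types: frozenset = frozenset({WorkloadType.EXECUTE_TASK})

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Back-compat shim: translate a plain `supports_callbacks = True`
        # class attribute (the old opt-in) into the new registry entry,
        # warning so plugin authors know to migrate.
        legacy_opt_in = cls.__dict__.get("supports_callbacks") is True
        if legacy_opt_in and "supported_workload_types" not in cls.__dict__:
            warnings.warn(
                f"{cls.__name__} sets the deprecated `supports_callbacks` class "
                "variable; declare `supported_workload_types` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            cls.supported_workload_types = cls.supported_workload_types | {
                WorkloadType.EXECUTE_CALLBACK
            }


class LegacyPluginExecutor(BaseExecutor):
    supports_callbacks = True  # old-style opt-in, still honoured by the shim
```

With this in place, `queue_workload`'s `supported_workload_types` check passes for legacy plugins, and the deprecation warning fires once at class-definition time rather than never.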
##########
providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -103,7 +104,10 @@ def _task_event_logs(self, value):
@property
def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
"""Return queued tasks from celery and kubernetes executor."""
-        return self.celery_executor.queued_tasks | self.kubernetes_executor.queued_tasks  # type: ignore[return-value]
+ queued_tasks = self.celery_executor.queued_tasks.copy()
Review Comment:
`CeleryKubernetesExecutor.queued_tasks` calls the deprecated
`BaseExecutor.queued_tasks` property on both child executors, emitting
`RemovedInAirflow4Warning` on every access. Since this file is already being
updated in this PR, please migrate these call sites to use the new API (guarded
with `AIRFLOW_V_3_3_PLUS` for back-compat with Airflow <3.3).
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -216,8 +218,7 @@ def __init__(self, parallelism: int = PARALLELISM, team_name: str | None = None)
         self.parallelism: int = parallelism
         self.team_name: str | None = team_name
-        self.queued_tasks: dict[TaskInstanceKey, workloads.ExecuteTask] = {}
-        self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {}
+        self.executor_queues: dict[str, dict[WorkloadKey, QueueableWorkload]] = defaultdict(dict)
Review Comment:
Could this be a flat `dict[WorkloadKey, QueueableWorkload]` instead of a
dict-of-dicts?
`_get_workloads_to_schedule` immediately flattens all sub-dicts into a
single list and then sorts by `(WORKLOAD_TYPE_PRIORITY, sort_key)` — so
priority ordering (callbacks before tasks, higher-weight tasks first) is
entirely in the sort step, not in the dict structure. A flat dict would produce
identical scheduling behaviour.
The only load-bearing use of the sub-dict grouping is the deprecated
`queued_tasks`/`queued_callbacks` compat properties — which are on their way
out. Every type-keyed deletion in providers (`del
self.executor_queues[WorkloadType.EXECUTE_TASK][key]`) could be a plain `del
flat_dict[key]` since `WorkloadKey` is unique across types.
A flat dict would simplify `CeleryKubernetesExecutor.queued_tasks` (no
sub-dict merging), make provider-side deletion uniform, and remove the
`defaultdict(dict)` nesting.
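For illustration, a minimal flat-dict variant of that scheduling step. `Workload` is a hypothetical stand-in for `QueueableWorkload` (the real class carries far more fields), and lower-sorts-first `sort_key` semantics are assumed:

```python
from dataclasses import dataclass
from enum import Enum


class WorkloadType(str, Enum):
    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


# Callbacks sort before tasks, matching the central priority registry.
WORKLOAD_TYPE_PRIORITY = {WorkloadType.EXECUTE_CALLBACK: 0, WorkloadType.EXECUTE_TASK: 1}


@dataclass(frozen=True)
class Workload:
    """Hypothetical stand-in for QueueableWorkload (key/type/sort_key only)."""

    key: str
    type: WorkloadType
    sort_key: int  # assumed lower-sorts-first, e.g. a negated priority weight


def workloads_to_schedule(queue: dict, open_slots: int) -> list:
    """Flat-dict variant: ordering lives entirely in the sort, not the dict."""
    ordered = sorted(queue.values(), key=lambda w: (WORKLOAD_TYPE_PRIORITY[w.type], w.sort_key))
    return ordered[:open_slots]


queue = {
    w.key: w  # WorkloadKey is unique across types, so one flat dict suffices
    for w in (
        Workload("t1", WorkloadType.EXECUTE_TASK, sort_key=-5),
        Workload("t2", WorkloadType.EXECUTE_TASK, sort_key=-9),
        Workload("cb1", WorkloadType.EXECUTE_CALLBACK, sort_key=0),
    )
}
picked = [w.key for w in workloads_to_schedule(queue, open_slots=2)]
# callback first, then the heavier task; deletion is a uniform `del queue[key]`
```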
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]