kaxil commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r2963375864
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -92,18 +92,13 @@ class CeleryExecutor(BaseExecutor):
"""
supports_ad_hoc_ti_run: bool = True
- supports_callbacks: bool = True
+ supported_workload_types: frozenset[str] = frozenset({"ExecuteTask",
"ExecuteCallback"})
Review Comment:
This uses raw strings `"ExecuteTask"` and `"ExecuteCallback"` instead of
`WorkloadType.EXECUTE_TASK` and `WorkloadType.EXECUTE_CALLBACK` like
`LocalExecutor` does. Works because `WorkloadType` is a `str` enum, but it's
fragile if the values ever change.
##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -76,6 +94,23 @@ class BaseWorkloadSchema(BaseModel):
def generate_token(sub_id: str, generator: JWTGenerator | None = None) ->
str:
return generator.generate({"sub": sub_id}) if generator else ""
+ @property
+ def queue_key(self) -> WorkloadKey:
+ """Return a unique key used to store/lookup this workload in the
executor queue."""
+ raise NotImplementedError
Review Comment:
Since all concrete workloads go through `BaseDagBundleWorkload(ABC)`, could
this be declared `@abstractmethod` on that class instead? Would catch missing
implementations at class definition time rather than runtime.
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -63,11 +62,6 @@ class AwsLambdaExecutor(BaseExecutor):
supports_multi_team: bool = True
- if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
Review Comment:
Same pattern here -- `queue_workload` (line ~207) and `_process_workloads`
(line ~221) still reference `self.queued_tasks`.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -81,11 +80,6 @@ class KubernetesExecutor(BaseExecutor):
supports_ad_hoc_ti_run: bool = True
supports_multi_team: bool = True
- if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
Review Comment:
Same issue -- `queue_workload` (line ~235), `_process_workloads` (line
~251), and `revoke_task`/`cleanup_stuck_queued_tasks` all still use
`self.queued_tasks`.
##########
airflow-core/tests/unit/executors/test_base_executor.py:
##########
@@ -288,7 +289,6 @@ def test_debug_dump(caplog):
executor = BaseExecutor()
with caplog.at_level(logging.INFO):
executor.debug_dump()
- assert "executor.queued" in caplog.text
assert "executor.running" in caplog.text
Review Comment:
This assertion was removed, and with a fresh executor the `debug_dump` loop
body never executes (empty `defaultdict`). Might be worth adding a test that
populates the queues first and verifies the new `executor.queued[...]` format
shows up.
##########
airflow-core/src/airflow/executors/workloads/types.py:
##########
@@ -20,6 +20,8 @@
from typing import TYPE_CHECKING, TypeAlias
+from airflow.executors.workloads.callback import ExecuteCallback
Review Comment:
`QueueableWorkload` is only consumed under `TYPE_CHECKING` (in
`base_executor.py`). These imports and the alias could live under
`TYPE_CHECKING` too, like `WorkloadKey` above.
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py:
##########
@@ -92,11 +91,6 @@ class AwsBatchExecutor(BaseExecutor):
# AWS only allows a maximum number of JOBs in the describe_jobs function
DESCRIBE_JOBS_BATCH_SIZE = 99
- if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
Review Comment:
The type annotation is removed, but `queue_workload` (line ~127) and
`_process_workloads` (line ~141) still access `self.queued_tasks`, which now
fires a `RemovedInAirflow4Warning` on every call. Should these use
`self.executor_queues[WorkloadType.EXECUTE_TASK]` directly, or can the
overrides be removed entirely if the base class handles it?
##########
providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##########
@@ -95,11 +95,6 @@ class AwsEcsExecutor(BaseExecutor):
# AWS limits the maximum number of ARNs in the describe_tasks function.
DESCRIBE_TASKS_BATCH_SIZE = 99
- if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
Review Comment:
Same issue as the Batch executor -- `queue_workload` (line ~135) and
`_process_workloads` (line ~150) still go through `self.queued_tasks`,
triggering deprecation warnings in production.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]