This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1685c9f1e71 Fix misaligned `queued_tasks` types in hybrid executors 
(#63744)
1685c9f1e71 is described below

commit 1685c9f1e713cc62fc137995399b192fbabf9c51
Author: ANKIT KUMAR <[email protected]>
AuthorDate: Wed Mar 18 02:36:35 2026 +0530

    Fix misaligned `queued_tasks` types in hybrid executors (#63744)
    
    The `queued_tasks` property in `LocalKubernetesExecutor` and 
`CeleryKubernetesExecutor` incorrectly merged base executor tasks. 
`dict.update()` modifies the dictionary in place which could lead to race 
conditions during rapid dict updates. This commit replaces `dict.update()` with 
the python dictionary union operator `|` for a safer and immutable map 
combination.
    
    Signed-off-by: Ankit Kumar <[email protected]>
    Co-authored-by: Jens Scheffler <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../providers/celery/executors/celery_kubernetes_executor.py        | 5 +----
 .../cncf/kubernetes/executors/local_kubernetes_executor.py          | 6 +-----
 2 files changed, 2 insertions(+), 9 deletions(-)

diff --git 
a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
 
b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
index 34cfb27e86a..9d55cda5376 100644
--- 
a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
+++ 
b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
@@ -103,10 +103,7 @@ class CeleryKubernetesExecutor(BaseExecutor):
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
         """Return queued tasks from celery and kubernetes executor."""
-        queued_tasks = self.celery_executor.queued_tasks.copy()
-        queued_tasks.update(self.kubernetes_executor.queued_tasks)  # type: 
ignore[arg-type]
-
-        return queued_tasks  # type: ignore[return-value]
+        return self.celery_executor.queued_tasks | 
self.kubernetes_executor.queued_tasks
 
     @queued_tasks.setter
     def queued_tasks(self, value) -> None:
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
index f2eb64e46c5..7b8d59bcfac 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
@@ -98,11 +98,7 @@ class LocalKubernetesExecutor(BaseExecutor):
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
         """Return queued tasks from local and kubernetes executor."""
-        queued_tasks = self.local_executor.queued_tasks.copy()
-        # TODO: fix this, there is misalignment between the types of 
queued_tasks so it is likely wrong
-        queued_tasks.update(self.kubernetes_executor.queued_tasks)  # type: 
ignore[arg-type]
-
-        return queued_tasks
+        return self.local_executor.queued_tasks | 
self.kubernetes_executor.queued_tasks
 
     @queued_tasks.setter
     def queued_tasks(self, value) -> None:

Reply via email to