This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 434c1cc9f2b K8s: use joinable manager queues (#63789)
434c1cc9f2b is described below

commit 434c1cc9f2b50c5cffd3cf6c2d8b4f2ab3526808
Author: Dev-iL <[email protected]>
AuthorDate: Tue Mar 17 21:45:02 2026 +0200

    K8s: use joinable manager queues (#63789)
    
    The executor already treats both queues as joinable queues. It relies on:
      - task_done()
      - join()
      - flush logic that assumes task accounting is tracked
    
    A plain manager Queue() does not match that contract. On Python 3.14 this
    showed up in teardown/error paths as:
    `ValueError: task_done() called too many times`
---
 .../providers/cncf/kubernetes/executors/kubernetes_executor.py        | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 2ff6fd7fb12..520c29ef2bd 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -103,8 +103,8 @@ class KubernetesExecutor(BaseExecutor):
         self.parallelism = self.kube_config.parallelism
 
         self._manager = multiprocessing.Manager()
-        self.task_queue: Queue[KubernetesJob] = self._manager.Queue()
-        self.result_queue: Queue[KubernetesResults] = self._manager.Queue()
+        self.task_queue: Queue[KubernetesJob] = self._manager.JoinableQueue()
+        self.result_queue: Queue[KubernetesResults] = self._manager.JoinableQueue()
         self.kube_scheduler: AirflowKubernetesScheduler | None = None
         self.kube_client: client.CoreV1Api | None = None
         self.scheduler_job_id: str | None = None
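
The task accounting contract mentioned in the commit message can be illustrated with a minimal sketch. This is not the executor's code: the helper names `drain` and `over_acknowledge` and the job strings are hypothetical, chosen only to show that each successful `get()` should be matched by exactly one `task_done()`, and that an unmatched `task_done()` raises the `ValueError` quoted above.

```python
import multiprocessing
import queue


def drain(q) -> int:
    """Consume every item currently in q, acknowledging each one.

    Hypothetical helper: works with any object exposing get_nowait()
    and task_done(), such as a manager JoinableQueue proxy.
    """
    handled = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return handled
        q.task_done()  # exactly one acknowledgement per successful get
        handled += 1


def over_acknowledge(q) -> str:
    """Call task_done() with nothing outstanding and return the error text."""
    try:
        q.task_done()
    except ValueError as exc:
        return str(exc)
    return ""


if __name__ == "__main__":
    # The guard matters because the manager starts a helper process.
    with multiprocessing.Manager() as manager:
        task_queue = manager.JoinableQueue()
        for job in ("job-1", "job-2"):  # placeholder payloads
            task_queue.put(job)
        print(drain(task_queue))
        task_queue.join()  # does not block: every put() was acknowledged
        print(over_acknowledge(task_queue))
```

In teardown or error paths the same rule applies: code that flushes a queue should acknowledge only the items it actually retrieved, otherwise `join()` can block or `task_done()` can raise.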
