This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 434c1cc9f2b K8s: use joinable manager queues (#63789)
434c1cc9f2b is described below
commit 434c1cc9f2b50c5cffd3cf6c2d8b4f2ab3526808
Author: Dev-iL <[email protected]>
AuthorDate: Tue Mar 17 21:45:02 2026 +0200
K8s: use joinable manager queues (#63789)
The executor already treats both queues as joinable queues. It relies on:
- task_done()
- join()
- flush logic that assumes task accounting is tracked
A plain manager Queue() does not match that contract. On Python 3.14 this
showed up in teardown/error paths as:
`ValueError: task_done() called too many times`
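
The accounting contract described above can be sketched with the standard library's in-process `queue.Queue`, which raises the same error message when `task_done()` is called more often than `put()`. This is only an illustration under that assumption, not the executor's code: the `drain` helper and the job names are hypothetical, and no manager process is involved.

```python
import queue


def drain(q: queue.Queue) -> list:
    """Hypothetical helper: pull every queued item and acknowledge each one."""
    items = []
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        items.append(item)
        # One task_done() per successful get() keeps the unfinished-task
        # counter in step with the number of put() calls.
        q.task_done()
    return items


q = queue.Queue()
for job in ("job-1", "job-2"):
    q.put(job)

drained = drain(q)
q.join()  # does not block: every put() was matched by exactly one task_done()

# An extra task_done() with nothing outstanding breaks the accounting.
try:
    q.task_done()
    extra_error = None
except ValueError as exc:
    extra_error = str(exc)

print(drained)
print(extra_error)
```

The point of the sketch is that `join()` and `task_done()` only behave predictably when every acknowledgement corresponds to an item that was actually queued, which is the contract the commit message says the executor assumes.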
---
.../providers/cncf/kubernetes/executors/kubernetes_executor.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 2ff6fd7fb12..520c29ef2bd 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -103,8 +103,8 @@ class KubernetesExecutor(BaseExecutor):
         self.parallelism = self.kube_config.parallelism
         self._manager = multiprocessing.Manager()
-        self.task_queue: Queue[KubernetesJob] = self._manager.Queue()
-        self.result_queue: Queue[KubernetesResults] = self._manager.Queue()
+        self.task_queue: Queue[KubernetesJob] = self._manager.JoinableQueue()
+        self.result_queue: Queue[KubernetesResults] = self._manager.JoinableQueue()
         self.kube_scheduler: AirflowKubernetesScheduler | None = None
         self.kube_client: client.CoreV1Api | None = None
         self.scheduler_job_id: str | None = None