This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e5dc4b7729 k8s executor - ensure pods cleaned up (#61839)
1e5dc4b7729 is described below

commit 1e5dc4b7729eee7c142bc45cd5d4de1c83e72b27
Author: atrbgithub <[email protected]>
AuthorDate: Sat Mar 14 15:05:56 2026 +0000

    k8s executor - ensure pods cleaned up (#61839)
    
    This commit ensures that completed pods are eventually cleaned up.
---
 .../providers/cncf/kubernetes/executors/kubernetes_executor.py    | 8 ++++++++
 .../unit/cncf/kubernetes/executors/test_kubernetes_executor.py    | 2 ++
 2 files changed, 10 insertions(+)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index a55341a62a8..2ff6fd7fb12 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -108,6 +108,7 @@ class KubernetesExecutor(BaseExecutor):
         self.kube_scheduler: AirflowKubernetesScheduler | None = None
         self.kube_client: client.CoreV1Api | None = None
         self.scheduler_job_id: str | None = None
+        self._last_completed_pod_adoption = 0.0
         self.last_handled: dict[TaskInstanceKey, float] = {}
         self.kubernetes_queue: str | None = None
         self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
@@ -265,6 +266,13 @@ class KubernetesExecutor(BaseExecutor):
             assert self.kube_config
             assert self.result_queue
             assert self.task_queue
+            assert self.kube_client
+
+        adoption_interval = conf.getfloat("scheduler", 
"orphaned_tasks_check_interval", fallback=300.0)
+        now = time.monotonic()
+        if now - self._last_completed_pod_adoption >= adoption_interval:
+            self._last_completed_pod_adoption = now
+            self._adopt_completed_pods(self.kube_client)
 
         if self.running:
             self.log.debug("self.running: %s", self.running)
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index af4a503c66c..c31401c0c3c 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 import random
 import re
 import string
+import time
 from datetime import datetime
 from unittest import mock
 
@@ -256,6 +257,7 @@ class TestKubernetesExecutor:
     def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = 5
+        self.kubernetes_executor._last_completed_pod_adoption = 
time.monotonic()
 
     def test_resource_version_singleton(self):
         """Test that ResourceVersion returns the same instance."""

Reply via email to