This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1e5dc4b7729 k8s executor - ensure pods cleaned up (#61839)
1e5dc4b7729 is described below
commit 1e5dc4b7729eee7c142bc45cd5d4de1c83e72b27
Author: atrbgithub <[email protected]>
AuthorDate: Sat Mar 14 15:05:56 2026 +0000
k8s executor - ensure pods cleaned up (#61839)
This commit ensures that completed pods are eventually cleaned up.
---
.../providers/cncf/kubernetes/executors/kubernetes_executor.py | 8 ++++++++
.../unit/cncf/kubernetes/executors/test_kubernetes_executor.py | 2 ++
2 files changed, 10 insertions(+)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index a55341a62a8..2ff6fd7fb12 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -108,6 +108,7 @@ class KubernetesExecutor(BaseExecutor):
self.kube_scheduler: AirflowKubernetesScheduler | None = None
self.kube_client: client.CoreV1Api | None = None
self.scheduler_job_id: str | None = None
+ self._last_completed_pod_adoption = 0.0
self.last_handled: dict[TaskInstanceKey, float] = {}
self.kubernetes_queue: str | None = None
self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
@@ -265,6 +266,13 @@ class KubernetesExecutor(BaseExecutor):
assert self.kube_config
assert self.result_queue
assert self.task_queue
+ assert self.kube_client
+
+ adoption_interval = conf.getfloat("scheduler",
"orphaned_tasks_check_interval", fallback=300.0)
+ now = time.monotonic()
+ if now - self._last_completed_pod_adoption >= adoption_interval:
+ self._last_completed_pod_adoption = now
+ self._adopt_completed_pods(self.kube_client)
if self.running:
self.log.debug("self.running: %s", self.running)
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index af4a503c66c..c31401c0c3c 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import random
import re
import string
+import time
from datetime import datetime
from unittest import mock
@@ -256,6 +257,7 @@ class TestKubernetesExecutor:
def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = 5
+ self.kubernetes_executor._last_completed_pod_adoption =
time.monotonic()
def test_resource_version_singleton(self):
"""Test that ResourceVersion returns the same instance."""