This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new 779b4bde31 [KYUUBI #7101] Load the existing pods when initializing
kubernetes client to cleanup terminated app pods
779b4bde31 is described below
commit 779b4bde31cd524a806f0cd314a63d04cdfd0640
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Jun 22 22:35:14 2025 -0700
[KYUUBI #7101] Load the existing pods when initializing kubernetes client
to cleanup terminated app pods
### Why are the changes needed?
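To prevent terminated app pods from leaking when pod events are missed while
the Kyuubi server is restarting. When a Kubernetes client is initialized, it
now lists the existing pods carrying the Kyuubi unique label and marks those
whose application state is already terminated, so that they get cleaned up.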
### How was this patch tested?
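Tested manually. The server log shows existing pods in a terminated app state
being found and marked as terminated: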
```
:2025-06-17 17:50:37.275 INFO [main]
org.apache.kyuubi.engine.KubernetesApplicationOperation:
[KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod
kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423211008-grectg-stm-17da59fe-caf4-41e4-a12f-6c1ed9a293f9-driver
with label: kyuubi-unique-tag=17da59fe-caf4-41e4-a12f-6c1ed9a293f9 in app
state FINISHED, marking it as terminated
2025-06-17 17:50:37.278 INFO [main]
org.apache.kyuubi.engine.KubernetesApplicationOperation:
[KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod
kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423212011-gpdtsi-stm-6a23000f-10be-4a42-ae62-4fa2da8fac07-driver
with label: kyuubi-unique-tag=6a23000f-10be-4a42-ae62-4fa2da8fac07 in app
state FINISHED, marking it as terminated
```
The pods are cleaned up eventually.
<img width="664" alt="image"
src="https://github.com/user-attachments/assets/8cf58f61-065f-4fb0-9718-2e3c00e8d2e0"
/>
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7101 from turboFei/pod_cleanup.
Closes #7101
7f76cf57c [Wang, Fei] async
11c9db25d [Wang, Fei] cleanup
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 302b5fa1e60708cf8783893d166a22c7e6130309)
Signed-off-by: Wang, Fei <[email protected]>
---
.../engine/KubernetesApplicationOperation.scala | 46 +++++++++++++++++++++-
1 file changed, 45 insertions(+), 1 deletion(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index c53dda0e36..42e8e13bc9 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -74,9 +74,45 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _
+ private var kubernetesClientInitializeCleanupTerminatedPodExecutor:
ThreadPoolExecutor = _
+
private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo):
KubernetesClient = {
checkKubernetesInfo(kubernetesInfo)
- kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo =>
buildKubernetesClient(kInfo))
+ kubernetesClients.computeIfAbsent(
+ kubernetesInfo,
+ kInfo => {
+ val kubernetesClient = buildKubernetesClient(kInfo)
+ cleanTerminatedAppPodsOnKubernetesClientInitialize(kInfo,
kubernetesClient)
+ kubernetesClient
+ })
+ }
+
+ private def cleanTerminatedAppPodsOnKubernetesClientInitialize(
+ kubernetesInfo: KubernetesInfo,
+ kubernetesClient: KubernetesClient): Unit = {
+ if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) {
+ kubernetesClientInitializeCleanupTerminatedPodExecutor.submit(new
Runnable {
+ override def run(): Unit = {
+ val existingPods =
+
kubernetesClient.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).list().getItems
+ info(s"[$kubernetesInfo] Found ${existingPods.size()} existing pods
with label " +
+ s"$LABEL_KYUUBI_UNIQUE_KEY")
+ val eventType = KubernetesResourceEventTypes.UPDATE
+ existingPods.asScala.filter(isSparkEnginePod).foreach { pod =>
+ val appState = toApplicationState(pod, appStateSource,
appStateContainer, eventType)
+ if (isTerminated(appState)) {
+ val kyuubiUniqueKey =
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
+ info(s"[$kubernetesInfo] Found existing pod
${pod.getMetadata.getName} with " +
+ s"${toLabel(kyuubiUniqueKey)} in app state $appState, marking
it as terminated")
+ if (appInfoStore.get(kyuubiUniqueKey) == null) {
+ updateApplicationState(kubernetesInfo, pod, eventType)
+ }
+ markApplicationTerminated(kubernetesInfo, pod, eventType)
+ }
+ }
+ }
+ })
+ }
}
private var metadataManager: Option[MetadataManager] = _
@@ -167,6 +203,9 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
TimeUnit.MILLISECONDS)
cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
"cleanup-canceled-app-pod-thread")
+ kubernetesClientInitializeCleanupTerminatedPodExecutor =
+ ThreadUtils.newDaemonCachedThreadPool(
+ "kubernetes-client-initialize-cleanup-terminated-pod-thread")
initializeKubernetesClient(kyuubiConf)
}
@@ -313,6 +352,11 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
ThreadUtils.shutdown(cleanupCanceledAppPodExecutor)
cleanupCanceledAppPodExecutor = null
}
+
+ if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) {
+
ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor)
+ kubernetesClientInitializeCleanupTerminatedPodExecutor = null
+ }
}
private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)