This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c78e23250 [KYUUBI #6668] Fix kyuubi batch state abnormal
c78e23250 is described below

commit c78e23250a0b171d868467ba5e7781a9902f39da
Author: Wang, Fei <[email protected]>
AuthorDate: Sat Sep 14 12:27:34 2024 -0700

    [KYUUBI #6668] Fix kyuubi batch state abnormal
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #6668
    
    ## Describe Your Solution 🔧
    
    1. When killing the batch fails, check the current application info.
    2. If the application state is UNKNOWN (still within the submit timeout) or 
NOT_FOUND, mark the batch state as CANCELED.
    3. If the k8s pod is added after the batch has been marked as CANCELED, delete the pod.
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6670 from turboFei/session_close_operation.
    
    Closes #6668
    
    068eaf216 [Wang, Fei] def
    248c3e383 [Wang, Fei] check for onUpdate
    695bb805d [Wang, Fei] clean up on add
    9304f4605 [Wang, Fei] method
    e2a15f8bc [Wang, Fei] batch
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../engine/KubernetesApplicationOperation.scala    | 83 ++++++++++++++++------
 .../kyuubi/operation/BatchJobSubmission.scala      | 16 ++++-
 2 files changed, 76 insertions(+), 23 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 392de720a..c4d3c93ba 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.engine
 
 import java.util.Locale
-import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, 
TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, 
ThreadPoolExecutor, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -35,6 +35,9 @@ import 
org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
 import 
org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
 import 
org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, 
COMPLETED, NONE}
 import org.apache.kyuubi.engine.ApplicationState.{isTerminated, 
ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.KyuubiServer
+import org.apache.kyuubi.session.KyuubiSessionManager
 import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
 
 class KubernetesApplicationOperation extends ApplicationOperation with Logging 
{
@@ -69,11 +72,16 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
 
   private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _
 
+  private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _
+
   private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): 
KubernetesClient = {
     checkKubernetesInfo(kubernetesInfo)
     kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => 
buildKubernetesClient(kInfo))
   }
 
+  private def metadataManager = KyuubiServer.kyuubiServer.backendService
+    .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
+
   // Visible for testing
   private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo): 
Unit = {
     val context = kubernetesInfo.context
@@ -131,27 +139,7 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
             case COMPLETED => !ApplicationState.isFailed(notification.getValue)
           }
           if (shouldDelete) {
-            val podName = removed.name
-            try {
-              val kubernetesClient = 
getOrCreateKubernetesClient(kubernetesInfo)
-              val deleted = if (podName == null) {
-                !kubernetesClient.pods()
-                  .withLabel(LABEL_KYUUBI_UNIQUE_KEY, appLabel)
-                  .delete().isEmpty
-              } else {
-                !kubernetesClient.pods().withName(podName).delete().isEmpty
-              }
-              if (deleted) {
-                info(s"[$kubernetesInfo] Operation of delete pod $podName 
with" +
-                  s" ${toLabel(appLabel)} is completed.")
-              } else {
-                warn(s"[$kubernetesInfo] Failed to delete pod $podName with 
${toLabel(appLabel)}.")
-              }
-            } catch {
-              case NonFatal(e) => error(
-                  s"[$kubernetesInfo] Failed to delete pod $podName with 
${toLabel(appLabel)}",
-                  e)
-            }
+            deletePod(kubernetesInfo, removed.name, appLabel)
           }
           info(s"Remove terminated application $removed with 
${toLabel(appLabel)}")
         }
@@ -175,6 +163,8 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
       cleanupDriverPodCheckInterval,
       cleanupDriverPodCheckInterval,
       TimeUnit.MILLISECONDS)
+    cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
+      "cleanup-canceled-app-pod-thread")
   }
 
   override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
@@ -296,11 +286,14 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
           pod,
           appStateSource,
           appStateContainer)
+        checkPodAppCanceled(kubernetesInfo, pod)
       }
     }
 
     override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
       if (isSparkEnginePod(newPod)) {
+        val kyuubiUniqueKey = 
newPod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
+        val firstUpdate = appInfoStore.get(kyuubiUniqueKey) == null
         updateApplicationState(kubernetesInfo, newPod)
         val appState = toApplicationState(newPod, appStateSource, 
appStateContainer)
         if (isTerminated(appState)) {
@@ -311,6 +304,9 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
           newPod,
           appStateSource,
           appStateContainer)
+        if (firstUpdate) {
+          checkPodAppCanceled(kubernetesInfo, newPod)
+        }
       }
     }
 
@@ -416,6 +412,49 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
         toApplicationState(pod, appStateSource, appStateContainer))
     }
   }
+
+  private def deletePod(
+      kubernetesInfo: KubernetesInfo,
+      podName: String,
+      podLabelUniqueKey: String): Unit = {
+    try {
+      val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo)
+      val deleted = if (podName == null) {
+        !kubernetesClient.pods()
+          .withLabel(LABEL_KYUUBI_UNIQUE_KEY, podLabelUniqueKey)
+          .delete().isEmpty
+      } else {
+        !kubernetesClient.pods().withName(podName).delete().isEmpty
+      }
+      if (deleted) {
+        info(s"[$kubernetesInfo] Operation of delete pod $podName with" +
+          s" ${toLabel(podLabelUniqueKey)} is completed.")
+      } else {
+        warn(s"[$kubernetesInfo] Failed to delete pod $podName with 
${toLabel(podLabelUniqueKey)}.")
+      }
+    } catch {
+      case NonFatal(e) => error(
+          s"[$kubernetesInfo] Failed to delete pod $podName with 
${toLabel(podLabelUniqueKey)}",
+          e)
+    }
+  }
+
+  private def checkPodAppCanceled(kubernetesInfo: KubernetesInfo, pod: Pod): 
Unit = {
+    if (kyuubiConf.isRESTEnabled) {
+      cleanupCanceledAppPodExecutor.submit(new Runnable {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          val kyuubiUniqueKey = 
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
+          val batch = 
metadataManager.flatMap(_.getBatchSessionMetadata(kyuubiUniqueKey))
+          if (batch.map(_.state).map(OperationState.withName)
+              .exists(_ == OperationState.CANCELED)) {
+            warn(s"[$kubernetesInfo] Batch[$kyuubiUniqueKey] is canceled, " +
+              s"try to delete the pod ${pod.getMetadata.getName}")
+            deletePod(kubernetesInfo, pod.getMetadata.getName, kyuubiUniqueKey)
+          }
+        }
+      })
+    }
+  }
 }
 
 object KubernetesApplicationOperation extends Logging {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 8b2cfef85..939806d17 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -377,7 +377,21 @@ class BatchJobSubmission(
             // we can not change state safely
             killMessage = (false, s"batch $batchId is already terminal so can 
not kill it.")
           } else if (!isTerminalState(state)) {
-            // failed to kill, the kill message is enough
+            _applicationInfo = currentApplicationInfo()
+            _applicationInfo.map(_.state) match {
+              case Some(ApplicationState.FINISHED) =>
+                setState(OperationState.FINISHED)
+                updateBatchMetadata()
+              case Some(ApplicationState.FAILED) =>
+                setState(OperationState.ERROR)
+                updateBatchMetadata()
+              case Some(ApplicationState.UNKNOWN) |
+                  Some(ApplicationState.NOT_FOUND) |
+                  Some(ApplicationState.KILLED) =>
+                setState(OperationState.CANCELED)
+                updateBatchMetadata()
+              case _ => // failed to kill, the kill message is enough
+            }
           }
         }
       }

Reply via email to