This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c78e23250 [KYUUBI #6668] Fix kyuubi batch state abnormal
c78e23250 is described below
commit c78e23250a0b171d868467ba5e7781a9902f39da
Author: Wang, Fei <[email protected]>
AuthorDate: Sat Sep 14 12:27:34 2024 -0700
[KYUUBI #6668] Fix kyuubi batch state abnormal
# :mag: Description
## Issue References 🔗
This pull request fixes #6668
## Describe Your Solution 🔧
1. When killing the batch fails, check the current application info.
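2. If the application state is UNKNOWN (the submit timeout has not yet elapsed),
NOT_FOUND or KILLED, mark the batch state as CANCELED. If the application has
already FINISHED or FAILED, mark the batch as FINISHED or ERROR, respectively.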
3. If a Kubernetes pod appears (is added or first updated) after the batch has been marked as CANCELED, delete the pod.
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6670 from turboFei/session_close_operation.
Closes #6668
068eaf216 [Wang, Fei] def
248c3e383 [Wang, Fei] check for onUpdate
695bb805d [Wang, Fei] clean up on add
9304f4605 [Wang, Fei] method
e2a15f8bc [Wang, Fei] batch
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../engine/KubernetesApplicationOperation.scala | 83 ++++++++++++++++------
.../kyuubi/operation/BatchJobSubmission.scala | 16 ++++-
2 files changed, 76 insertions(+), 23 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 392de720a..c4d3c93ba 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.engine
import java.util.Locale
-import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService,
TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService,
ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -35,6 +35,9 @@ import
org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
import
org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
import
org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL,
COMPLETED, NONE}
import org.apache.kyuubi.engine.ApplicationState.{isTerminated,
ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.KyuubiServer
+import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
class KubernetesApplicationOperation extends ApplicationOperation with Logging
{
@@ -69,11 +72,16 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _
+ private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _
+
private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo):
KubernetesClient = {
checkKubernetesInfo(kubernetesInfo)
kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo =>
buildKubernetesClient(kInfo))
}
+ private def metadataManager = KyuubiServer.kyuubiServer.backendService
+ .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
+
// Visible for testing
private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo):
Unit = {
val context = kubernetesInfo.context
@@ -131,27 +139,7 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
case COMPLETED => !ApplicationState.isFailed(notification.getValue)
}
if (shouldDelete) {
- val podName = removed.name
- try {
- val kubernetesClient =
getOrCreateKubernetesClient(kubernetesInfo)
- val deleted = if (podName == null) {
- !kubernetesClient.pods()
- .withLabel(LABEL_KYUUBI_UNIQUE_KEY, appLabel)
- .delete().isEmpty
- } else {
- !kubernetesClient.pods().withName(podName).delete().isEmpty
- }
- if (deleted) {
- info(s"[$kubernetesInfo] Operation of delete pod $podName
with" +
- s" ${toLabel(appLabel)} is completed.")
- } else {
- warn(s"[$kubernetesInfo] Failed to delete pod $podName with
${toLabel(appLabel)}.")
- }
- } catch {
- case NonFatal(e) => error(
- s"[$kubernetesInfo] Failed to delete pod $podName with
${toLabel(appLabel)}",
- e)
- }
+ deletePod(kubernetesInfo, removed.name, appLabel)
}
info(s"Remove terminated application $removed with
${toLabel(appLabel)}")
}
@@ -175,6 +163,8 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
cleanupDriverPodCheckInterval,
cleanupDriverPodCheckInterval,
TimeUnit.MILLISECONDS)
+ cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
+ "cleanup-canceled-app-pod-thread")
}
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
@@ -296,11 +286,14 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
pod,
appStateSource,
appStateContainer)
+ checkPodAppCanceled(kubernetesInfo, pod)
}
}
override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
if (isSparkEnginePod(newPod)) {
+ val kyuubiUniqueKey =
newPod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
+ val firstUpdate = appInfoStore.get(kyuubiUniqueKey) == null
updateApplicationState(kubernetesInfo, newPod)
val appState = toApplicationState(newPod, appStateSource,
appStateContainer)
if (isTerminated(appState)) {
@@ -311,6 +304,9 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
newPod,
appStateSource,
appStateContainer)
+ if (firstUpdate) {
+ checkPodAppCanceled(kubernetesInfo, newPod)
+ }
}
}
@@ -416,6 +412,49 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
toApplicationState(pod, appStateSource, appStateContainer))
}
}
+
+ private def deletePod(
+ kubernetesInfo: KubernetesInfo,
+ podName: String,
+ podLabelUniqueKey: String): Unit = {
+ try {
+ val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo)
+ val deleted = if (podName == null) {
+ !kubernetesClient.pods()
+ .withLabel(LABEL_KYUUBI_UNIQUE_KEY, podLabelUniqueKey)
+ .delete().isEmpty
+ } else {
+ !kubernetesClient.pods().withName(podName).delete().isEmpty
+ }
+ if (deleted) {
+ info(s"[$kubernetesInfo] Operation of delete pod $podName with" +
+ s" ${toLabel(podLabelUniqueKey)} is completed.")
+ } else {
+ warn(s"[$kubernetesInfo] Failed to delete pod $podName with
${toLabel(podLabelUniqueKey)}.")
+ }
+ } catch {
+ case NonFatal(e) => error(
+ s"[$kubernetesInfo] Failed to delete pod $podName with
${toLabel(podLabelUniqueKey)}",
+ e)
+ }
+ }
+
+ private def checkPodAppCanceled(kubernetesInfo: KubernetesInfo, pod: Pod):
Unit = {
+ if (kyuubiConf.isRESTEnabled) {
+ cleanupCanceledAppPodExecutor.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ val kyuubiUniqueKey =
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
+ val batch =
metadataManager.flatMap(_.getBatchSessionMetadata(kyuubiUniqueKey))
+ if (batch.map(_.state).map(OperationState.withName)
+ .exists(_ == OperationState.CANCELED)) {
+ warn(s"[$kubernetesInfo] Batch[$kyuubiUniqueKey] is canceled, " +
+ s"try to delete the pod ${pod.getMetadata.getName}")
+ deletePod(kubernetesInfo, pod.getMetadata.getName, kyuubiUniqueKey)
+ }
+ }
+ })
+ }
+ }
}
object KubernetesApplicationOperation extends Logging {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 8b2cfef85..939806d17 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -377,7 +377,21 @@ class BatchJobSubmission(
// we can not change state safely
killMessage = (false, s"batch $batchId is already terminal so can
not kill it.")
} else if (!isTerminalState(state)) {
- // failed to kill, the kill message is enough
+ _applicationInfo = currentApplicationInfo()
+ _applicationInfo.map(_.state) match {
+ case Some(ApplicationState.FINISHED) =>
+ setState(OperationState.FINISHED)
+ updateBatchMetadata()
+ case Some(ApplicationState.FAILED) =>
+ setState(OperationState.ERROR)
+ updateBatchMetadata()
+ case Some(ApplicationState.UNKNOWN) |
+ Some(ApplicationState.NOT_FOUND) |
+ Some(ApplicationState.KILLED) =>
+ setState(OperationState.CANCELED)
+ updateBatchMetadata()
+ case _ => // failed to kill, the kill message is enough
+ }
}
}
}