This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ecfca79328 [KYUUBI #7033] Treat YARN/Kubernetes application NOT_FOUND 
as failed to prevent data quality issue
ecfca79328 is described below

commit ecfca79328c0c7add3154457e2f723b6c49e8782
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Apr 27 21:09:08 2025 +0800

    [KYUUBI #7033] Treat YARN/Kubernetes application NOT_FOUND as failed to 
prevent data quality issue
    
    ### Why are the changes needed?
    
    Currently, the NOT_FOUND application state is treated as terminated but not failed.
    
    This can cause data quality issues if a downstream application depends on the batch state for data processing.
    
    So, I think we should treat NOT_FOUND as a failed state instead.
    
    Currently, we support 3 types of application managers:
    1. 
[JpsApplicationOperation](https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala)
    2. 
[YarnApplicationOperation](https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala)
    3. 
[KubernetesApplicationOperation](https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala)
    
    YarnApplicationOperation and KubernetesApplicationOperation are widely used in production.
    
    In multi-instance Kyuubi deployments, the NOT_FOUND case should rarely happen; see:
    1. https://github.com/apache/kyuubi/blob/7e199d6fdbdf52222bb3eadd056b9e5a2295f36e/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala#L369-L385
    
    2. https://github.com/apache/kyuubi/pull/7029
    
    So, I think we should treat NOT_FOUND as a failed state in production use cases.
    It is better to fail in some corner cases than to mistakenly mark unsuccessful batches as finished.
    
    ### How was this patch tested?
    
    GA.
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7033 from turboFei/revist_not_found.
    
    Closes #7033
    
    ada4f8822 [Cheng Pan] Update 
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
    985e23c24 [Wang, Fei] Refine
    f03d61242 [Wang, Fei] comments
    b9d6ac203 [Wang, Fei] incase the metadata updated by peer instance
    3bd61ca85 [Wang, Fei] add
    339df4730 [Wang, Fei] treat NOT_FOUND as failed
    
    Lead-authored-by: Wang, Fei <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/ApplicationOperation.scala       | 15 ++++++++-
 .../kyuubi/engine/JpsApplicationOperation.scala    |  2 ++
 .../engine/KubernetesApplicationOperation.scala    |  9 +++--
 .../kyuubi/engine/KyuubiApplicationManager.scala   |  5 +++
 .../kyuubi/engine/YarnApplicationOperation.scala   |  2 ++
 .../kyuubi/operation/BatchJobSubmission.scala      | 38 +++++++++++++++++-----
 .../kyuubi/server/api/v1/BatchesResource.scala     |  3 +-
 7 files changed, 60 insertions(+), 14 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index d54dff1a60..728458646d 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -85,15 +85,28 @@ trait ApplicationOperation {
       tag: String,
       proxyUser: Option[String] = None,
       submitTime: Option[Long] = None): ApplicationInfo
+
+  /**
+   * Whether the application state can be persisted and retrieved after 
finished.
+   * @return true if the application state can be persisted
+   */
+  def supportPersistedAppState: Boolean
 }
 
 object ApplicationState extends Enumeration {
   type ApplicationState = Value
   val PENDING, RUNNING, FINISHED, KILLED, FAILED, ZOMBIE, NOT_FOUND, UNKNOWN = 
Value
 
-  def isFailed(state: ApplicationState): Boolean = state match {
+  def isFailed(
+      state: ApplicationState,
+      appOperation: Option[ApplicationOperation]): Boolean = {
+    isFailed(state, appOperation.exists(_.supportPersistedAppState))
+  }
+
+  def isFailed(state: ApplicationState, supportPersistedAppState: Boolean): 
Boolean = state match {
     case FAILED => true
     case KILLED => true
+    case NOT_FOUND if supportPersistedAppState => true
     case _ => false
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index ffc233c016..7e37947b86 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -105,5 +105,7 @@ class JpsApplicationOperation extends ApplicationOperation {
     // TODO check if the process is zombie
   }
 
+  override def supportPersistedAppState: Boolean = false
+
   override def stop(): Unit = {}
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 116afd50da..a808030f43 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -137,7 +137,7 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
           val shouldDelete = cleanupDriverPodStrategy match {
             case NONE => false
             case ALL => true
-            case COMPLETED => !ApplicationState.isFailed(notification.getValue)
+            case COMPLETED => 
!ApplicationState.isFailed(notification.getValue, Some(this))
           }
           if (shouldDelete) {
             deletePod(kubernetesInfo, removed.podName.orNull, appLabel)
@@ -289,6 +289,8 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     }
   }
 
+  override def supportPersistedAppState: Boolean = true
+
   override def stop(): Unit = {
     enginePodInformers.asScala.foreach { case (_, informer) =>
       Utils.tryLogNonFatalError(informer.stop())
@@ -594,8 +596,8 @@ object KubernetesApplicationOperation extends Logging {
       case Some(containerAppState) => containerAppState
       case None => podAppState
     }
-    val applicationError =
-      if (ApplicationState.isFailed(applicationState)) {
+    val applicationError = {
+      if (ApplicationState.isFailed(applicationState, supportPersistedAppState 
= true)) {
         val errorMap = containerStatusToBuildAppState.map { cs =>
           Map(
             "Pod" -> podName,
@@ -609,6 +611,7 @@ object KubernetesApplicationOperation extends Logging {
       } else {
         None
       }
+    }
     applicationState -> applicationError
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 657e529f55..763bea1f93 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -103,6 +103,11 @@ class KyuubiApplicationManager(metadataManager: 
Option[MetadataManager])
     operations.find(_.isInstanceOf[KubernetesApplicationOperation])
       .map(_.asInstanceOf[KubernetesApplicationOperation])
   }
+
+  private[kyuubi] def getApplicationOperation(appMgrInfo: 
ApplicationManagerInfo)
+      : Option[ApplicationOperation] = {
+    operations.find(_.isSupported(appMgrInfo))
+  }
 }
 
 object KyuubiApplicationManager {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 39ee294873..6adb62f975 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -165,6 +165,8 @@ class YarnApplicationOperation extends ApplicationOperation 
with Logging {
     }
   }
 
+  override def supportPersistedAppState: Boolean = true
+
   override def stop(): Unit = adminYarnClient.foreach { yarnClient =>
     Utils.tryLogNonFatalError(yarnClient.stop())
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 9dfc3b04e1..5a59af5f0c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -22,10 +22,11 @@ import java.util.concurrent.TimeUnit
 
 import com.codahale.metrics.MetricRegistry
 import com.google.common.annotations.VisibleForTesting
+import org.apache.commons.lang3.StringUtils
 
 import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, 
KillResponse, ProcBuilder}
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, 
ApplicationState, KillResponse, ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
@@ -99,6 +100,8 @@ class BatchJobSubmission(
       getOperationLog)
   }
 
+  private lazy val appOperation = 
applicationManager.getApplicationOperation(builder.appMgrInfo())
+
   def startupProcessAlive: Boolean =
     builder.processLaunched && Option(builder.process).exists(_.isAlive)
 
@@ -212,6 +215,20 @@ class BatchJobSubmission(
         metadata match {
           case Some(metadata) if metadata.peerInstanceClosed =>
             setState(OperationState.CANCELED)
+          case Some(metadata)
+              // in case it has been updated by peer kyuubi instance, see 
KYUUBI #6278
+              if StringUtils.isNotBlank(metadata.engineState) &&
+                
ApplicationState.isTerminated(ApplicationState.withName(metadata.engineState)) 
=>
+            _applicationInfo = Some(new ApplicationInfo(
+              id = metadata.engineId,
+              name = metadata.engineName,
+              state = ApplicationState.withName(metadata.engineState),
+              url = Option(metadata.engineUrl),
+              error = metadata.engineError))
+            if (applicationFailed(_applicationInfo, appOperation)) {
+              throw new KyuubiException(
+                s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+            }
           case Some(metadata) if metadata.state == 
OperationState.PENDING.toString =>
             // case 1: new batch job created using batch impl v2
             // case 2: batch job from recovery, do submission only when 
previous state is
@@ -275,7 +292,7 @@ class BatchJobSubmission(
     try {
       info(s"Submitting $batchType batch[$batchId] job:\n$builder")
       val process = builder.start
-      while (process.isAlive && !applicationFailed(_applicationInfo)) {
+      while (process.isAlive && !applicationFailed(_applicationInfo, 
appOperation)) {
         doUpdateApplicationInfoMetadataIfNeeded()
         process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
       }
@@ -284,7 +301,7 @@ class BatchJobSubmission(
         doUpdateApplicationInfoMetadataIfNeeded()
       }
 
-      if (applicationFailed(_applicationInfo)) {
+      if (applicationFailed(_applicationInfo, appOperation)) {
         Utils.terminateProcess(process, applicationStartupDestroyTimeout)
         throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
       }
@@ -329,10 +346,9 @@ class BatchJobSubmission(
       setStateIfNotCanceled(OperationState.RUNNING)
     }
     if (_applicationInfo.isEmpty) {
-      info(s"The $batchType batch[$batchId] job: $appId not found, assume that 
it has finished.")
-      return
+      _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
     }
-    if (applicationFailed(_applicationInfo)) {
+    if (applicationFailed(_applicationInfo, appOperation)) {
       throw new KyuubiException(s"$batchType batch[$batchId] job failed: 
${_applicationInfo}")
     }
     updateBatchMetadata()
@@ -341,7 +357,7 @@ class BatchJobSubmission(
       Thread.sleep(applicationCheckInterval)
       updateApplicationInfoMetadataIfNeeded()
     }
-    if (applicationFailed(_applicationInfo)) {
+    if (applicationFailed(_applicationInfo, appOperation)) {
       throw new KyuubiException(s"$batchType batch[$batchId] job failed: 
${_applicationInfo}")
     }
   }
@@ -445,8 +461,12 @@ class BatchJobSubmission(
 }
 
 object BatchJobSubmission {
-  def applicationFailed(applicationStatus: Option[ApplicationInfo]): Boolean = 
{
-    applicationStatus.map(_.state).exists(ApplicationState.isFailed)
+  def applicationFailed(
+      applicationStatus: Option[ApplicationInfo],
+      appOperation: Option[ApplicationOperation]): Boolean = {
+    applicationStatus.map(_.state).exists { state =>
+      ApplicationState.isFailed(state, appOperation)
+    }
   }
 
   def applicationTerminated(applicationStatus: Option[ApplicationInfo]): 
Boolean = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 71ef564da3..6e7fa9a337 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -129,8 +129,9 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       metadata: Metadata,
       batchAppStatus: Option[ApplicationInfo]): Batch = {
     batchAppStatus.map { appStatus =>
+      val appOp = 
sessionManager.applicationManager.getApplicationOperation(metadata.appMgrInfo)
       val currentBatchState =
-        if (BatchJobSubmission.applicationFailed(batchAppStatus)) {
+        if (BatchJobSubmission.applicationFailed(batchAppStatus, appOp)) {
           OperationState.ERROR.toString
         } else if (BatchJobSubmission.applicationTerminated(batchAppStatus)) {
           OperationState.FINISHED.toString

Reply via email to