This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a08e7f93 [KYUUBI #7132] Respect 
`kyuubi.session.engine.startup.waitCompletion` for wait engine completion
a1a08e7f93 is described below

commit a1a08e7f93db380c92fd9ea08504850d65ff0366
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Jul 14 01:49:06 2025 +0800

    [KYUUBI #7132] Respect `kyuubi.session.engine.startup.waitCompletion` for 
wait engine completion
    
    ### Why are the changes needed?
    
    We should not fail the batch submission if the submit process is still alive and 
`kyuubi.session.engine.startup.waitCompletion` is false.
    
    Especially for Spark on Kubernetes, the app might fail with NOT_FOUND 
state if the spark-submit process runs longer than the submit timeout.
    
    In this PR, if `kyuubi.session.engine.startup.waitCompletion` is false, 
when getting the application info, Kyuubi uses the current timestamp as the submit 
time to prevent the app from failing with NOT_FOUND state due to submit timeout.
    
    ### How was this patch tested?
    
    Pass current GA and manually testing.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7132 from turboFei/batch_submit.
    
    Closes #7132
    
    efb06db1c [Wang, Fei] refine
    7e453c162 [Wang, Fei] refine
    7bca1a7aa [Wang, Fei] Prevent potential timeout duration polling the 
application info
    15529ab85 [Wang, Fei] prevent metadata manager fail
    38335f2f9 [Wang, Fei] refine
    9b8a9fde4 [Wang, Fei] comments
    11f607daa [Wang, Fei] docs
    f2f6ba148 [Wang, Fei] revert
    2da0705ad [Wang, Fei] wait for if not wait complete
    d84963420 [Wang, Fei] revert check in loop
    b4cf50a49 [Wang, Fei] comments
    8c262b7ec [Wang, Fei] refine
    ecf379b86 [Wang, Fei] Revert conf change
    60dc1676e [Wang, Fei] enlarge
    4d0aa542a [Wang, Fei] Save
    4aea96552 [Wang, Fei] refine
    2ad75fcbf [Wang, Fei] nit
    a71b11df6 [Wang, Fei] Do not fail batch if the process is alive
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/deployment/migration-guide.md                 |  2 ++
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  3 +-
 .../engine/KubernetesApplicationOperation.scala    | 13 ++++++--
 .../org/apache/kyuubi/engine/ProcBuilder.scala     |  4 +++
 .../kyuubi/engine/YarnApplicationOperation.scala   |  3 +-
 .../engine/spark/SparkBatchProcessBuilder.scala    | 15 +++------
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 36 ++++++++++++++++----
 .../kyuubi/operation/BatchJobSubmission.scala      | 38 ++++++++++++++++++----
 8 files changed, 84 insertions(+), 30 deletions(-)

diff --git a/docs/deployment/migration-guide.md 
b/docs/deployment/migration-guide.md
index 3570da71ff..359ab26bce 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -21,6 +21,8 @@
 
 * Since Kyuubi 1.11, the configuration 
`spark.sql.watchdog.forcedMaxOutputRows` provided by Kyuubi Spark extension is 
removed, consider using `kyuubi.operation.result.max.rows` instead. Note, the 
latter works without requirement of installing Kyuubi Spark extension.
 
+* Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will 
respect the `kyuubi.session.engine.startup.waitCompletion` config to determine 
whether to wait for the engine completion or not. If the engine is running in 
client mode, Kyuubi will always wait for the engine completion. And for Spark 
engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and 
`spark.kubernetes.submission.waitAppCompletion` configs to the engine conf 
based on the value of `kyuubi.sess [...]
+
 ## Upgrading from Kyuubi 1.9 to 1.10
 
 * Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the 
future, please use `kyuubi-beeline` instead.
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index a1383d7429..0aa8366235 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -324,8 +324,7 @@ private[kyuubi] class EngineRef(
       engineRef.get
     } finally {
       if (acquiredPermit) startupProcessSemaphore.foreach(_.release())
-      val waitCompletion = 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
-      val destroyProcess = !waitCompletion && builder.isClusterMode()
+      val destroyProcess = !builder.waitEngineCompletion
       if (destroyProcess) {
         info("Destroy the builder process because waitCompletion is false" +
           " and the engine is running in cluster mode.")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 7fe24e0c6b..6066c951df 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -292,6 +292,7 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
       throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
     }
     debug(s"Getting application[${toLabel(tag)}]'s info from Kubernetes 
cluster")
+    val startTime = System.currentTimeMillis()
     try {
       // need to initialize the kubernetes client if not exists
       getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
@@ -299,13 +300,19 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
         case (_, info) => info
         case _ =>
           // try to get the application info from kubernetes engine info store
-          metadataManager.flatMap(
-            
_.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
+          try {
+            metadataManager.flatMap(
+              
_.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
+          } catch {
+            case e: Exception =>
+              error(s"Failed to get application info from metadata manager for 
${toLabel(tag)}", e)
+              ApplicationInfo.NOT_FOUND
+          }
       }
       (appInfo.state, submitTime) match {
         // Kyuubi should wait second if pod is not be created
         case (NOT_FOUND, Some(_submitTime)) =>
-          val elapsedTime = System.currentTimeMillis - _submitTime
+          val elapsedTime = startTime - _submitTime
           if (elapsedTime > submitTimeout) {
             error(s"Can't find target driver pod by ${toLabel(tag)}, " +
               s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 0e21bb9f73..a2cd268a6c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -355,6 +355,10 @@ trait ProcBuilder {
   def clusterManager(): Option[String] = None
 
   def appMgrInfo(): ApplicationManagerInfo = ApplicationManagerInfo(None)
+
+  def waitEngineCompletion: Boolean = {
+    !isClusterMode() || 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+  }
 }
 
 object ProcBuilder extends Logging {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 6adb62f975..881b94e775 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -131,12 +131,13 @@ class YarnApplicationOperation extends 
ApplicationOperation with Logging {
       proxyUser: Option[String] = None,
       submitTime: Option[Long] = None): ApplicationInfo = 
withYarnClient(proxyUser) { yarnClient =>
     debug(s"Getting application info from YARN cluster by tag: $tag")
+    val startTime = System.currentTimeMillis()
     val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
     if (reports.isEmpty) {
       debug(s"Can't find target application from YARN cluster by tag: $tag")
       submitTime match {
         case Some(_submitTime) =>
-          val elapsedTime = System.currentTimeMillis - _submitTime
+          val elapsedTime = startTime - _submitTime
           if (elapsedTime < submitTimeout) {
             info(s"Wait for YARN application[tag: $tag] to be submitted, " +
               s"elapsed time: ${elapsedTime}ms, return 
${ApplicationInfo.UNKNOWN} status")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 11b4e03dde..3b0d8f44f5 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -57,7 +57,8 @@ class SparkBatchProcessBuilder(
       sparkAppNameConf() ++
       engineLogPathConf() ++
       appendPodNameConf(batchConf) ++
-      prepareK8sFileUploadPath()).map { case (k, v) =>
+      prepareK8sFileUploadPath() ++
+      engineWaitCompletionConf()).map { case (k, v) =>
       buffer ++= confKeyValue(convertConfigKey(k), v)
     }
 
@@ -79,15 +80,7 @@ class SparkBatchProcessBuilder(
 
   override protected def module: String = "kyuubi-spark-batch-submit"
 
-  override def clusterManager(): Option[String] = {
-    batchConf.get(MASTER_KEY).orElse(super.clusterManager())
-  }
-
-  override def kubernetesContext(): Option[String] = {
-    batchConf.get(KUBERNETES_CONTEXT_KEY).orElse(super.kubernetesContext())
-  }
-
-  override def kubernetesNamespace(): Option[String] = {
-    batchConf.get(KUBERNETES_NAMESPACE_KEY).orElse(super.kubernetesNamespace())
+  override private[spark] def getSparkOption(key: String) = {
+    batchConf.get(key).orElse(super.getSparkOption(key))
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 600d96a22c..3b84f6946d 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -150,7 +150,8 @@ class SparkProcessBuilder(
       engineLogPathConf ++
       extraYarnConf(allConf) ++
       appendPodNameConf(allConf) ++
-      prepareK8sFileUploadPath()).foreach {
+      prepareK8sFileUploadPath() ++
+      engineWaitCompletionConf).foreach {
       case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v)
     }
 
@@ -325,11 +326,11 @@ class SparkProcessBuilder(
   }
 
   override def clusterManager(): Option[String] = {
-    conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
+    getSparkOption(MASTER_KEY)
   }
 
   def deployMode(): Option[String] = {
-    conf.getOption(DEPLOY_MODE_KEY).orElse(defaultsConf.get(DEPLOY_MODE_KEY))
+    getSparkOption(DEPLOY_MODE_KEY)
   }
 
   override def isClusterMode(): Boolean = {
@@ -346,16 +347,15 @@ class SparkProcessBuilder(
   }
 
   def kubernetesContext(): Option[String] = {
-    
conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
+    getSparkOption(KUBERNETES_CONTEXT_KEY)
   }
 
   def kubernetesNamespace(): Option[String] = {
-    
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
+    getSparkOption(KUBERNETES_NAMESPACE_KEY)
   }
 
   def kubernetesFileUploadPath(): Option[String] = {
-    conf.getOption(KUBERNETES_FILE_UPLOAD_PATH)
-      .orElse(defaultsConf.get(KUBERNETES_FILE_UPLOAD_PATH))
+    getSparkOption(KUBERNETES_FILE_UPLOAD_PATH)
   }
 
   override def validateConf(): Unit = Validator.validateConf(conf)
@@ -373,6 +373,25 @@ class SparkProcessBuilder(
   private[spark] def engineLogPathConf(): Map[String, String] = {
     Map(KYUUBI_ENGINE_LOG_PATH_KEY -> engineLog.getAbsolutePath)
   }
+
+  private[spark] def getSparkOption(key: String): Option[String] = {
+    conf.getOption(key).orElse(defaultsConf.get(key))
+  }
+
+  override def waitEngineCompletion: Boolean = {
+    !isClusterMode() || 
getSparkOption(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
+      
.getOrElse(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.defaultValStr)
+      .toBoolean
+  }
+
+  def engineWaitCompletionConf(): Map[String, String] =
+    clusterManager().map(_.toLowerCase(Locale.ROOT)) match {
+      case Some(m) if m.startsWith("yarn") =>
+        Map(YARN_SUBMIT_WAIT_APP_COMPLETION -> waitEngineCompletion.toString)
+      case Some(m) if m.startsWith("k8s") =>
+        Map(KUBERNETES_SUBMISSION_WAIT_APP_COMPLETION -> 
waitEngineCompletion.toString)
+      case _ => Map.empty
+    }
 }
 
 object SparkProcessBuilder {
@@ -384,7 +403,10 @@ object SparkProcessBuilder {
   final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
   final val KUBERNETES_DRIVER_POD_NAME = "spark.kubernetes.driver.pod.name"
   final val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = 
"spark.kubernetes.executor.podNamePrefix"
+  final val KUBERNETES_SUBMISSION_WAIT_APP_COMPLETION =
+    "spark.kubernetes.submission.waitAppCompletion"
   final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
+  final val YARN_SUBMIT_WAIT_APP_COMPLETION = 
"spark.yarn.submit.waitAppCompletion"
   final val INTERNAL_RESOURCE = "spark-internal"
 
   final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 5a59af5f0c..75a2872ba2 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -79,7 +79,26 @@ class BatchJobSubmission(
   def appStartTime: Long = _appStartTime
   def appStarted: Boolean = _appStartTime > 0
 
-  private lazy val _submitTime = if (appStarted) _appStartTime else 
System.currentTimeMillis
+  private lazy val _submitTime = System.currentTimeMillis
+
+  /**
+   * Batch submission refers to the time interval from the start of the batch 
operation run
+   * to the application being started. This method returns the time within 
this interval based
+   * on the following conditions:
+   * 1. If the application has been submitted to resource manager, return the 
time that application
+   * submitted to resource manager.
+   * 2. If the builder process does not wait for the engine completion and the 
process has not been
+   * terminated, return the current time to prevent application failed within 
NOT_FOUND state if the
+   * process duration exceeds the application submit timeout.
+   * 3. Otherwise, return the time that start to run the batch job submission 
operation.
+   */
+  private def submitTime: Long = if (appStarted) {
+    _appStartTime
+  } else if (!waitEngineCompletion && !startupProcessTerminated) {
+    System.currentTimeMillis()
+  } else {
+    _submitTime
+  }
 
   @VisibleForTesting
   private[kyuubi] val builder: ProcBuilder = {
@@ -100,11 +119,16 @@ class BatchJobSubmission(
       getOperationLog)
   }
 
+  private lazy val waitEngineCompletion = builder.waitEngineCompletion
+
   private lazy val appOperation = 
applicationManager.getApplicationOperation(builder.appMgrInfo())
 
   def startupProcessAlive: Boolean =
     builder.processLaunched && Option(builder.process).exists(_.isAlive)
 
+  private def startupProcessTerminated: Boolean =
+    builder.processLaunched && Option(builder.process).forall(!_.isAlive)
+
   override def currentApplicationInfo(): Option[ApplicationInfo] = {
     if (isTerminal(state) && 
_applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
       return _applicationInfo
@@ -114,7 +138,7 @@ class BatchJobSubmission(
         builder.appMgrInfo(),
         batchId,
         Some(session.user),
-        Some(_submitTime))
+        Some(submitTime))
     applicationId(applicationInfo).foreach { _ =>
       if (_appStartTime <= 0) {
         _appStartTime = System.currentTimeMillis()
@@ -299,6 +323,11 @@ class BatchJobSubmission(
 
       if (!process.isAlive) {
         doUpdateApplicationInfoMetadataIfNeeded()
+        val exitValue = process.exitValue()
+        if (exitValue != 0) {
+          throw new KyuubiException(
+            s"Process exit with value $exitValue, application info: 
${_applicationInfo}")
+        }
       }
 
       if (applicationFailed(_applicationInfo, appOperation)) {
@@ -323,10 +352,7 @@ class BatchJobSubmission(
         case None =>
       }
     } finally {
-      val waitCompletion = 
batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
-        .map(_.toBoolean).getOrElse(
-          
session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION))
-      val destroyProcess = !waitCompletion && builder.isClusterMode()
+      val destroyProcess = !waitEngineCompletion
       if (destroyProcess) {
         info("Destroy the builder process because waitCompletion is false" +
           " and the engine is running in cluster mode.")

Reply via email to