This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a1a08e7f93 [KYUUBI #7132] Respect
`kyuubi.session.engine.startup.waitCompletion` for wait engine completion
a1a08e7f93 is described below
commit a1a08e7f93db380c92fd9ea08504850d65ff0366
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Jul 14 01:49:06 2025 +0800
[KYUUBI #7132] Respect `kyuubi.session.engine.startup.waitCompletion` for
wait engine completion
### Why are the changes needed?
We should not fail the batch submission if the submit process is still alive and
waiting for engine completion is disabled.
Especially for Spark on Kubernetes, the app might fail with NOT_FOUND
state if the spark-submit process runs longer than the submit timeout.
In this PR, if `kyuubi.session.engine.startup.waitCompletion` is false,
Kyuubi uses the current timestamp as the submit time when getting the application
info, to prevent the app from failing with NOT_FOUND state due to submit timeout.
### How was this patch tested?
Passes current GA and manual testing.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7132 from turboFei/batch_submit.
Closes #7132
efb06db1c [Wang, Fei] refine
7e453c162 [Wang, Fei] refine
7bca1a7aa [Wang, Fei] Prevent potential timeout during polling the
application info
15529ab85 [Wang, Fei] prevent metadata manager fail
38335f2f9 [Wang, Fei] refine
9b8a9fde4 [Wang, Fei] comments
11f607daa [Wang, Fei] docs
f2f6ba148 [Wang, Fei] revert
2da0705ad [Wang, Fei] wait for if not wait complete
d84963420 [Wang, Fei] revert check in loop
b4cf50a49 [Wang, Fei] comments
8c262b7ec [Wang, Fei] refine
ecf379b86 [Wang, Fei] Revert conf change
60dc1676e [Wang, Fei] enlarge
4d0aa542a [Wang, Fei] Save
4aea96552 [Wang, Fei] refine
2ad75fcbf [Wang, Fei] nit
a71b11df6 [Wang, Fei] Do not fail batch if the process is alive
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/migration-guide.md | 2 ++
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 3 +-
.../engine/KubernetesApplicationOperation.scala | 13 ++++++--
.../org/apache/kyuubi/engine/ProcBuilder.scala | 4 +++
.../kyuubi/engine/YarnApplicationOperation.scala | 3 +-
.../engine/spark/SparkBatchProcessBuilder.scala | 15 +++------
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 36 ++++++++++++++++----
.../kyuubi/operation/BatchJobSubmission.scala | 38 ++++++++++++++++++----
8 files changed, 84 insertions(+), 30 deletions(-)
diff --git a/docs/deployment/migration-guide.md
b/docs/deployment/migration-guide.md
index 3570da71ff..359ab26bce 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -21,6 +21,8 @@
* Since Kyuubi 1.11, the configuration
`spark.sql.watchdog.forcedMaxOutputRows` provided by Kyuubi Spark extension is
removed, consider using `kyuubi.operation.result.max.rows` instead. Note, the
latter works without requirement of installing Kyuubi Spark extension.
+* Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will
respect the `kyuubi.session.engine.startup.waitCompletion` config to determine
whether to wait for the engine completion or not. If the engine is running in
client mode, Kyuubi will always wait for the engine completion. And for Spark
engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and
`spark.kubernetes.submission.waitAppCompletion` configs to the engine conf
based on the value of `kyuubi.sess [...]
+
## Upgrading from Kyuubi 1.9 to 1.10
* Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the
future, please use `kyuubi-beeline` instead.
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index a1383d7429..0aa8366235 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -324,8 +324,7 @@ private[kyuubi] class EngineRef(
engineRef.get
} finally {
if (acquiredPermit) startupProcessSemaphore.foreach(_.release())
- val waitCompletion =
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
- val destroyProcess = !waitCompletion && builder.isClusterMode()
+ val destroyProcess = !builder.waitEngineCompletion
if (destroyProcess) {
info("Destroy the builder process because waitCompletion is false" +
" and the engine is running in cluster mode.")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 7fe24e0c6b..6066c951df 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -292,6 +292,7 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
}
debug(s"Getting application[${toLabel(tag)}]'s info from Kubernetes
cluster")
+ val startTime = System.currentTimeMillis()
try {
// need to initialize the kubernetes client if not exists
getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
@@ -299,13 +300,19 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
case (_, info) => info
case _ =>
// try to get the application info from kubernetes engine info store
- metadataManager.flatMap(
-
_.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
+ try {
+ metadataManager.flatMap(
+
_.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
+ } catch {
+ case e: Exception =>
+ error(s"Failed to get application info from metadata manager for
${toLabel(tag)}", e)
+ ApplicationInfo.NOT_FOUND
+ }
}
(appInfo.state, submitTime) match {
// Kyuubi should wait second if pod is not be created
case (NOT_FOUND, Some(_submitTime)) =>
- val elapsedTime = System.currentTimeMillis - _submitTime
+ val elapsedTime = startTime - _submitTime
if (elapsedTime > submitTimeout) {
error(s"Can't find target driver pod by ${toLabel(tag)}, " +
s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 0e21bb9f73..a2cd268a6c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -355,6 +355,10 @@ trait ProcBuilder {
def clusterManager(): Option[String] = None
def appMgrInfo(): ApplicationManagerInfo = ApplicationManagerInfo(None)
+
+ def waitEngineCompletion: Boolean = {
+ !isClusterMode() ||
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+ }
}
object ProcBuilder extends Logging {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 6adb62f975..881b94e775 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -131,12 +131,13 @@ class YarnApplicationOperation extends
ApplicationOperation with Logging {
proxyUser: Option[String] = None,
submitTime: Option[Long] = None): ApplicationInfo =
withYarnClient(proxyUser) { yarnClient =>
debug(s"Getting application info from YARN cluster by tag: $tag")
+ val startTime = System.currentTimeMillis()
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
if (reports.isEmpty) {
debug(s"Can't find target application from YARN cluster by tag: $tag")
submitTime match {
case Some(_submitTime) =>
- val elapsedTime = System.currentTimeMillis - _submitTime
+ val elapsedTime = startTime - _submitTime
if (elapsedTime < submitTimeout) {
info(s"Wait for YARN application[tag: $tag] to be submitted, " +
s"elapsed time: ${elapsedTime}ms, return
${ApplicationInfo.UNKNOWN} status")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 11b4e03dde..3b0d8f44f5 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -57,7 +57,8 @@ class SparkBatchProcessBuilder(
sparkAppNameConf() ++
engineLogPathConf() ++
appendPodNameConf(batchConf) ++
- prepareK8sFileUploadPath()).map { case (k, v) =>
+ prepareK8sFileUploadPath() ++
+ engineWaitCompletionConf()).map { case (k, v) =>
buffer ++= confKeyValue(convertConfigKey(k), v)
}
@@ -79,15 +80,7 @@ class SparkBatchProcessBuilder(
override protected def module: String = "kyuubi-spark-batch-submit"
- override def clusterManager(): Option[String] = {
- batchConf.get(MASTER_KEY).orElse(super.clusterManager())
- }
-
- override def kubernetesContext(): Option[String] = {
- batchConf.get(KUBERNETES_CONTEXT_KEY).orElse(super.kubernetesContext())
- }
-
- override def kubernetesNamespace(): Option[String] = {
- batchConf.get(KUBERNETES_NAMESPACE_KEY).orElse(super.kubernetesNamespace())
+ override private[spark] def getSparkOption(key: String) = {
+ batchConf.get(key).orElse(super.getSparkOption(key))
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 600d96a22c..3b84f6946d 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -150,7 +150,8 @@ class SparkProcessBuilder(
engineLogPathConf ++
extraYarnConf(allConf) ++
appendPodNameConf(allConf) ++
- prepareK8sFileUploadPath()).foreach {
+ prepareK8sFileUploadPath() ++
+ engineWaitCompletionConf).foreach {
case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v)
}
@@ -325,11 +326,11 @@ class SparkProcessBuilder(
}
override def clusterManager(): Option[String] = {
- conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
+ getSparkOption(MASTER_KEY)
}
def deployMode(): Option[String] = {
- conf.getOption(DEPLOY_MODE_KEY).orElse(defaultsConf.get(DEPLOY_MODE_KEY))
+ getSparkOption(DEPLOY_MODE_KEY)
}
override def isClusterMode(): Boolean = {
@@ -346,16 +347,15 @@ class SparkProcessBuilder(
}
def kubernetesContext(): Option[String] = {
-
conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
+ getSparkOption(KUBERNETES_CONTEXT_KEY)
}
def kubernetesNamespace(): Option[String] = {
-
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
+ getSparkOption(KUBERNETES_NAMESPACE_KEY)
}
def kubernetesFileUploadPath(): Option[String] = {
- conf.getOption(KUBERNETES_FILE_UPLOAD_PATH)
- .orElse(defaultsConf.get(KUBERNETES_FILE_UPLOAD_PATH))
+ getSparkOption(KUBERNETES_FILE_UPLOAD_PATH)
}
override def validateConf(): Unit = Validator.validateConf(conf)
@@ -373,6 +373,25 @@ class SparkProcessBuilder(
private[spark] def engineLogPathConf(): Map[String, String] = {
Map(KYUUBI_ENGINE_LOG_PATH_KEY -> engineLog.getAbsolutePath)
}
+
+ private[spark] def getSparkOption(key: String): Option[String] = {
+ conf.getOption(key).orElse(defaultsConf.get(key))
+ }
+
+ override def waitEngineCompletion: Boolean = {
+ !isClusterMode() ||
getSparkOption(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
+
.getOrElse(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.defaultValStr)
+ .toBoolean
+ }
+
+ def engineWaitCompletionConf(): Map[String, String] =
+ clusterManager().map(_.toLowerCase(Locale.ROOT)) match {
+ case Some(m) if m.startsWith("yarn") =>
+ Map(YARN_SUBMIT_WAIT_APP_COMPLETION -> waitEngineCompletion.toString)
+ case Some(m) if m.startsWith("k8s") =>
+ Map(KUBERNETES_SUBMISSION_WAIT_APP_COMPLETION ->
waitEngineCompletion.toString)
+ case _ => Map.empty
+ }
}
object SparkProcessBuilder {
@@ -384,7 +403,10 @@ object SparkProcessBuilder {
final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
final val KUBERNETES_DRIVER_POD_NAME = "spark.kubernetes.driver.pod.name"
final val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
"spark.kubernetes.executor.podNamePrefix"
+ final val KUBERNETES_SUBMISSION_WAIT_APP_COMPLETION =
+ "spark.kubernetes.submission.waitAppCompletion"
final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
+ final val YARN_SUBMIT_WAIT_APP_COMPLETION =
"spark.yarn.submit.waitAppCompletion"
final val INTERNAL_RESOURCE = "spark-internal"
final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 5a59af5f0c..75a2872ba2 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -79,7 +79,26 @@ class BatchJobSubmission(
def appStartTime: Long = _appStartTime
def appStarted: Boolean = _appStartTime > 0
- private lazy val _submitTime = if (appStarted) _appStartTime else
System.currentTimeMillis
+ private lazy val _submitTime = System.currentTimeMillis
+
+ /**
+ * Batch submission refers to the time interval from the start of the batch
operation run
+ * to the application being started. This method returns the time within
this interval based
+ * on the following conditions:
+ * 1. If the application has been submitted to resource manager, return the
time that application
+ * submitted to resource manager.
+ * 2. If the builder process does not wait for the engine completion and the
process has not been
+ * terminated, return the current time to prevent application failed within
NOT_FOUND state if the
+ * process duration exceeds the application submit timeout.
+ * 3. Otherwise, return the time that start to run the batch job submission
operation.
+ */
+ private def submitTime: Long = if (appStarted) {
+ _appStartTime
+ } else if (!waitEngineCompletion && !startupProcessTerminated) {
+ System.currentTimeMillis()
+ } else {
+ _submitTime
+ }
@VisibleForTesting
private[kyuubi] val builder: ProcBuilder = {
@@ -100,11 +119,16 @@ class BatchJobSubmission(
getOperationLog)
}
+ private lazy val waitEngineCompletion = builder.waitEngineCompletion
+
private lazy val appOperation =
applicationManager.getApplicationOperation(builder.appMgrInfo())
def startupProcessAlive: Boolean =
builder.processLaunched && Option(builder.process).exists(_.isAlive)
+ private def startupProcessTerminated: Boolean =
+ builder.processLaunched && Option(builder.process).forall(!_.isAlive)
+
override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) &&
_applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo
@@ -114,7 +138,7 @@ class BatchJobSubmission(
builder.appMgrInfo(),
batchId,
Some(session.user),
- Some(_submitTime))
+ Some(submitTime))
applicationId(applicationInfo).foreach { _ =>
if (_appStartTime <= 0) {
_appStartTime = System.currentTimeMillis()
@@ -299,6 +323,11 @@ class BatchJobSubmission(
if (!process.isAlive) {
doUpdateApplicationInfoMetadataIfNeeded()
+ val exitValue = process.exitValue()
+ if (exitValue != 0) {
+ throw new KyuubiException(
+ s"Process exit with value $exitValue, application info:
${_applicationInfo}")
+ }
}
if (applicationFailed(_applicationInfo, appOperation)) {
@@ -323,10 +352,7 @@ class BatchJobSubmission(
case None =>
}
} finally {
- val waitCompletion =
batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
- .map(_.toBoolean).getOrElse(
-
session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION))
- val destroyProcess = !waitCompletion && builder.isClusterMode()
+ val destroyProcess = !waitEngineCompletion
if (destroyProcess) {
info("Destroy the builder process because waitCompletion is false" +
" and the engine is running in cluster mode.")