This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ecfca79328 [KYUUBI #7033] Treat YARN/Kubernetes application NOT_FOUND
as failed to prevent data quality issue
ecfca79328 is described below
commit ecfca79328c0c7add3154457e2f723b6c49e8782
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Apr 27 21:09:08 2025 +0800
[KYUUBI #7033] Treat YARN/Kubernetes application NOT_FOUND as failed to
prevent data quality issue
### Why are the changes needed?
Currently, the NOT_FOUND application state is treated as terminated but not
failed.
This can cause data quality issues when downstream applications depend on
the batch state for data processing.
So, I think we should treat NOT_FOUND as a failed state instead.
Currently, we support 3 types of application managers:
1.
[JpsApplicationOperation](https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala)
2.
[YarnApplicationOperation](https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala)
3.
[KubernetesApplicationOperation](https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala)
YarnApplicationOperation and KubernetesApplicationOperation are widely used
in production.
And in multi-instance Kyuubi deployments, the NOT_FOUND case should rarely
happen (see the references below):
1.
https://github.com/apache/kyuubi/blob/7e199d6fdbdf52222bb3eadd056b9e5a2295f36e/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala#L369-L385
2. https://github.com/apache/kyuubi/pull/7029
So, I think we should treat NOT_FOUND as a failed state in production use
cases.
It is better to fail some corner cases than to mistakenly set unsuccessful
batches to the finished state.
### How was this patch tested?
GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7033 from turboFei/revist_not_found.
Closes #7033
ada4f8822 [Cheng Pan] Update
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
985e23c24 [Wang, Fei] Refine
f03d61242 [Wang, Fei] comments
b9d6ac203 [Wang, Fei] incase the metadata updated by peer instance
3bd61ca85 [Wang, Fei] add
339df4730 [Wang, Fei] treat NOT_FOUND as failed
Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/ApplicationOperation.scala | 15 ++++++++-
.../kyuubi/engine/JpsApplicationOperation.scala | 2 ++
.../engine/KubernetesApplicationOperation.scala | 9 +++--
.../kyuubi/engine/KyuubiApplicationManager.scala | 5 +++
.../kyuubi/engine/YarnApplicationOperation.scala | 2 ++
.../kyuubi/operation/BatchJobSubmission.scala | 38 +++++++++++++++++-----
.../kyuubi/server/api/v1/BatchesResource.scala | 3 +-
7 files changed, 60 insertions(+), 14 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index d54dff1a60..728458646d 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -85,15 +85,28 @@ trait ApplicationOperation {
tag: String,
proxyUser: Option[String] = None,
submitTime: Option[Long] = None): ApplicationInfo
+
+ /**
+ * Whether the application state can be persisted and retrieved after
finished.
+ * @return true if the application state can be persisted
+ */
+ def supportPersistedAppState: Boolean
}
object ApplicationState extends Enumeration {
type ApplicationState = Value
val PENDING, RUNNING, FINISHED, KILLED, FAILED, ZOMBIE, NOT_FOUND, UNKNOWN =
Value
- def isFailed(state: ApplicationState): Boolean = state match {
+ def isFailed(
+ state: ApplicationState,
+ appOperation: Option[ApplicationOperation]): Boolean = {
+ isFailed(state, appOperation.exists(_.supportPersistedAppState))
+ }
+
+ def isFailed(state: ApplicationState, supportPersistedAppState: Boolean):
Boolean = state match {
case FAILED => true
case KILLED => true
+ case NOT_FOUND if supportPersistedAppState => true
case _ => false
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index ffc233c016..7e37947b86 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -105,5 +105,7 @@ class JpsApplicationOperation extends ApplicationOperation {
// TODO check if the process is zombie
}
+ override def supportPersistedAppState: Boolean = false
+
override def stop(): Unit = {}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 116afd50da..a808030f43 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -137,7 +137,7 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
val shouldDelete = cleanupDriverPodStrategy match {
case NONE => false
case ALL => true
- case COMPLETED => !ApplicationState.isFailed(notification.getValue)
+ case COMPLETED =>
!ApplicationState.isFailed(notification.getValue, Some(this))
}
if (shouldDelete) {
deletePod(kubernetesInfo, removed.podName.orNull, appLabel)
@@ -289,6 +289,8 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
}
}
+ override def supportPersistedAppState: Boolean = true
+
override def stop(): Unit = {
enginePodInformers.asScala.foreach { case (_, informer) =>
Utils.tryLogNonFatalError(informer.stop())
@@ -594,8 +596,8 @@ object KubernetesApplicationOperation extends Logging {
case Some(containerAppState) => containerAppState
case None => podAppState
}
- val applicationError =
- if (ApplicationState.isFailed(applicationState)) {
+ val applicationError = {
+ if (ApplicationState.isFailed(applicationState, supportPersistedAppState
= true)) {
val errorMap = containerStatusToBuildAppState.map { cs =>
Map(
"Pod" -> podName,
@@ -609,6 +611,7 @@ object KubernetesApplicationOperation extends Logging {
} else {
None
}
+ }
applicationState -> applicationError
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 657e529f55..763bea1f93 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -103,6 +103,11 @@ class KyuubiApplicationManager(metadataManager:
Option[MetadataManager])
operations.find(_.isInstanceOf[KubernetesApplicationOperation])
.map(_.asInstanceOf[KubernetesApplicationOperation])
}
+
+ private[kyuubi] def getApplicationOperation(appMgrInfo:
ApplicationManagerInfo)
+ : Option[ApplicationOperation] = {
+ operations.find(_.isSupported(appMgrInfo))
+ }
}
object KyuubiApplicationManager {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 39ee294873..6adb62f975 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -165,6 +165,8 @@ class YarnApplicationOperation extends ApplicationOperation
with Logging {
}
}
+ override def supportPersistedAppState: Boolean = true
+
override def stop(): Unit = adminYarnClient.foreach { yarnClient =>
Utils.tryLogNonFatalError(yarnClient.stop())
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 9dfc3b04e1..5a59af5f0c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -22,10 +22,11 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
+import org.apache.commons.lang3.StringUtils
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState,
KillResponse, ProcBuilder}
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation,
ApplicationState, KillResponse, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
@@ -99,6 +100,8 @@ class BatchJobSubmission(
getOperationLog)
}
+ private lazy val appOperation =
applicationManager.getApplicationOperation(builder.appMgrInfo())
+
def startupProcessAlive: Boolean =
builder.processLaunched && Option(builder.process).exists(_.isAlive)
@@ -212,6 +215,20 @@ class BatchJobSubmission(
metadata match {
case Some(metadata) if metadata.peerInstanceClosed =>
setState(OperationState.CANCELED)
+ case Some(metadata)
+ // in case it has been updated by peer kyuubi instance, see
KYUUBI #6278
+ if StringUtils.isNotBlank(metadata.engineState) &&
+
ApplicationState.isTerminated(ApplicationState.withName(metadata.engineState))
=>
+ _applicationInfo = Some(new ApplicationInfo(
+ id = metadata.engineId,
+ name = metadata.engineName,
+ state = ApplicationState.withName(metadata.engineState),
+ url = Option(metadata.engineUrl),
+ error = metadata.engineError))
+ if (applicationFailed(_applicationInfo, appOperation)) {
+ throw new KyuubiException(
+ s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+ }
case Some(metadata) if metadata.state ==
OperationState.PENDING.toString =>
// case 1: new batch job created using batch impl v2
// case 2: batch job from recovery, do submission only when
previous state is
@@ -275,7 +292,7 @@ class BatchJobSubmission(
try {
info(s"Submitting $batchType batch[$batchId] job:\n$builder")
val process = builder.start
- while (process.isAlive && !applicationFailed(_applicationInfo)) {
+ while (process.isAlive && !applicationFailed(_applicationInfo,
appOperation)) {
doUpdateApplicationInfoMetadataIfNeeded()
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
}
@@ -284,7 +301,7 @@ class BatchJobSubmission(
doUpdateApplicationInfoMetadataIfNeeded()
}
- if (applicationFailed(_applicationInfo)) {
+ if (applicationFailed(_applicationInfo, appOperation)) {
Utils.terminateProcess(process, applicationStartupDestroyTimeout)
throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
}
@@ -329,10 +346,9 @@ class BatchJobSubmission(
setStateIfNotCanceled(OperationState.RUNNING)
}
if (_applicationInfo.isEmpty) {
- info(s"The $batchType batch[$batchId] job: $appId not found, assume that
it has finished.")
- return
+ _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
}
- if (applicationFailed(_applicationInfo)) {
+ if (applicationFailed(_applicationInfo, appOperation)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed:
${_applicationInfo}")
}
updateBatchMetadata()
@@ -341,7 +357,7 @@ class BatchJobSubmission(
Thread.sleep(applicationCheckInterval)
updateApplicationInfoMetadataIfNeeded()
}
- if (applicationFailed(_applicationInfo)) {
+ if (applicationFailed(_applicationInfo, appOperation)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed:
${_applicationInfo}")
}
}
@@ -445,8 +461,12 @@ class BatchJobSubmission(
}
object BatchJobSubmission {
- def applicationFailed(applicationStatus: Option[ApplicationInfo]): Boolean =
{
- applicationStatus.map(_.state).exists(ApplicationState.isFailed)
+ def applicationFailed(
+ applicationStatus: Option[ApplicationInfo],
+ appOperation: Option[ApplicationOperation]): Boolean = {
+ applicationStatus.map(_.state).exists { state =>
+ ApplicationState.isFailed(state, appOperation)
+ }
}
def applicationTerminated(applicationStatus: Option[ApplicationInfo]):
Boolean = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 71ef564da3..6e7fa9a337 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -129,8 +129,9 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
metadata: Metadata,
batchAppStatus: Option[ApplicationInfo]): Batch = {
batchAppStatus.map { appStatus =>
+ val appOp =
sessionManager.applicationManager.getApplicationOperation(metadata.appMgrInfo)
val currentBatchState =
- if (BatchJobSubmission.applicationFailed(batchAppStatus)) {
+ if (BatchJobSubmission.applicationFailed(batchAppStatus, appOp)) {
OperationState.ERROR.toString
} else if (BatchJobSubmission.applicationTerminated(batchAppStatus)) {
OperationState.FINISHED.toString