This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new bada9c0411 [KYUUBI #7095] Respect terminated app state when building
batch info from metadata
bada9c0411 is described below
commit bada9c04115a966132d66bb0cc5e7997e97b4b1b
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Jun 12 10:20:44 2025 -0700
[KYUUBI #7095] Respect terminated app state when building batch info from
metadata
### Why are the changes needed?
Respect terminated app state when building batch info from metadata
It is a followup for https://github.com/apache/kyuubi/pull/2911,
https://github.com/apache/kyuubi/blob/9e40e39c39566c435ad92f0659c6120b9f2b8578/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala#L128-L142
1. if the kyuubi instance is unreachable during maintain window.
2. the batch app state has been terminated, and the app stated was
backfilled by another kyuubi instance peer, see #2911
3. the batch state in the metadata table is still PENDING/RUNNING
4. return the terminated batch state for such case instead of `PENDING or
RUNNING`.
### How was this patch tested?
GA and IT.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7095 from turboFei/always_respect_appstate.
Closes #7095
ec72666c9 [Wang, Fei] rename
bc74a9c56 [Wang, Fei] if op not terminated
e786c8d9b [Wang, Fei] respect terminated app state when building batch info
from metadata
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../kyuubi/server/api/v1/BatchesResource.scala | 23 ++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 6e7fa9a337..4632527559 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -91,7 +91,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
private def sessionManager =
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
- private def buildBatch(session: KyuubiBatchSession): Batch = {
+ private def buildBatchFromSession(session: KyuubiBatchSession): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
@@ -125,7 +125,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
Map.empty[String, String].asJava)
}
- private def buildBatch(
+ private def buildBatchFromMetadataAndAppInfo(
metadata: Metadata,
batchAppStatus: Option[ApplicationInfo]): Batch = {
batchAppStatus.map { appStatus =>
@@ -316,7 +316,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
} match {
case Success(sessionHandle) =>
sessionManager.getBatchSession(sessionHandle) match {
- case Some(batchSession) => buildBatch(batchSession)
+ case Some(batchSession) => buildBatchFromSession(batchSession)
case None => throw new IllegalStateException(
s"can not find batch $batchId from metadata store")
}
@@ -349,7 +349,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
val userName = fe.getSessionUser(Map.empty[String, String])
val sessionHandle = formatSessionHandle(batchId)
sessionManager.getBatchSession(sessionHandle).map { batchSession =>
- buildBatch(batchSession)
+ buildBatchFromSession(batchSession)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
val isOperationTerminated = (StringUtils.isNotBlank(metadata.state)
@@ -362,7 +362,18 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
isOperationTerminated ||
isApplicationTerminated ||
metadata.kyuubiInstance == fe.connectionUrl) {
- MetadataManager.buildBatch(metadata)
+ if (isApplicationTerminated && !isOperationTerminated) {
+ buildBatchFromMetadataAndAppInfo(
+ metadata,
+ Some(ApplicationInfo(
+ metadata.engineId,
+ metadata.engineName,
+ metadata.appState.orNull,
+ Option(metadata.engineUrl),
+ metadata.engineError)))
+ } else {
+ MetadataManager.buildBatch(metadata)
+ }
} else {
val internalRestClient =
getInternalRestClient(metadata.kyuubiInstance)
try {
@@ -387,7 +398,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
engineState = appInfo.state.toString,
engineError = appInfo.error))
}
- buildBatch(metadata, batchAppStatus)
+ buildBatchFromMetadataAndAppInfo(metadata, batchAppStatus)
}
}
}.getOrElse {