This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new e9bc6792a7 [KYUUBI #6997] Get the latest batch app info after submit
process terminated to prevent batch ERROR due to engine submit timeout
e9bc6792a7 is described below
commit e9bc6792a733071ee00a87c951bd6cc6d09fb3f1
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Mar 24 12:53:22 2025 -0700
[KYUUBI #6997] Get the latest batch app info after submit process
terminated to prevent batch ERROR due to engine submit timeout
### Why are the changes needed?
We hit the following issue with Spark on YARN, configured as:
```
spark.yarn.submit.waitAppCompletion=false
kyuubi.engine.yarn.submit.timeout=PT10M
```
Due to a network issue, the application submission was very slow: the application was only submitted after 15 minutes, well past the 10-minute submit timeout.
<img width="1430" alt="image" src="https://github.com/user-attachments/assets/a326c3d1-4d39-42da-b6aa-cad5f8e7fc4b" />
<img width="1350" alt="image" src="https://github.com/user-attachments/assets/8e20056a-bd71-4515-a5e3-f881509a34b2" />
The batch then went directly from PENDING to ERROR, because the application state was NOT_FOUND (the submission exceeded `kyuubi.engine.yarn.submit.timeout`).
https://github.com/apache/kyuubi/blob/a54ee39ab338e310c6b9a508ad8f14c0bd82fa0f/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala#L99-L106
<img width="1727" alt="image" src="https://github.com/user-attachments/assets/20a2987c-675c-4136-a107-001f30b1b217" />
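A minimal sketch of the timeout behaviour described above, assuming (as the state transitions later in this description state) that an application with no YARN report is reported as UNKNOWN before the submit timeout and NOT_FOUND once the timeout is reached, and that NOT_FOUND is treated as a failure. `SubmitTimeoutSketch`, `stateWithoutReport` and `looksFailed` are hypothetical names for illustration, not Kyuubi's actual classes.

```scala
import java.time.Duration

// Hypothetical model of the submit-timeout rule; not Kyuubi's actual API.
object SubmitTimeoutSketch {
  sealed trait AppState
  case object Unknown extends AppState
  case object NotFound extends AppState

  // Mirrors kyuubi.engine.yarn.submit.timeout=PT10M from the configuration above.
  val submitTimeoutMs: Long = Duration.parse("PT10M").toMillis

  /** State reported while YARN still has no report for the application. */
  def stateWithoutReport(elapsedMs: Long, timeoutMs: Long = submitTimeoutMs): AppState =
    if (elapsedMs < timeoutMs) Unknown else NotFound

  /** Assumption: the batch monitor treats NOT_FOUND as a failed application. */
  def looksFailed(state: AppState): Boolean = state == NotFound
}
```

With a 15-minute submission, any check made after minute 10 sees `NotFound`, even though YARN later reports the application as PENDING.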
Here is the operation event:
<img width="1727" alt="image" src="https://github.com/user-attachments/assets/e2bab9c3-a959-4e2b-a207-813ae6489b30" />
However, the batch log shows that the application status at that time was actually `PENDING`:
```
:2025-03-21 17:36:19.350 INFO [KyuubiSessionManager-exec-pool:
Thread-176922] org.apache.kyuubi.operation.BatchJobSubmission: Batch report for
bbba09c8-3704-4a87-8394-9bcbbd39cc34,
Some(ApplicationInfo(application_1741747369441_2258235,6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732_6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732.e3a34b86-7fc7-43ea-b4a5-1b6f27df54b5.0_20250322002147.stm,PENDING,Some(https://apollo-rno-rm-2.vip.hadoop.ebay.com:50030/proxy/application_1741747369441_2258235/),Some()))
```
So we should refresh the batch application info once the submission process has terminated, before checking whether the application failed, so that the check uses the current application information. This prevents the following corner case:
1. the application submission takes longer than `kyuubi.engine.yarn.submit.timeout`, so the app state becomes NOT_FOUND;
2. the application report cannot be retrieved before the submission process terminates;
3. the batch state then goes directly from PENDING to ERROR.
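The ordering fix can be modelled with a simplified monitor loop. This is a minimal sketch, assuming a scripted sequence of poll results in place of real YARN reports and a poll counter in place of a real submit process; `RefreshAfterExitSketch`, `stateSeenByFailureCheck` and its parameters are hypothetical names, and the rule that only NOT_FOUND counts as failed is an assumption.

```scala
// Hypothetical model of the batch monitor loop; names are illustrative only.
object RefreshAfterExitSketch {
  sealed trait AppState
  case object Unknown extends AppState
  case object NotFound extends AppState
  case object Pending extends AppState

  // Assumption: NOT_FOUND is the only state treated as failed in this sketch.
  def isFailed(state: AppState): Boolean = state == NotFound

  /**
   * Returns the application state that the final failure check sees.
   *
   * @param poll                   state returned by the n-th poll of the resource manager
   * @param processExitsAfterPolls number of polls performed while the submit process is alive
   * @param refreshAfterExit       whether to poll once more after the process has exited
   */
  def stateSeenByFailureCheck(
      poll: Int => AppState,
      processExitsAfterPolls: Int,
      refreshAfterExit: Boolean): AppState = {
    var polls = 0
    var current: AppState = Unknown
    def processAlive: Boolean = polls < processExitsAfterPolls

    // Poll while the submit process runs and no failure has been observed.
    while (processAlive && !isFailed(current)) {
      current = poll(polls)
      polls += 1
    }
    // Once the process has exited, refresh the info before judging failure.
    if (refreshAfterExit && !processAlive) {
      current = poll(polls)
    }
    current
  }
}
```

With polls returning Unknown, NotFound, then Pending, and a process that exits after the second poll, the check sees `NotFound` without the extra refresh and `Pending` with it.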
Conclusion:
Before this PR, the application state transition was:
UNKNOWN (before submit timeout) -> NOT_FOUND (submit timeout reached) -> processExit -> batchOpError -> PENDING (updateApplicationInfoMetadataIfNeeded) -> UNKNOWN (batch ERROR, but app not terminated)
After this PR, it should be:
UNKNOWN (before submit timeout) -> NOT_FOUND (submit timeout reached) -> processExit -> PENDING (after process terminated) -> ...
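The final UNKNOWN in the pre-fix sequence comes from the rule in the first `BatchJobSubmission.scala` hunk of this patch: when the operation is ERROR but the application has not terminated, the recorded application state is replaced with UNKNOWN. A minimal sketch of that rule; `ErrorStateSketch`, its enumerations and the chosen set of terminated states are simplified assumptions, not Kyuubi's `ApplicationState`.

```scala
// Hypothetical model of the ERROR-but-not-terminated rule; not Kyuubi's actual types.
object ErrorStateSketch {
  object OpState extends Enumeration {
    val RUNNING, FINISHED, ERROR = Value
  }
  object AppState extends Enumeration {
    val PENDING, RUNNING, FINISHED, FAILED, KILLED, UNKNOWN = Value
  }

  // Assumption: only these states count as terminated in this sketch.
  private val terminated = Set(AppState.FINISHED, AppState.FAILED, AppState.KILLED)

  def isTerminated(state: AppState.Value): Boolean = terminated.contains(state)

  /** An errored batch whose application is still alive has its app state reported as UNKNOWN. */
  def reportedAppState(opState: OpState.Value, appState: AppState.Value): AppState.Value =
    if (opState == OpState.ERROR && !isTerminated(appState)) AppState.UNKNOWN else appState
}
```

This is why an application that YARN reports as PENDING ended up recorded as UNKNOWN once the batch operation had already errored.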
### How was this patch tested?
Existing GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #6997 from turboFei/app_not_found_v2.
Closes #6997
370cf49e9 [Wang, Fei] v2
912ec28ca [Wang, Fei] nit
3c376f922 [Wang, Fei] log the op ex
d9cbdb87d [Wang, Fei] fix app not found
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 196b47e32a527c4b4d6d296a28b8c6fc7d56a1be)
Signed-off-by: Wang, Fei <[email protected]>
---
.../kyuubi/operation/AbstractOperation.scala | 1 +
.../kyuubi/operation/BatchJobSubmission.scala | 76 ++++++++++++----------
2 files changed, 44 insertions(+), 33 deletions(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 05dd7fda90..4d5db5d4eb 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -103,6 +103,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
protected def setOperationException(opEx: KyuubiSQLException): Unit = {
this.operationException = opEx
+ withOperationLog(error(s"Error operating $opType: ${opEx.getMessage}", opEx))
}
def getOperationJobProgress: TProgressUpdateResp = operationJobProgress
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 771ff3b4ce..f573dc0d2d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -163,6 +163,8 @@ class BatchJobSubmission(
opState: OperationState,
appState: ApplicationState.ApplicationState): ApplicationState.ApplicationState = {
if (opState == OperationState.ERROR && !ApplicationState.isTerminated(appState)) {
+ withOperationLog(error(s"Batch $batchId state is $opState," +
+ s" but the application state is $appState and not terminated, set to UNKNOWN."))
ApplicationState.UNKNOWN
} else {
appState
@@ -240,50 +242,58 @@ class BatchJobSubmission(
private def submitAndMonitorBatchJob(): Unit = {
var appStatusFirstUpdated = false
var lastStarvationCheckTime = createTime
+
+ def doUpdateApplicationInfoMetadataIfNeeded(): Unit = {
+ updateApplicationInfoMetadataIfNeeded()
+ if (!appStatusFirstUpdated) {
+ // only the ApplicationInfo with non-empty id indicates that batch is RUNNING
+ if (applicationId(_applicationInfo).isDefined) {
+ setStateIfNotCanceled(OperationState.RUNNING)
+ updateBatchMetadata()
+ appStatusFirstUpdated = true
+ } else {
+ val currentTime = System.currentTimeMillis()
+ if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) {
+ lastStarvationCheckTime = currentTime
+ warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" +
+ s" that batch jobs can be submitted.")
+ }
+ }
+ }
+ }
+
try {
info(s"Submitting $batchType batch[$batchId] job:\n$builder")
val process = builder.start
- while (!applicationFailed(_applicationInfo) && process.isAlive) {
- updateApplicationInfoMetadataIfNeeded()
- if (!appStatusFirstUpdated) {
- // only the ApplicationInfo with non-empty id indicates that batch is RUNNING
- if (applicationId(_applicationInfo).isDefined) {
- setStateIfNotCanceled(OperationState.RUNNING)
- updateBatchMetadata()
- appStatusFirstUpdated = true
- } else {
- val currentTime = System.currentTimeMillis()
- if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) {
- lastStarvationCheckTime = currentTime
- warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" +
- s" that batch jobs can be submitted.")
- }
- }
- }
+ while (process.isAlive && !applicationFailed(_applicationInfo)) {
+ doUpdateApplicationInfoMetadataIfNeeded()
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
}
+ if (!process.isAlive) {
+ doUpdateApplicationInfoMetadataIfNeeded()
+ }
+
if (applicationFailed(_applicationInfo)) {
Utils.terminateProcess(process, applicationStartupDestroyTimeout)
throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
- } else {
- process.waitFor()
- if (process.exitValue() != 0) {
- throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
- }
+ }
- while (!appStarted && applicationId(_applicationInfo).isEmpty &&
- !applicationTerminated(_applicationInfo)) {
- Thread.sleep(applicationCheckInterval)
- updateApplicationInfoMetadataIfNeeded()
- }
+ if (process.waitFor() != 0) {
+ throw new KyuubiException(s"Process exit with value ${process.exitValue}")
+ }
- applicationId(_applicationInfo) match {
- case Some(appId) => monitorBatchJob(appId)
- case None if !appStarted =>
- throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
- case None =>
- }
+ while (!appStarted && applicationId(_applicationInfo).isEmpty &&
+ !applicationTerminated(_applicationInfo)) {
+ Thread.sleep(applicationCheckInterval)
+ doUpdateApplicationInfoMetadataIfNeeded()
+ }
+
+ applicationId(_applicationInfo) match {
+ case Some(appId) => monitorBatchJob(appId)
+ case None if !appStarted =>
+ throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+ case None =>
}
} finally {
val waitCompletion =
batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)