This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new e9bc6792a7 [KYUUBI #6997] Get the latest batch app info after submit 
process terminated to prevent batch ERROR due to engine submit timeout
e9bc6792a7 is described below

commit e9bc6792a733071ee00a87c951bd6cc6d09fb3f1
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Mar 24 12:53:22 2025 -0700

    [KYUUBI #6997] Get the latest batch app info after submit process 
terminated to prevent batch ERROR due to engine submit timeout
    
    ### Why are the changes needed?
    
    We hit the following issue:
    For spark on yarn:
    ```
    spark.yarn.submit.waitAppCompletion=false
    kyuubi.engine.yarn.submit.timeout=PT10M
    ```
    
    Due to a network issue, the application submission was very slow.
    
    The application was only submitted after 15 minutes.
    <img width="1430" alt="image" src="https://github.com/user-attachments/assets/a326c3d1-4d39-42da-b6aa-cad5f8e7fc4b" />
    
    <img width="1350" alt="image" src="https://github.com/user-attachments/assets/8e20056a-bd71-4515-a5e3-f881509a34b2" />
    
    Then the batch went directly from the PENDING state to the ERROR state, because the application state was NOT_FOUND (the `kyuubi.engine.yarn.submit.timeout` had been exceeded).
    
    
https://github.com/apache/kyuubi/blob/a54ee39ab338e310c6b9a508ad8f14c0bd82fa0f/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala#L99-L106
    
    <img width="1727" alt="image" src="https://github.com/user-attachments/assets/20a2987c-675c-4136-a107-001f30b1b217" />
    
    Here is the operation event:
    <img width="1727" alt="image" src="https://github.com/user-attachments/assets/e2bab9c3-a959-4e2b-a207-813ae6489b30" />
    
    But according to the batch log, the application status at that time should have been `PENDING`.
    ```
    :2025-03-21 17:36:19.350 INFO [KyuubiSessionManager-exec-pool: 
Thread-176922] org.apache.kyuubi.operation.BatchJobSubmission: Batch report for 
bbba09c8-3704-4a87-8394-9bcbbd39cc34, 
Some(ApplicationInfo(application_1741747369441_2258235,6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732_6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732.e3a34b86-7fc7-43ea-b4a5-1b6f27df54b5.0_20250322002147.stm,PENDING,Some(https://apollo-rno-rm-2.vip.hadoop.ebay.com:50030/proxy/application_1741747369441_2258235/),Some()))
    ```
    
    So, after the submission process terminates and before checking whether the application has failed, we should retrieve the batch application info again to get the current application state. This prevents the following corner case:
    1. the application submission time exceeds `kyuubi.engine.yarn.submit.timeout`, so the app state is NOT_FOUND;
    2. the application report cannot be obtained before the submission process terminates;
    3. the batch state then transitions directly from PENDING to ERROR.
    
    Conclusion:
    
    Before this PR, the application state transitions were:
    
    UNKNOWN(before submit timeout) -> NOT_FOUND(reach submit timeout) -> processExit -> batchOpError -> PENDING(updateApplicationInfoMetadataIfNeeded) -> UNKNOWN(batchError but app not terminated)
    
    After this PR, the transitions should be:
    
    UNKNOWN(before submit timeout) -> NOT_FOUND(reach submit timeout) -> processExit -> PENDING(after process terminated) -> ...
    
    ### How was this patch tested?
    
    Existing GitHub Actions (GA) tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #6997 from turboFei/app_not_found_v2.
    
    Closes #6997
    
    370cf49e9 [Wang, Fei] v2
    912ec28ca [Wang, Fei] nit
    3c376f922 [Wang, Fei] log the op ex
    d9cbdb87d [Wang, Fei] fix app not found
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 196b47e32a527c4b4d6d296a28b8c6fc7d56a1be)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../kyuubi/operation/AbstractOperation.scala       |  1 +
 .../kyuubi/operation/BatchJobSubmission.scala      | 76 ++++++++++++----------
 2 files changed, 44 insertions(+), 33 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 05dd7fda90..4d5db5d4eb 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -103,6 +103,7 @@ abstract class AbstractOperation(session: Session) extends 
Operation with Loggin
 
   protected def setOperationException(opEx: KyuubiSQLException): Unit = {
     this.operationException = opEx
+    withOperationLog(error(s"Error operating $opType: ${opEx.getMessage}", 
opEx))
   }
 
   def getOperationJobProgress: TProgressUpdateResp = operationJobProgress
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 771ff3b4ce..f573dc0d2d 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -163,6 +163,8 @@ class BatchJobSubmission(
       opState: OperationState,
       appState: ApplicationState.ApplicationState): 
ApplicationState.ApplicationState = {
     if (opState == OperationState.ERROR && 
!ApplicationState.isTerminated(appState)) {
+      withOperationLog(error(s"Batch $batchId state is $opState," +
+        s" but the application state is $appState and not terminated, set to 
UNKNOWN."))
       ApplicationState.UNKNOWN
     } else {
       appState
@@ -240,50 +242,58 @@ class BatchJobSubmission(
   private def submitAndMonitorBatchJob(): Unit = {
     var appStatusFirstUpdated = false
     var lastStarvationCheckTime = createTime
+
+    def doUpdateApplicationInfoMetadataIfNeeded(): Unit = {
+      updateApplicationInfoMetadataIfNeeded()
+      if (!appStatusFirstUpdated) {
+        // only the ApplicationInfo with non-empty id indicates that batch is 
RUNNING
+        if (applicationId(_applicationInfo).isDefined) {
+          setStateIfNotCanceled(OperationState.RUNNING)
+          updateBatchMetadata()
+          appStatusFirstUpdated = true
+        } else {
+          val currentTime = System.currentTimeMillis()
+          if (currentTime - lastStarvationCheckTime > 
applicationStarvationTimeout) {
+            lastStarvationCheckTime = currentTime
+            warn(s"Batch[$batchId] has not started, check the Kyuubi server to 
ensure" +
+              s" that batch jobs can be submitted.")
+          }
+        }
+      }
+    }
+
     try {
       info(s"Submitting $batchType batch[$batchId] job:\n$builder")
       val process = builder.start
-      while (!applicationFailed(_applicationInfo) && process.isAlive) {
-        updateApplicationInfoMetadataIfNeeded()
-        if (!appStatusFirstUpdated) {
-          // only the ApplicationInfo with non-empty id indicates that batch 
is RUNNING
-          if (applicationId(_applicationInfo).isDefined) {
-            setStateIfNotCanceled(OperationState.RUNNING)
-            updateBatchMetadata()
-            appStatusFirstUpdated = true
-          } else {
-            val currentTime = System.currentTimeMillis()
-            if (currentTime - lastStarvationCheckTime > 
applicationStarvationTimeout) {
-              lastStarvationCheckTime = currentTime
-              warn(s"Batch[$batchId] has not started, check the Kyuubi server 
to ensure" +
-                s" that batch jobs can be submitted.")
-            }
-          }
-        }
+      while (process.isAlive && !applicationFailed(_applicationInfo)) {
+        doUpdateApplicationInfoMetadataIfNeeded()
         process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
       }
 
+      if (!process.isAlive) {
+        doUpdateApplicationInfoMetadataIfNeeded()
+      }
+
       if (applicationFailed(_applicationInfo)) {
         Utils.terminateProcess(process, applicationStartupDestroyTimeout)
         throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
-      } else {
-        process.waitFor()
-        if (process.exitValue() != 0) {
-          throw new KyuubiException(s"Process exit with value 
${process.exitValue()}")
-        }
+      }
 
-        while (!appStarted && applicationId(_applicationInfo).isEmpty &&
-          !applicationTerminated(_applicationInfo)) {
-          Thread.sleep(applicationCheckInterval)
-          updateApplicationInfoMetadataIfNeeded()
-        }
+      if (process.waitFor() != 0) {
+        throw new KyuubiException(s"Process exit with value 
${process.exitValue}")
+      }
 
-        applicationId(_applicationInfo) match {
-          case Some(appId) => monitorBatchJob(appId)
-          case None if !appStarted =>
-            throw new KyuubiException(s"$batchType batch[$batchId] job failed: 
${_applicationInfo}")
-          case None =>
-        }
+      while (!appStarted && applicationId(_applicationInfo).isEmpty &&
+        !applicationTerminated(_applicationInfo)) {
+        Thread.sleep(applicationCheckInterval)
+        doUpdateApplicationInfoMetadataIfNeeded()
+      }
+
+      applicationId(_applicationInfo) match {
+        case Some(appId) => monitorBatchJob(appId)
+        case None if !appStarted =>
+          throw new KyuubiException(s"$batchType batch[$batchId] job failed: 
${_applicationInfo}")
+        case None =>
       }
     } finally {
       val waitCompletion = 
batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)

Reply via email to