akshatb1 commented on a change in pull request #28258:
URL: https://github.com/apache/spark/pull/28258#discussion_r418993885



##########
File path: core/src/main/scala/org/apache/spark/deploy/Client.scala
##########
@@ -124,38 +128,58 @@ private class ClientEndpoint(
     }
   }
 
-  /* Find out driver status then exit the JVM */
+  /**
+   * Find out driver status then exit the JVM. If the waitAppCompletion is set 
to true, monitors
+   * the application until it finishes, fails or is killed.
+   */
   def pollAndReportStatus(driverId: String): Unit = {
     // Since ClientEndpoint is the only RpcEndpoint in the process, blocking 
the event loop thread
     // is fine.
     logInfo("... waiting before polling master for driver state")
     Thread.sleep(5000)
     logInfo("... polling master for driver state")
-    val statusResponse =
-      
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
-    if (statusResponse.found) {
-      logInfo(s"State of $driverId is ${statusResponse.state.get}")
-      // Worker node, if present
-      (statusResponse.workerId, statusResponse.workerHostPort, 
statusResponse.state) match {
-        case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
-          logInfo(s"Driver running on $hostPort ($id)")
-        case _ =>
-      }
-      // Exception, if present
-      statusResponse.exception match {
-        case Some(e) =>
-          logError(s"Exception from cluster was: $e")
-          e.printStackTrace()
-          System.exit(-1)
-        case _ =>
-          System.exit(0)
+    while (true) {
+      val statusResponse =
+        
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+      if (statusResponse.found) {
+        logInfo(s"State of $driverId is ${statusResponse.state.get}")
+        // Worker node, if present
+        (statusResponse.workerId, statusResponse.workerHostPort, 
statusResponse.state) match {
+          case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
+            logInfo(s"Driver running on $hostPort ($id)")
+          case _ =>
+        }
+        // Exception, if present
+        statusResponse.exception match {
+          case Some(e) =>
+            logError(s"Exception from cluster was: $e")
+            e.printStackTrace()
+            System.exit(-1)
+          case _ =>
+            if (!waitAppCompletion) {
+              logInfo(s"spark-submit not configured to wait for completion, " +
+                s"exiting spark-submit JVM.")
+              System.exit(0)
+            } else {
+              statusResponse.state.get match {
+                case DriverState.FINISHED | DriverState.FAILED |
+                     DriverState.ERROR | DriverState.KILLED =>
+                  logInfo(s"State of $driverId is  
${statusResponse.state.get}, " +
+                  s"exiting spark-submit JVM.")

Review comment:
       @srowen I presume this was about fixing the indentation on line 168. If so, 
I have updated it in the latest commit. If not, please let me know which line 
you were referring to. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to