Repository: spark
Updated Branches:
  refs/heads/master 813effc70 -> 137d94235


[SPARK-3877][YARN] Throw an exception when the application is not successful so that the exit code will be set to 1

When a YARN application fails (yarn-cluster mode), the exit code of 
spark-submit is still 0. This makes it hard to write automation scripts that 
run Spark jobs on YARN, because the failure cannot be detected from the exit 
code.

This PR adds a status check after `monitorApplication`. If the application is 
not successful, `run()` throws a `SparkException`, so Client.scala exits with 
code 1. Automation scripts can therefore rely on the exit code of 
`spark-submit` to detect failures, as sketched below.
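
For illustration only (not part of this change), a minimal Scala sketch of such an
automation script, assuming `spark-submit` is on the PATH; the wrapper object
`SubmitAndCheck`, the class `org.example.MyJob`, and the jar name are hypothetical:

import scala.sys.process._

// Hypothetical wrapper: it relies only on the behavior described above,
// namely that spark-submit now exits non-zero when the YARN application fails.
object SubmitAndCheck {
  def main(args: Array[String]): Unit = {
    val exitCode = Seq(
      "spark-submit",
      "--master", "yarn-cluster",
      "--class", "org.example.MyJob",  // hypothetical application class
      "my-job.jar"                     // hypothetical application jar
    ).!
    if (exitCode != 0) {
      // A failed, killed, or undefined final status now surfaces as a non-zero exit code.
      sys.error(s"Spark job failed: spark-submit exited with code $exitCode")
    }
  }
}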

Author: zsxwing <zsxw...@gmail.com>

Closes #2732 from zsxwing/SPARK-3877 and squashes the following commits:

1f89fa5 [zsxwing] Fix the unit test
a0498e1 [zsxwing] Update the docs and the error message
e1cb9ef [zsxwing] Fix the hacky way of calling Client
ff16fec [zsxwing] Remove System.exit in Client.scala and add a test
6a2c103 [zsxwing] [SPARK-3877] Throw an exception when application is not successful so that the exit code will be set to 1


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/137d9423
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/137d9423
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/137d9423

Branch: refs/heads/master
Commit: 137d94235383cc49ccf8a7bb7f314f578aa1dede
Parents: 813effc
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed Oct 22 15:04:41 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Wed Oct 22 15:04:41 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 12 ++------
 .../apache/spark/deploy/yarn/ClientBase.scala   | 29 ++++++++++++++++----
 .../cluster/YarnClientSchedulerBackend.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 12 ++------
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 24 ++++++++++------
 5 files changed, 44 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/137d9423/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5c7bca4..9c66c78 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -137,15 +137,7 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
 
-    try {
-      val args = new ClientArguments(argStrings, sparkConf)
-      new Client(args, sparkConf).run()
-    } catch {
-      case e: Exception =>
-        Console.err.println(e.getMessage)
-        System.exit(1)
-    }
-
-    System.exit(0)
+    val args = new ClientArguments(argStrings, sparkConf)
+    new Client(args, sparkConf).run()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/137d9423/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 0efac4e..fb0e34b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -417,17 +417,19 @@ private[spark] trait ClientBase extends Logging {
 
   /**
    * Report the state of an application until it has exited, either successfully or
-   * due to some failure, then return the application state.
+   * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
+   * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
+   * or KILLED).
    *
    * @param appId ID of the application to monitor.
    * @param returnOnRunning Whether to also return the application state when it is RUNNING.
    * @param logApplicationReport Whether to log details of the application report every iteration.
-   * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
+   * @return A pair of the yarn application state and the final application state.
    */
   def monitorApplication(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): YarnApplicationState = {
+      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
     var lastState: YarnApplicationState = null
     while (true) {
@@ -468,11 +470,11 @@ private[spark] trait ClientBase extends Logging {
       if (state == YarnApplicationState.FINISHED ||
         state == YarnApplicationState.FAILED ||
         state == YarnApplicationState.KILLED) {
-        return state
+        return (state, report.getFinalApplicationStatus)
       }
 
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
-        return state
+        return (state, report.getFinalApplicationStatus)
       }
 
       lastState = state
@@ -485,8 +487,23 @@ private[spark] trait ClientBase extends Logging {
   /**
    * Submit an application to the ResourceManager and monitor its state.
    * This continues until the application has exited for any reason.
+   * If the application finishes with a failed, killed, or undefined status,
+   * throw an appropriate SparkException.
    */
-  def run(): Unit = monitorApplication(submitApplication())
+  def run(): Unit = {
+    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
+    if (yarnApplicationState == YarnApplicationState.FAILED ||
+      finalApplicationStatus == FinalApplicationStatus.FAILED) {
+      throw new SparkException("Application finished with failed status")
+    }
+    if (yarnApplicationState == YarnApplicationState.KILLED ||
+      finalApplicationStatus == FinalApplicationStatus.KILLED) {
+      throw new SparkException("Application is killed")
+    }
+    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+      throw new SparkException("The final status of application is undefined")
+    }
+  }
 
   /* --------------------------------------------------------------------------------------- *
    |  Methods that cannot be implemented here due to API differences across hadoop versions  |
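
For context, a minimal illustrative helper (not part of this patch) showing how a caller
might consume the (YarnApplicationState, FinalApplicationStatus) pair now returned by
monitorApplication; the object name YarnStatusCheck is hypothetical, and the checks simply
mirror the ones added to run() above:

import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}
import org.apache.spark.SparkException

// Hypothetical helper, not in this patch: collapses the monitoring result into
// either normal completion or a SparkException, mirroring the new run() checks.
object YarnStatusCheck {
  def assertSucceeded(status: (YarnApplicationState, FinalApplicationStatus)): Unit = {
    val (state, finalStatus) = status
    if (state == YarnApplicationState.FAILED ||
        finalStatus == FinalApplicationStatus.FAILED) {
      throw new SparkException("Application finished with failed status")
    }
    if (state == YarnApplicationState.KILLED ||
        finalStatus == FinalApplicationStatus.KILLED) {
      throw new SparkException("Application is killed")
    }
    if (finalStatus == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException("The final status of application is undefined")
    }
    // Otherwise: FINISHED with SUCCEEDED, or still RUNNING when returnOnRunning is used.
  }
}

This is the same destructuring that the YarnClientSchedulerBackend change below performs
before inspecting the state.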

http://git-wip-us.apache.org/repos/asf/spark/blob/137d9423/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6bb4b82..d948a2a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -99,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   private def waitForApplication(): Unit = {
     assert(client != null && appId != null, "Application has not been submitted yet!")
-    val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
+    val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking
     if (state == YarnApplicationState.FINISHED ||
       state == YarnApplicationState.FAILED ||
       state == YarnApplicationState.KILLED) {

http://git-wip-us.apache.org/repos/asf/spark/blob/137d9423/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0b43e6e..addaddb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -135,15 +135,7 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
 
-    try {
-      val args = new ClientArguments(argStrings, sparkConf)
-      new Client(args, sparkConf).run()
-    } catch {
-      case e: Exception =>
-        Console.err.println(e.getMessage)
-        System.exit(1)
-    }
-
-    System.exit(0)
+    val args = new ClientArguments(argStrings, sparkConf)
+    new Client(args, sparkConf).run()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/137d9423/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a826b2a..d79b85e 100644
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
 
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -123,21 +123,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
     var result = File.createTempFile("result", null, tempDir)
 
-    // The Client object will call System.exit() after the job is done, and we don't want
-    // that because it messes up the scalatest monitoring. So replicate some of what main()
-    // does here.
     val args = Array("--class", main,
       "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
       "--arg", "yarn-cluster",
       "--arg", result.getAbsolutePath(),
       "--num-executors", "1")
-    val sparkConf = new SparkConf()
-    val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    val clientArgs = new ClientArguments(args, sparkConf)
-    new Client(clientArgs, yarnConf, sparkConf).run()
+    Client.main(args)
     checkResult(result)
   }
 
+  test("run Spark in yarn-cluster mode unsuccessfully") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+
+    // Use only one argument so the driver will fail
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--num-executors", "1")
+    val exception = intercept[SparkException] {
+      Client.main(args)
+    }
+    assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
+  }
+
   /**
    * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
    * any sort of error when the job process finishes successfully, but the job itself fails. So

