Repository: spark
Updated Branches:
  refs/heads/master d0ea49687 -> 9c73822a0


[SPARK-2423] Clean up SparkSubmit for readability

It is currently non-trivial to trace through how different combinations of 
cluster managers (e.g. yarn) and deploy modes (e.g. cluster) are processed in 
SparkSubmit. Moving forward, it will be easier to extend SparkSubmit if we 
first re-organize the code by grouping related logic together.

This is a precursor to fixing standalone-cluster mode, which is currently 
broken (SPARK-2260).
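
For reviewers skimming the diff: the core idea of the cleanup is to encode both
cluster managers and deploy modes as bit flags, so that each OptionAssigner rule
declares, via two masks, exactly which combinations it applies to. Below is a
minimal, self-contained sketch of that matching scheme; the constants and the
filter condition mirror the patch, but the two sample rules are purely
illustrative and not the real list from createLaunchEnv.

object OptionAssignerSketch {
  // Bit flags for cluster managers and deploy modes, as defined in the patch
  val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8
  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
  val CLIENT = 1; val CLUSTER = 2
  val ALL_DEPLOY_MODES = CLIENT | CLUSTER

  // Mirrors the OptionAssigner case class at the bottom of the diff
  case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int,
      clOption: String = null, sysProp: String = null)

  def main(args: Array[String]): Unit = {
    // Two sample rules for illustration only
    val options = List(
      OptionAssigner("2g", YARN, CLUSTER, clOption = "--driver-memory"),
      OptionAssigner("MyApp", ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"))

    val clusterManager = YARN
    val deployMode = CLUSTER

    // A rule fires only when both of its bitmasks intersect the chosen combination
    for (opt <- options) {
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
        println(s"matched: clOption=${opt.clOption}, sysProp=${opt.sysProp}, value=${opt.value}")
      }
    }
  }
}

Run as written, only the --driver-memory rule prints: the CLIENT-only rule
fails the deployMode mask, since (CLUSTER & CLIENT) == 0.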

Author: Andrew Or <andrewo...@gmail.com>

Closes #1349 from andrewor14/submit-cleanup and squashes the following commits:

8f99200 [Andrew Or] script -> program (minor)
30f2e65 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-cleanup
fe484a1 [Andrew Or] Move deploy mode checks after yarn code
7167824 [Andrew Or] Re-order config options and update comments
0b01ff8 [Andrew Or] Clean up SparkSubmit for readability


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c73822a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c73822a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c73822a

Branch: refs/heads/master
Commit: 9c73822a08848a0cde545282d3eb1c3f1a4c2a82
Parents: d0ea496
Author: Andrew Or <andrewo...@gmail.com>
Authored: Thu Jul 17 01:13:32 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Jul 17 01:13:32 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 289 ++++++++++---------
 1 file changed, 145 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c73822a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b050dcc..3d8373d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -27,25 +27,39 @@ import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.util.Utils
 
 /**
- * Scala code behind the spark-submit script.  The script handles setting up the classpath with
- * relevant Spark dependencies and provides a layer over the different cluster managers and deploy
- * modes that Spark supports.
+ * Main gateway of launching a Spark application.
+ *
+ * This program handles setting up the classpath with relevant Spark dependencies and provides
+ * a layer over the different cluster managers and deploy modes that Spark supports.
  */
 object SparkSubmit {
+
+  // Cluster managers
   private val YARN = 1
   private val STANDALONE = 2
   private val MESOS = 4
   private val LOCAL = 8
   private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
 
-  private var clusterManager: Int = LOCAL
+  // Deploy modes
+  private val CLIENT = 1
+  private val CLUSTER = 2
+  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
 
-  /**
-   * Special primary resource names that represent shells rather than application jars.
-   */
+  // Special primary resource names that represent shells rather than application jars.
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
 
+  // Exposed for testing
+  private[spark] var exitFn: () => Unit = () => System.exit(-1)
+  private[spark] var printStream: PrintStream = System.err
+  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
+  private[spark] def printErrorAndExit(str: String) = {
+    printStream.println("Error: " + str)
+    printStream.println("Run with --help for usage help or --verbose for debug 
output")
+    exitFn()
+  }
+
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
     if (appArgs.verbose) {
@@ -55,88 +69,80 @@ object SparkSubmit {
     launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
   }
 
-  // Exposed for testing
-  private[spark] var printStream: PrintStream = System.err
-  private[spark] var exitFn: () => Unit = () => System.exit(-1)
-
-  private[spark] def printErrorAndExit(str: String) = {
-    printStream.println("Error: " + str)
-    printStream.println("Run with --help for usage help or --verbose for debug 
output")
-    exitFn()
-  }
-  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
-
   /**
-   * @return a tuple containing the arguments for the child, a list of classpath
-   *         entries for the child, a list of system properties, a list of env vars
-   *         and the main class for the child
+   * @return a tuple containing
+   *           (1) the arguments for the child process,
+   *           (2) a list of classpath entries for the child,
+   *           (3) a list of system properties and env vars, and
+   *           (4) the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
-    if (args.master.startsWith("local")) {
-      clusterManager = LOCAL
-    } else if (args.master.startsWith("yarn")) {
-      clusterManager = YARN
-    } else if (args.master.startsWith("spark")) {
-      clusterManager = STANDALONE
-    } else if (args.master.startsWith("mesos")) {
-      clusterManager = MESOS
-    } else {
-      printErrorAndExit("Master must start with yarn, mesos, spark, or local")
-    }
-
-    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
-    // and deploy mode, we have some logic to infer the master and deploy mode
-    // from each other if only one is specified, or exit early if they are at odds.
-    if (args.deployMode == null &&
-        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
-      args.deployMode = "cluster"
-    }
-    if (args.deployMode == "cluster" && args.master == "yarn-client") {
-      printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" 
are not compatible")
-    }
-    if (args.deployMode == "client" &&
-        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
-      printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
-        + "\" are not compatible")
-    }
-    if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
-      args.master = "yarn-cluster"
-    }
-    if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
-      args.master = "yarn-client"
-    }
-
-    val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"
 
-    val childClasspath = new ArrayBuffer[String]()
+    // Values to return
     val childArgs = new ArrayBuffer[String]()
+    val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""
 
-    val isPython = args.isPython
-    val isYarnCluster = clusterManager == YARN && deployOnCluster
+    // Set the cluster manager
+    val clusterManager: Int = args.master match {
+      case m if m.startsWith("yarn") => YARN
+      case m if m.startsWith("spark") => STANDALONE
+      case m if m.startsWith("mesos") => MESOS
+      case m if m.startsWith("local") => LOCAL
+      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, 
or local"); -1
+    }
 
-    // For mesos, only client mode is supported
-    if (clusterManager == MESOS && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is currently not supported for 
Mesos clusters.")
+    // Set the deploy mode; default is client mode
+    var deployMode: Int = args.deployMode match {
+      case "client" | null => CLIENT
+      case "cluster" => CLUSTER
+      case _ => printErrorAndExit("Deploy mode must be either client or 
cluster"); -1
     }
 
-    // For standalone, only client mode is supported
-    if (clusterManager == STANDALONE && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is currently not supported for 
standalone clusters.")
+    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
+    // and deploy mode, we have some logic to infer the master and deploy mode
+    // from each other if only one is specified, or exit early if they are at odds.
+    if (clusterManager == YARN) {
+      if (args.master == "yarn-standalone") {
+        printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
+        args.master = "yarn-cluster"
+      }
+      (args.master, args.deployMode) match {
+        case ("yarn-cluster", null) =>
+          deployMode = CLUSTER
+        case ("yarn-cluster", "client") =>
+          printErrorAndExit("Client deploy mode is not compatible with master 
\"yarn-cluster\"")
+        case ("yarn-client", "cluster") =>
+          printErrorAndExit("Cluster deploy mode is not compatible with master 
\"yarn-client\"")
+        case (_, mode) =>
+          args.master = "yarn-" + Option(mode).getOrElse("client")
+      }
+
+      // Make sure YARN is included in our build if we're trying to use it
+      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
+        printErrorAndExit(
+          "Could not load YARN classes. " +
+          "This copy of Spark may not have been compiled with YARN support.")
+      }
     }
 
-    // For shells, only client mode is applicable
-    if (isShell(args.primaryResource) && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is not applicable to Spark 
shells.")
+    // The following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (MESOS, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for 
Mesos clusters.")
+      case (STANDALONE, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for 
Standalone clusters.")
+      case (_, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for 
python applications.")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark 
shells.")
+      case _ =>
     }
 
     // If we're running a python app, set the main class to our specific python runner
-    if (isPython) {
-      if (deployOnCluster) {
-        printErrorAndExit("Cluster deploy mode is currently not supported for 
python.")
-      }
+    if (args.isPython) {
       if (args.primaryResource == PYSPARK_SHELL) {
         args.mainClass = "py4j.GatewayServer"
         args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -152,120 +158,115 @@ object SparkSubmit {
       sysProps("spark.submit.pyFiles") = 
PythonRunner.formatPaths(args.pyFiles).mkString(",")
     }
 
-    // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
-    if (!deployOnCluster) {
-      childMainClass = args.mainClass
-      if (isUserJar(args.primaryResource)) {
-        childClasspath += args.primaryResource
-      }
-    } else if (clusterManager == YARN) {
-      childMainClass = "org.apache.spark.deploy.yarn.Client"
-      childArgs += ("--jar", args.primaryResource)
-      childArgs += ("--class", args.mainClass)
-    }
-
-    // Make sure YARN is included in our build if we're trying to use it
-    if (clusterManager == YARN) {
-      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
-        printErrorAndExit("Could not load YARN classes. " +
-          "This copy of Spark may not have been compiled with YARN support.")
-      }
-    }
-
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
     // A list of rules to map each argument to system properties or command-line options in
     // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
-      OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
-      OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
-      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
-      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
+
+      // All cluster managers
+      OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
+      OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+
+      // Standalone cluster only
+      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
+      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
+
+      // Yarn client only
+      OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
+      OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+      OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
+      OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
+      OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
+
+      // Yarn cluster only
+      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+      OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
+      OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
+      OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
+      OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
+      OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
+      OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
+      OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
+
+      // Other options
+      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraClassPath"),
-      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraJavaOptions"),
-      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraLibraryPath"),
-      OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
-      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
-      OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
-      OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
-      OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
-      OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
-      OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
         sysProp = "spark.executor.memory"),
-      OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
-      OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
-      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
         sysProp = "spark.cores.max"),
-      OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
-      OptionAssigner(args.files, YARN, true, clOption = "--files"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
-      OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
-      OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
+        sysProp = "spark.files")
     )
 
-    // For client mode make any added jars immediately visible on the classpath
-    if (args.jars != null && !deployOnCluster) {
-      for (jar <- args.jars.split(",")) {
-        childClasspath += jar
+    // In client mode, launch the application main class directly
+    // In addition, add the main application jar and any added jars (if any) to the classpath
+    if (deployMode == CLIENT) {
+      childMainClass = args.mainClass
+      if (isUserJar(args.primaryResource)) {
+        childClasspath += args.primaryResource
       }
+      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
+      if (args.childArgs != null) { childArgs ++= args.childArgs }
     }
 
+
     // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
-      if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
+      if (opt.value != null &&
+          (deployMode & opt.deployMode) != 0 &&
           (clusterManager & opt.clusterManager) != 0) {
-        if (opt.clOption != null) {
-          childArgs += (opt.clOption, opt.value)
-        }
-        if (opt.sysProp != null) {
-          sysProps.put(opt.sysProp, opt.value)
-        }
+        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
+        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
       }
     }
 
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python files, the primary resource is already distributed as a regular file
-    if (!isYarnCluster && !isPython) {
-      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+    if (!isYarnCluster && !args.isPython) {
+      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
       if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
     }
 
-    // Standalone cluster specific configurations
-    if (deployOnCluster && clusterManager == STANDALONE) {
+    // In standalone-cluster mode, use Client as a wrapper around the user class
+    if (clusterManager == STANDALONE && deployMode == CLUSTER) {
+      childMainClass = "org.apache.spark.deploy.Client"
       if (args.supervise) {
         childArgs += "--supervise"
       }
-      childMainClass = "org.apache.spark.deploy.Client"
       childArgs += "launch"
       childArgs += (args.master, args.primaryResource, args.mainClass)
+      if (args.childArgs != null) {
+        childArgs ++= args.childArgs
+      }
     }
 
-    // Arguments to be passed to user program
-    if (args.childArgs != null) {
-      if (!deployOnCluster || clusterManager == STANDALONE) {
-        childArgs ++= args.childArgs
-      } else if (clusterManager == YARN) {
-        for (arg <- args.childArgs) {
-          childArgs += ("--arg", arg)
-        }
+    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
+    if (clusterManager == YARN && deployMode == CLUSTER) {
+      childMainClass = "org.apache.spark.deploy.yarn.Client"
+      childArgs += ("--jar", args.primaryResource)
+      childArgs += ("--class", args.mainClass)
+      if (args.childArgs != null) {
+        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
       }
     }
 
     // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
-      if (!sysProps.contains(k)) sysProps(k) = v
+      sysProps.getOrElseUpdate(k, v)
     }
 
     (childArgs, childClasspath, sysProps, childMainClass)
@@ -364,6 +365,6 @@ object SparkSubmit {
 private[spark] case class OptionAssigner(
     value: String,
     clusterManager: Int,
-    deployOnCluster: Boolean,
+    deployMode: Int,
     clOption: String = null,
     sysProp: String = null)
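
One more aid for review: the new yarn master/deploy-mode inference is easy to
exercise in isolation. The following is a simplified, hypothetical
re-implementation for illustration only (printWarning and printErrorAndExit are
replaced with println and an exception, and no SparkSubmitArguments object is
mutated); it is not code from the patch itself.

object YarnModeInferenceSketch {
  // Simplified stand-ins for printWarning / printErrorAndExit in the patch
  private def warn(msg: String): Unit = println("Warning: " + msg)
  private def fail(msg: String): Nothing = sys.error(msg)

  // Returns the resolved (master, deployMode); a null deployMode means client
  def infer(master0: String, deployMode0: String): (String, String) = {
    var master = master0
    var deployMode = deployMode0
    if (master == "yarn-standalone") {
      warn("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
      master = "yarn-cluster"
    }
    (master, deployMode) match {
      case ("yarn-cluster", null) => deployMode = "cluster"
      case ("yarn-cluster", "client") =>
        fail("Client deploy mode is not compatible with master \"yarn-cluster\"")
      case ("yarn-client", "cluster") =>
        fail("Cluster deploy mode is not compatible with master \"yarn-client\"")
      case (_, mode) => master = "yarn-" + Option(mode).getOrElse("client")
    }
    (master, deployMode)
  }

  def main(args: Array[String]): Unit = {
    println(infer("yarn-standalone", null)) // (yarn-cluster,cluster), plus a deprecation warning
    println(infer("yarn", "cluster"))       // (yarn-cluster,cluster)
    println(infer("yarn", null))            // (yarn-client,null), i.e. client mode by default
  }
}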
