Repository: spark
Updated Branches:
  refs/heads/master d0ea49687 -> 9c73822a0
[SPARK-2423] Clean up SparkSubmit for readability

It is currently non-trivial to trace through how different combinations of cluster managers (e.g. yarn) and deploy modes (e.g. cluster) are processed in SparkSubmit. Moving forward, it will be easier to extend SparkSubmit if we first re-organize the code by grouping related logic together. This is a precursor to fixing standalone-cluster mode, which is currently broken (SPARK-2260).

Author: Andrew Or <andrewo...@gmail.com>

Closes #1349 from andrewor14/submit-cleanup and squashes the following commits:

8f99200 [Andrew Or] script -> program (minor)
30f2e65 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-cleanup
fe484a1 [Andrew Or] Move deploy mode checks after yarn code
7167824 [Andrew Or] Re-order config options and update comments
0b01ff8 [Andrew Or] Clean up SparkSubmit for readability

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c73822a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c73822a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c73822a

Branch: refs/heads/master
Commit: 9c73822a08848a0cde545282d3eb1c3f1a4c2a82
Parents: d0ea496
Author: Andrew Or <andrewo...@gmail.com>
Authored: Thu Jul 17 01:13:32 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Jul 17 01:13:32 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 289 ++++++++++---------
 1 file changed, 145 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c73822a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b050dcc..3d8373d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -27,25 +27,39 @@ import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.util.Utils
 
 /**
- * Scala code behind the spark-submit script. The script handles setting up the classpath with
- * relevant Spark dependencies and provides a layer over the different cluster managers and deploy
- * modes that Spark supports.
+ * Main gateway of launching a Spark application.
+ *
+ * This program handles setting up the classpath with relevant Spark dependencies and provides
+ * a layer over the different cluster managers and deploy modes that Spark supports.
  */
 object SparkSubmit {
+
+  // Cluster managers
   private val YARN = 1
   private val STANDALONE = 2
   private val MESOS = 4
   private val LOCAL = 8
   private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
 
-  private var clusterManager: Int = LOCAL
+  // Deploy modes
+  private val CLIENT = 1
+  private val CLUSTER = 2
+  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
 
-  /**
-   * Special primary resource names that represent shells rather than application jars.
-   */
+  // Special primary resource names that represent shells rather than application jars.
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
 
+  // Exposed for testing
+  private[spark] var exitFn: () => Unit = () => System.exit(-1)
+  private[spark] var printStream: PrintStream = System.err
+  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
+  private[spark] def printErrorAndExit(str: String) = {
+    printStream.println("Error: " + str)
+    printStream.println("Run with --help for usage help or --verbose for debug output")
+    exitFn()
+  }
+
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
     if (appArgs.verbose) {
@@ -55,88 +69,80 @@ object SparkSubmit {
     launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
   }
 
-  // Exposed for testing
-  private[spark] var printStream: PrintStream = System.err
-  private[spark] var exitFn: () => Unit = () => System.exit(-1)
-
-  private[spark] def printErrorAndExit(str: String) = {
-    printStream.println("Error: " + str)
-    printStream.println("Run with --help for usage help or --verbose for debug output")
-    exitFn()
-  }
-  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
-
   /**
-   * @return a tuple containing the arguments for the child, a list of classpath
-   *         entries for the child, a list of system properties, a list of env vars
-   *         and the main class for the child
+   * @return a tuple containing
+   *         (1) the arguments for the child process,
+   *         (2) a list of classpath entries for the child,
+   *         (3) a list of system properties and env vars, and
+   *         (4) the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
       : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
 
-    if (args.master.startsWith("local")) {
-      clusterManager = LOCAL
-    } else if (args.master.startsWith("yarn")) {
-      clusterManager = YARN
-    } else if (args.master.startsWith("spark")) {
-      clusterManager = STANDALONE
-    } else if (args.master.startsWith("mesos")) {
-      clusterManager = MESOS
-    } else {
-      printErrorAndExit("Master must start with yarn, mesos, spark, or local")
-    }
-
-    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
-    // and deploy mode, we have some logic to infer the master and deploy mode
-    // from each other if only one is specified, or exit early if they are at odds.
-    if (args.deployMode == null &&
-        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
-      args.deployMode = "cluster"
-    }
-    if (args.deployMode == "cluster" && args.master == "yarn-client") {
-      printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
-    }
-    if (args.deployMode == "client" &&
-        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
-      printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
-        + "\" are not compatible")
-    }
-    if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
-      args.master = "yarn-cluster"
-    }
-    if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
-      args.master = "yarn-client"
-    }
-
-    val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"
 
-    val childClasspath = new ArrayBuffer[String]()
+    // Values to return
     val childArgs = new ArrayBuffer[String]()
+    val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""
 
-    val isPython = args.isPython
-    val isYarnCluster = clusterManager == YARN && deployOnCluster
+    // Set the cluster manager
+    val clusterManager: Int = args.master match {
+      case m if m.startsWith("yarn") => YARN
+      case m if m.startsWith("spark") => STANDALONE
+      case m if m.startsWith("mesos") => MESOS
+      case m if m.startsWith("local") => LOCAL
+      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
+    }
 
-    // For mesos, only client mode is supported
-    if (clusterManager == MESOS && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+    // Set the deploy mode; default is client mode
+    var deployMode: Int = args.deployMode match {
+      case "client" | null => CLIENT
+      case "cluster" => CLUSTER
+      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
     }
 
-    // For standalone, only client mode is supported
-    if (clusterManager == STANDALONE && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is currently not supported for standalone clusters.")
+    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
+    // and deploy mode, we have some logic to infer the master and deploy mode
+    // from each other if only one is specified, or exit early if they are at odds.
+    if (clusterManager == YARN) {
+      if (args.master == "yarn-standalone") {
+        printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
+        args.master = "yarn-cluster"
+      }
+      (args.master, args.deployMode) match {
+        case ("yarn-cluster", null) =>
+          deployMode = CLUSTER
+        case ("yarn-cluster", "client") =>
+          printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
+        case ("yarn-client", "cluster") =>
+          printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
+        case (_, mode) =>
+          args.master = "yarn-" + Option(mode).getOrElse("client")
+      }
+
+      // Make sure YARN is included in our build if we're trying to use it
+      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
+        printErrorAndExit(
+          "Could not load YARN classes. " +
+          "This copy of Spark may not have been compiled with YARN support.")
+      }
     }
 
-    // For shells, only client mode is applicable
-    if (isShell(args.primaryResource) && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+    // The following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (MESOS, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+      case (STANDALONE, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
+      case (_, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+      case _ =>
     }
 
     // If we're running a python app, set the main class to our specific python runner
-    if (isPython) {
-      if (deployOnCluster) {
-        printErrorAndExit("Cluster deploy mode is currently not supported for python.")
-      }
+    if (args.isPython) {
       if (args.primaryResource == PYSPARK_SHELL) {
         args.mainClass = "py4j.GatewayServer"
         args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -152,120 +158,115 @@ object SparkSubmit {
       sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
     }
 
-    // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
-    if (!deployOnCluster) {
-      childMainClass = args.mainClass
-      if (isUserJar(args.primaryResource)) {
-        childClasspath += args.primaryResource
-      }
-    } else if (clusterManager == YARN) {
-      childMainClass = "org.apache.spark.deploy.yarn.Client"
-      childArgs += ("--jar", args.primaryResource)
-      childArgs += ("--class", args.mainClass)
-    }
-
-    // Make sure YARN is included in our build if we're trying to use it
-    if (clusterManager == YARN) {
-      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
-        printErrorAndExit("Could not load YARN classes. " +
-          "This copy of Spark may not have been compiled with YARN support.")
-      }
-    }
-
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
     // A list of rules to map each argument to system properties or command-line options in
     // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
-      OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
-      OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
-      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
-      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
+
+      // All cluster managers
+      OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
+      OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+
+      // Standalone cluster only
+      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
+      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
+
+      // Yarn client only
+      OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
+      OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+      OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
+      OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
+      OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
+
+      // Yarn cluster only
+      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+      OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
+      OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
+      OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
+      OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
+      OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
+      OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
+      OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
+
+      // Other options
+      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraClassPath"),
-      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
        sysProp = "spark.driver.extraJavaOptions"),
-      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
        sysProp = "spark.driver.extraLibraryPath"),
-      OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
-      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
-      OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
-      OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
-      OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
-      OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
-      OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
         sysProp = "spark.executor.memory"),
-      OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
-      OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
-      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
        sysProp = "spark.cores.max"),
-      OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
-      OptionAssigner(args.files, YARN, true, clOption = "--files"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
-      OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
-      OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
+        sysProp = "spark.files")
     )
 
-    // For client mode make any added jars immediately visible on the classpath
-    if (args.jars != null && !deployOnCluster) {
-      for (jar <- args.jars.split(",")) {
-        childClasspath += jar
+    // In client mode, launch the application main class directly
+    // In addition, add the main application jar and any added jars (if any) to the classpath
+    if (deployMode == CLIENT) {
+      childMainClass = args.mainClass
+      if (isUserJar(args.primaryResource)) {
+        childClasspath += args.primaryResource
      }
+      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
+      if (args.childArgs != null) { childArgs ++= args.childArgs }
    }
 
+    // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
-      if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
+      if (opt.value != null &&
+          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
-        if (opt.clOption != null) {
-          childArgs += (opt.clOption, opt.value)
-        }
-        if (opt.sysProp != null) {
-          sysProps.put(opt.sysProp, opt.value)
-        }
+        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
+        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
      }
    }
 
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python files, the primary resource is already distributed as a regular file
-    if (!isYarnCluster && !isPython) {
-      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+    if (!isYarnCluster && !args.isPython) {
+      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
       if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
     }
 
-    // Standalone cluster specific configurations
-    if (deployOnCluster && clusterManager == STANDALONE) {
+    // In standalone-cluster mode, use Client as a wrapper around the user class
+    if (clusterManager == STANDALONE && deployMode == CLUSTER) {
+      childMainClass = "org.apache.spark.deploy.Client"
       if (args.supervise) {
         childArgs += "--supervise"
       }
-      childMainClass = "org.apache.spark.deploy.Client"
       childArgs += "launch"
       childArgs += (args.master, args.primaryResource, args.mainClass)
+      if (args.childArgs != null) {
+        childArgs ++= args.childArgs
+      }
    }
 
-    // Arguments to be passed to user program
-    if (args.childArgs != null) {
-      if (!deployOnCluster || clusterManager == STANDALONE) {
-        childArgs ++= args.childArgs
-      } else if (clusterManager == YARN) {
-        for (arg <- args.childArgs) {
-          childArgs += ("--arg", arg)
-        }
+    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
+    if (clusterManager == YARN && deployMode == CLUSTER) {
+      childMainClass = "org.apache.spark.deploy.yarn.Client"
+      childArgs += ("--jar", args.primaryResource)
+      childArgs += ("--class", args.mainClass)
+      if (args.childArgs != null) {
+        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }
 
     // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
-      if (!sysProps.contains(k)) sysProps(k) = v
+      sysProps.getOrElseUpdate(k, v)
     }
 
     (childArgs, childClasspath, sysProps, childMainClass)
@@ -364,6 +365,6 @@ object SparkSubmit {
 private[spark] case class OptionAssigner(
     value: String,
     clusterManager: Int,
-    deployOnCluster: Boolean,
+    deployMode: Int,
     clOption: String = null,
     sysProp: String = null)
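
----------------------------------------------------------------------

A note for readers tracing the new control flow: the core device in this patch is the bit-flag encoding of cluster managers and deploy modes, which lets one grouped OptionAssigner list replace the old boolean-flagged rules. Below is a minimal, self-contained Scala sketch of that matching idea; OptionMatchSketch, Rule, and the sample values are hypothetical illustrations, not part of the commit.

// Sketch of the bit-flag matching behind SparkSubmit's OptionAssigner list.
// The flag constants mirror the patch; Rule and the example rules are made up.
object OptionMatchSketch {
  // Cluster managers, encoded as bit flags so masks can be OR-ed together
  val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8
  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

  // Deploy modes, encoded the same way
  val CLIENT = 1; val CLUSTER = 2
  val ALL_DEPLOY_MODES = CLIENT | CLUSTER

  // A simplified stand-in for OptionAssigner (sysProp targets only)
  case class Rule(value: String, clusterManager: Int, deployMode: Int, sysProp: String)

  def main(args: Array[String]): Unit = {
    val rules = Seq(
      Rule("4g", STANDALONE | MESOS | YARN, CLIENT, "spark.executor.memory"),
      Rule("a.txt,b.txt", LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, "spark.files"),
      Rule(null, YARN, CLIENT, "spark.yarn.queue")) // unset values never fire

    // Pretend the user asked for standalone client mode
    val clusterManager = STANDALONE
    val deployMode = CLIENT

    // A rule fires iff its value is set and both masks overlap the chosen mode,
    // which is exactly one bitwise AND per dimension
    val sysProps = rules.collect {
      case r if r.value != null &&
                (r.clusterManager & clusterManager) != 0 &&
                (r.deployMode & deployMode) != 0 =>
        r.sysProp -> r.value
    }.toMap

    println(sysProps)
    // Map(spark.executor.memory -> 4g, spark.files -> a.txt,b.txt)
  }
}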
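The YARN-specific inference is worth a second illustration, since the "yarn-client" and "yarn-cluster" masters fold the deploy mode into the master string. The following sketch restates that inference table as a pure function; YarnModeInferenceSketch and its Either-based error handling are hypothetical stand-ins for the patch's in-place mutation and printErrorAndExit.

// Sketch of the master/deploy-mode inference for YARN masters:
// fill in whichever side is missing, reject contradictory combinations.
object YarnModeInferenceSketch {
  /** Returns Left(error) for contradictions, else Right((master, deployMode)). */
  def infer(master: String, deployMode: String): Either[String, (String, String)] =
    (master, deployMode) match {
      case ("yarn-cluster", null) =>
        Right(("yarn-cluster", "cluster"))
      case ("yarn-cluster", "client") =>
        Left("Client deploy mode is not compatible with master \"yarn-cluster\"")
      case ("yarn-client", "cluster") =>
        Left("Cluster deploy mode is not compatible with master \"yarn-client\"")
      case (_, mode) =>
        // Bare "yarn" inherits the deploy mode; deploy mode defaults to client
        val m = Option(mode).getOrElse("client")
        Right(("yarn-" + m, m))
    }

  def main(args: Array[String]): Unit = {
    println(infer("yarn", null))             // Right((yarn-client,client))
    println(infer("yarn-cluster", null))     // Right((yarn-cluster,cluster))
    println(infer("yarn-client", "cluster")) // Left(Cluster deploy mode is not ...)
  }
}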