Repository: spark
Updated Branches:
  refs/heads/master 653fe0241 -> 3cb82047f


[SPARK-22941][CORE] Do not exit JVM when submit fails with in-process launcher.

The current in-process launcher implementation just calls the SparkSubmit
object, which, in case of errors, will more often than not exit the JVM.
This is not desirable since this launcher is meant to be used inside other
applications, and that would kill the application.

The change turns SparkSubmit into a class and abstracts away some of
the functionality used to print error messages and abort the submission
process. The default implementation uses the logging system for messages
and throws exceptions for errors. As part of that I also moved some code
that doesn't really belong in SparkSubmit to a better location.
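
In outline, the new class looks roughly like this (a condensed sketch of the
SparkSubmit.scala changes in the diff below):

    private[spark] class SparkSubmit extends Logging {

      def doSubmit(args: Array[String]): Unit = {
        val appArgs = parseArguments(args)
        if (appArgs.verbose) {
          // Messages now go through the logging system instead of printStream.
          logInfo(appArgs.toString)
        }
        // ... dispatch on appArgs.action (SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION) ...
      }

      // Overridable so the command-line entry point can customize argument handling.
      protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args)
      }

      // Errors raise an exception instead of exiting the JVM.
      private def error(msg: String): Unit = throw new SparkException(msg)
    }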

The command-line invocation of spark-submit now uses a special implementation
of the SparkSubmit class that overrides those behaviors to do what is expected
from the command-line version (print to the terminal, exit the JVM, etc.).
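
Condensed from the diff below, the command-line object wraps the new class and
restores the old terminal/exit behavior roughly like so:

    object SparkSubmit extends CommandLineUtils with Logging {

      override def main(args: Array[String]): Unit = {
        val submit = new SparkSubmit() {
          // Route messages to the terminal instead of the logging system.
          override protected def logInfo(msg: => String): Unit = printMessage(msg)

          override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

          override def doSubmit(args: Array[String]): Unit = {
            try {
              super.doSubmit(args)
            } catch {
              // Translate the exceptions thrown by the base class back into exit codes.
              case e: SparkUserAppException => exitFn(e.exitCode)
              case e: SparkException => printErrorAndExit(e.getMessage())
            }
          }
        }
        submit.doSubmit(args)
      }
    }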

A lot of the changes simply replace calls to methods such as "printErrorAndExit"
with the new API.

As part of adding tests for this, I had to fix some small things in the
launcher option parser so that options like "--version" work when used
through the launcher library.
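
As a sketch of what this enables on the library side (adapted from the new
SparkLauncherSuite test in the diff below; the polling loop here is just
illustrative, the test uses its own waitFor() helper), "--version" can now be
run in-process without the host JVM exiting:

    import org.apache.spark.launcher.{InProcessLauncher, SparkAppHandle}

    val handle = new InProcessLauncher()
      .addSparkArg("--version")
      .startApplication()

    // Poll until the handle reaches a final state. It ends up LOST rather than
    // FINISHED because "--version" never reports an application state back.
    while (!handle.getState.isFinal) {
      Thread.sleep(100)
    }
    assert(handle.getState == SparkAppHandle.State.LOST)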

There is still code that prints directly to the terminal, like all the
Ivy-related code in SparkSubmitUtils, and other areas where some refactoring
would help, like the CommandLineUtils class, but I chose to leave those
alone to keep this change more focused.

Aside from the existing and added unit tests, I ran the command-line tools with
a variety of arguments to make sure messages and errors behave as before.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20925 from vanzin/SPARK-22941.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cb82047
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cb82047
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cb82047

Branch: refs/heads/master
Commit: 3cb82047f2f51af553df09b9323796af507d36f8
Parents: 653fe02
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Apr 11 10:13:44 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Wed Apr 11 10:13:44 2018 -0500

----------------------------------------------------------------------
 .../apache/spark/deploy/DependencyUtils.scala   |  30 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   | 318 ++++++++++---------
 .../spark/deploy/SparkSubmitArguments.scala     |  90 +++---
 .../spark/deploy/worker/DriverWrapper.scala     |   4 +-
 .../apache/spark/util/CommandLineUtils.scala    |  18 +-
 .../spark/launcher/SparkLauncherSuite.java      |  37 ++-
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  69 ++--
 .../deploy/rest/StandaloneRestSubmitSuite.scala |   2 +-
 .../apache/spark/launcher/AbstractLauncher.java |   6 +-
 .../spark/launcher/InProcessLauncher.java       |  14 +-
 .../launcher/SparkSubmitCommandBuilder.java     |  82 +++--
 project/MimaExcludes.scala                      |   7 +-
 .../deploy/mesos/MesosClusterDispatcher.scala   |  10 +-
 .../mesos/MesosClusterDispatcherArguments.scala |   6 +-
 14 files changed, 401 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala 
b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
index fac834a..178bdcf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils {
+private[deploy] object DependencyUtils extends Logging {
 
   def resolveMavenDependencies(
       packagesExclusions: String,
@@ -75,7 +76,7 @@ private[deploy] object DependencyUtils {
   def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
     if (jars != null) {
       for (jar <- jars.split(",")) {
-        SparkSubmit.addJarToClasspath(jar, loader)
+        addJarToClasspath(jar, loader)
       }
     }
   }
@@ -151,6 +152,31 @@ private[deploy] object DependencyUtils {
     }.mkString(",")
   }
 
+  def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit 
= {
+    val uri = Utils.resolveURI(localJar)
+    uri.getScheme match {
+      case "file" | "local" =>
+        val file = new File(uri.getPath)
+        if (file.exists()) {
+          loader.addURL(file.toURI.toURL)
+        } else {
+          logWarning(s"Local jar $file does not exist, skipping.")
+        }
+      case _ =>
+        logWarning(s"Skip remote jar $uri.")
+    }
+  }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null 
to indicate
+   * no files, into a single comma-separated string.
+   */
+  def mergeFileLists(lists: String*): String = {
+    val merged = lists.filterNot(StringUtils.isBlank)
+      .flatMap(Utils.stringToSeq)
+    if (merged.nonEmpty) merged.mkString(",") else null
+  }
+
   private def splitOnFragment(path: String): (URI, Option[String]) = {
     val uri = Utils.resolveURI(path)
     val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, 
null)

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index eddbede..427c797 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -58,7 +58,7 @@ import org.apache.spark.util._
  */
 private[deploy] object SparkSubmitAction extends Enumeration {
   type SparkSubmitAction = Value
-  val SUBMIT, KILL, REQUEST_STATUS = Value
+  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
 }
 
 /**
@@ -67,78 +67,32 @@ private[deploy] object SparkSubmitAction extends 
Enumeration {
  * This program handles setting up the classpath with relevant Spark 
dependencies and provides
  * a layer over the different cluster managers and deploy modes that Spark 
supports.
  */
-object SparkSubmit extends CommandLineUtils with Logging {
+private[spark] class SparkSubmit extends Logging {
 
   import DependencyUtils._
+  import SparkSubmit._
 
-  // Cluster managers
-  private val YARN = 1
-  private val STANDALONE = 2
-  private val MESOS = 4
-  private val LOCAL = 8
-  private val KUBERNETES = 16
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
-
-  // Deploy modes
-  private val CLIENT = 1
-  private val CLUSTER = 2
-  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
-
-  // Special primary resource names that represent shells rather than 
application jars.
-  private val SPARK_SHELL = "spark-shell"
-  private val PYSPARK_SHELL = "pyspark-shell"
-  private val SPARKR_SHELL = "sparkr-shell"
-  private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
-  private val R_PACKAGE_ARCHIVE = "rpkg.zip"
-
-  private val CLASS_NOT_FOUND_EXIT_STATUS = 101
-
-  // Following constants are visible for testing.
-  private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
-    "org.apache.spark.deploy.yarn.YarnClusterApplication"
-  private[deploy] val REST_CLUSTER_SUBMIT_CLASS = 
classOf[RestSubmissionClientApp].getName()
-  private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = 
classOf[ClientApp].getName()
-  private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
-    "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
-
-  // scalastyle:off println
-  private[spark] def printVersionAndExit(): Unit = {
-    printStream.println("""Welcome to
-      ____              __
-     / __/__  ___ _____/ /__
-    _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version %s
-      /_/
-                        """.format(SPARK_VERSION))
-    printStream.println("Using Scala %s, %s, %s".format(
-      Properties.versionString, Properties.javaVmName, Properties.javaVersion))
-    printStream.println("Branch %s".format(SPARK_BRANCH))
-    printStream.println("Compiled by user %s on %s".format(SPARK_BUILD_USER, 
SPARK_BUILD_DATE))
-    printStream.println("Revision %s".format(SPARK_REVISION))
-    printStream.println("Url %s".format(SPARK_REPO_URL))
-    printStream.println("Type --help for more information.")
-    exitFn(0)
-  }
-  // scalastyle:on println
-
-  override def main(args: Array[String]): Unit = {
+  def doSubmit(args: Array[String]): Unit = {
     // Initialize logging if it hasn't been done yet. Keep track of whether 
logging needs to
     // be reset before the application starts.
     val uninitLog = initializeLogIfNecessary(true, silent = true)
 
-    val appArgs = new SparkSubmitArguments(args)
+    val appArgs = parseArguments(args)
     if (appArgs.verbose) {
-      // scalastyle:off println
-      printStream.println(appArgs)
-      // scalastyle:on println
+      logInfo(appArgs.toString)
     }
     appArgs.action match {
       case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
       case SparkSubmitAction.KILL => kill(appArgs)
       case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
+      case SparkSubmitAction.PRINT_VERSION => printVersion()
     }
   }
 
+  protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
+    new SparkSubmitArguments(args)
+  }
+
   /**
    * Kill an existing submission using the REST protocol. Standalone and Mesos 
cluster mode only.
    */
@@ -156,6 +110,24 @@ object SparkSubmit extends CommandLineUtils with Logging {
       .requestSubmissionStatus(args.submissionToRequestStatusFor)
   }
 
+  /** Print version information to the log. */
+  private def printVersion(): Unit = {
+    logInfo("""Welcome to
+      ____              __
+     / __/__  ___ _____/ /__
+    _\ \/ _ \/ _ `/ __/  '_/
+   /___/ .__/\_,_/_/ /_/\_\   version %s
+      /_/
+                        """.format(SPARK_VERSION))
+    logInfo("Using Scala %s, %s, %s".format(
+      Properties.versionString, Properties.javaVmName, Properties.javaVersion))
+    logInfo(s"Branch $SPARK_BRANCH")
+    logInfo(s"Compiled by user $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
+    logInfo(s"Revision $SPARK_REVISION")
+    logInfo(s"Url $SPARK_REPO_URL")
+    logInfo("Type --help for more information.")
+  }
+
   /**
    * Submit the application using the provided parameters.
    *
@@ -185,10 +157,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
             // makes the message printed to the output by the JVM not very 
helpful. Instead,
             // detect exceptions with empty stack traces here, and treat them 
differently.
             if (e.getStackTrace().length == 0) {
-              // scalastyle:off println
-              printStream.println(s"ERROR: ${e.getClass().getName()}: 
${e.getMessage()}")
-              // scalastyle:on println
-              exitFn(1)
+              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
             } else {
               throw e
             }
@@ -210,14 +179,11 @@ object SparkSubmit extends CommandLineUtils with Logging {
     // to use the legacy gateway if the master endpoint turns out to be not a 
REST server.
     if (args.isStandaloneCluster && args.useRest) {
       try {
-        // scalastyle:off println
-        printStream.println("Running Spark using the REST application 
submission protocol.")
-        // scalastyle:on println
-        doRunMain()
+        logInfo("Running Spark using the REST application submission 
protocol.")
       } catch {
         // Fail over to use the legacy submission gateway
         case e: SubmitRestConnectionException =>
-          printWarning(s"Master endpoint ${args.master} was not a REST server. 
" +
+          logWarning(s"Master endpoint ${args.master} was not a REST server. " 
+
             "Falling back to legacy submission gateway instead.")
           args.useRest = false
           submit(args, false)
@@ -245,19 +211,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
-    try {
-      doPrepareSubmitEnvironment(args, conf)
-    } catch {
-      case e: SparkException =>
-        printErrorAndExit(e.getMessage)
-        throw e
-    }
-  }
-
-  private def doPrepareSubmitEnvironment(
-      args: SparkSubmitArguments,
-      conf: Option[HadoopConfiguration] = None)
-      : (Seq[String], Seq[String], SparkConf, String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
@@ -268,7 +221,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
     val clusterManager: Int = args.master match {
       case "yarn" => YARN
       case "yarn-client" | "yarn-cluster" =>
-        printWarning(s"Master ${args.master} is deprecated since 2.0." +
+        logWarning(s"Master ${args.master} is deprecated since 2.0." +
           " Please use master \"yarn\" with specified deploy mode instead.")
         YARN
       case m if m.startsWith("spark") => STANDALONE
@@ -276,7 +229,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
       case m if m.startsWith("k8s") => KUBERNETES
       case m if m.startsWith("local") => LOCAL
       case _ =>
-        printErrorAndExit("Master must either be yarn or start with spark, 
mesos, k8s, or local")
+        error("Master must either be yarn or start with spark, mesos, k8s, or 
local")
         -1
     }
 
@@ -284,7 +237,9 @@ object SparkSubmit extends CommandLineUtils with Logging {
     var deployMode: Int = args.deployMode match {
       case "client" | null => CLIENT
       case "cluster" => CLUSTER
-      case _ => printErrorAndExit("Deploy mode must be either client or 
cluster"); -1
+      case _ =>
+        error("Deploy mode must be either client or cluster")
+        -1
     }
 
     // Because the deprecated way of specifying "yarn-cluster" and 
"yarn-client" encapsulate both
@@ -296,16 +251,16 @@ object SparkSubmit extends CommandLineUtils with Logging {
           deployMode = CLUSTER
           args.master = "yarn"
         case ("yarn-cluster", "client") =>
-          printErrorAndExit("Client deploy mode is not compatible with master 
\"yarn-cluster\"")
+          error("Client deploy mode is not compatible with master 
\"yarn-cluster\"")
         case ("yarn-client", "cluster") =>
-          printErrorAndExit("Cluster deploy mode is not compatible with master 
\"yarn-client\"")
+          error("Cluster deploy mode is not compatible with master 
\"yarn-client\"")
         case (_, mode) =>
           args.master = "yarn"
       }
 
       // Make sure YARN is included in our build if we're trying to use it
       if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && 
!Utils.isTesting) {
-        printErrorAndExit(
+        error(
           "Could not load YARN classes. " +
           "This copy of Spark may not have been compiled with YARN support.")
       }
@@ -315,7 +270,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args.master = Utils.checkAndGetK8sMasterUrl(args.master)
       // Make sure KUBERNETES is included in our build if we're trying to use 
it
       if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && 
!Utils.isTesting) {
-        printErrorAndExit(
+        error(
           "Could not load KUBERNETES classes. " +
             "This copy of Spark may not have been compiled with KUBERNETES 
support.")
       }
@@ -324,23 +279,23 @@ object SparkSubmit extends CommandLineUtils with Logging {
     // Fail fast, the following modes are not supported or applicable
     (clusterManager, deployMode) match {
       case (STANDALONE, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for 
python " +
+        error("Cluster deploy mode is currently not supported for python " +
           "applications on standalone clusters.")
       case (STANDALONE, CLUSTER) if args.isR =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for 
R " +
+        error("Cluster deploy mode is currently not supported for R " +
           "applications on standalone clusters.")
       case (KUBERNETES, _) if args.isPython =>
-        printErrorAndExit("Python applications are currently not supported for 
Kubernetes.")
+        error("Python applications are currently not supported for 
Kubernetes.")
       case (KUBERNETES, _) if args.isR =>
-        printErrorAndExit("R applications are currently not supported for 
Kubernetes.")
+        error("R applications are currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is not compatible with master 
\"local\"")
+        error("Cluster deploy mode is not compatible with master \"local\"")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark 
shells.")
+        error("Cluster deploy mode is not applicable to Spark shells.")
       case (_, CLUSTER) if isSqlShell(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL 
shell.")
+        error("Cluster deploy mode is not applicable to Spark SQL shell.")
       case (_, CLUSTER) if isThriftServer(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark 
Thrift server.")
+        error("Cluster deploy mode is not applicable to Spark Thrift server.")
       case _ =>
     }
 
@@ -493,11 +448,11 @@ object SparkSubmit extends CommandLineUtils with Logging {
     if (args.isR && clusterManager == YARN) {
       val sparkRPackagePath = RUtils.localSparkRPackagePath
       if (sparkRPackagePath.isEmpty) {
-        printErrorAndExit("SPARK_HOME does not exist for R application in YARN 
mode.")
+        error("SPARK_HOME does not exist for R application in YARN mode.")
       }
       val sparkRPackageFile = new File(sparkRPackagePath.get, 
SPARKR_PACKAGE_ARCHIVE)
       if (!sparkRPackageFile.exists()) {
-        printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R 
application in YARN mode.")
+        error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in 
YARN mode.")
       }
       val sparkRPackageURI = 
Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
 
@@ -510,7 +465,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
         val rPackageFile =
           RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), 
R_PACKAGE_ARCHIVE)
         if (!rPackageFile.exists()) {
-          printErrorAndExit("Failed to zip all the built R packages.")
+          error("Failed to zip all the built R packages.")
         }
 
         val rPackageURI = 
Utils.resolveURI(rPackageFile.getAbsolutePath).toString
@@ -521,12 +476,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
     // TODO: Support distributing R packages with standalone cluster
     if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) 
{
-      printErrorAndExit("Distributing R packages with standalone cluster is 
not supported.")
+      error("Distributing R packages with standalone cluster is not 
supported.")
     }
 
     // TODO: Support distributing R packages with mesos cluster
     if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
-      printErrorAndExit("Distributing R packages with mesos cluster is not 
supported.")
+      error("Distributing R packages with mesos cluster is not supported.")
     }
 
     // If we're running an R app, set the main class to our specific R runner
@@ -799,9 +754,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
   private def setRMPrincipal(sparkConf: SparkConf): Unit = {
     val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
     val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
-    // scalastyle:off println
-    printStream.println(s"Setting ${key} to ${shortUserName}")
-    // scalastyle:off println
+    logInfo(s"Setting ${key} to ${shortUserName}")
     sparkConf.set(key, shortUserName)
   }
 
@@ -817,16 +770,14 @@ object SparkSubmit extends CommandLineUtils with Logging {
       sparkConf: SparkConf,
       childMainClass: String,
       verbose: Boolean): Unit = {
-    // scalastyle:off println
     if (verbose) {
-      printStream.println(s"Main class:\n$childMainClass")
-      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
+      logInfo(s"Main class:\n$childMainClass")
+      logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
       // sysProps may contain sensitive information, so redact before printing
-      printStream.println(s"Spark 
config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
-      printStream.println(s"Classpath 
elements:\n${childClasspath.mkString("\n")}")
-      printStream.println("\n")
+      logInfo(s"Spark 
config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
+      logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+      logInfo("\n")
     }
-    // scalastyle:on println
 
     val loader =
       if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
@@ -848,23 +799,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       mainClass = Utils.classForName(childMainClass)
     } catch {
       case e: ClassNotFoundException =>
-        e.printStackTrace(printStream)
+        logWarning(s"Failed to load $childMainClass.", e)
         if (childMainClass.contains("thriftserver")) {
-          // scalastyle:off println
-          printStream.println(s"Failed to load main class $childMainClass.")
-          printStream.println("You need to build Spark with -Phive and 
-Phive-thriftserver.")
-          // scalastyle:on println
+          logInfo(s"Failed to load main class $childMainClass.")
+          logInfo("You need to build Spark with -Phive and 
-Phive-thriftserver.")
         }
-        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
+        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
       case e: NoClassDefFoundError =>
-        e.printStackTrace(printStream)
+        logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
         if (e.getMessage.contains("org/apache/hadoop/hive")) {
-          // scalastyle:off println
-          printStream.println(s"Failed to load hive class.")
-          printStream.println("You need to build Spark with -Phive and 
-Phive-thriftserver.")
-          // scalastyle:on println
+          logInfo(s"Failed to load hive class.")
+          logInfo("You need to build Spark with -Phive and 
-Phive-thriftserver.")
         }
-        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
+        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
     }
 
     val app: SparkApplication = if 
(classOf[SparkApplication].isAssignableFrom(mainClass)) {
@@ -872,7 +819,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
     } else {
       // SPARK-4170
       if (classOf[scala.App].isAssignableFrom(mainClass)) {
-        printWarning("Subclasses of scala.App may not work correctly. Use a 
main() method instead.")
+        logWarning("Subclasses of scala.App may not work correctly. Use a 
main() method instead.")
       }
       new JavaMainApplication(mainClass)
     }
@@ -891,29 +838,90 @@ object SparkSubmit extends CommandLineUtils with Logging {
       app.start(childArgs.toArray, sparkConf)
     } catch {
       case t: Throwable =>
-        findCause(t) match {
-          case SparkUserAppException(exitCode) =>
-            System.exit(exitCode)
-
-          case t: Throwable =>
-            throw t
-        }
+        throw findCause(t)
     }
   }
 
-  private[deploy] def addJarToClasspath(localJar: String, loader: 
MutableURLClassLoader) {
-    val uri = Utils.resolveURI(localJar)
-    uri.getScheme match {
-      case "file" | "local" =>
-        val file = new File(uri.getPath)
-        if (file.exists()) {
-          loader.addURL(file.toURI.toURL)
-        } else {
-          printWarning(s"Local jar $file does not exist, skipping.")
+  /** Throw a SparkException with the given error message. */
+  private def error(msg: String): Unit = throw new SparkException(msg)
+
+}
+
+
+/**
+ * This entry point is used by the launcher library to start in-process Spark 
applications.
+ */
+private[spark] object InProcessSparkSubmit {
+
+  def main(args: Array[String]): Unit = {
+    val submit = new SparkSubmit()
+    submit.doSubmit(args)
+  }
+
+}
+
+object SparkSubmit extends CommandLineUtils with Logging {
+
+  // Cluster managers
+  private val YARN = 1
+  private val STANDALONE = 2
+  private val MESOS = 4
+  private val LOCAL = 8
+  private val KUBERNETES = 16
+  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
+
+  // Deploy modes
+  private val CLIENT = 1
+  private val CLUSTER = 2
+  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
+
+  // Special primary resource names that represent shells rather than 
application jars.
+  private val SPARK_SHELL = "spark-shell"
+  private val PYSPARK_SHELL = "pyspark-shell"
+  private val SPARKR_SHELL = "sparkr-shell"
+  private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
+  private val R_PACKAGE_ARCHIVE = "rpkg.zip"
+
+  private val CLASS_NOT_FOUND_EXIT_STATUS = 101
+
+  // Following constants are visible for testing.
+  private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
+    "org.apache.spark.deploy.yarn.YarnClusterApplication"
+  private[deploy] val REST_CLUSTER_SUBMIT_CLASS = 
classOf[RestSubmissionClientApp].getName()
+  private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = 
classOf[ClientApp].getName()
+  private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
+    "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
+
+  override def main(args: Array[String]): Unit = {
+    val submit = new SparkSubmit() {
+      self =>
+
+      override protected def parseArguments(args: Array[String]): 
SparkSubmitArguments = {
+        new SparkSubmitArguments(args) {
+          override protected def logInfo(msg: => String): Unit = 
self.logInfo(msg)
+
+          override protected def logWarning(msg: => String): Unit = 
self.logWarning(msg)
         }
-      case _ =>
-        printWarning(s"Skip remote jar $uri.")
+      }
+
+      override protected def logInfo(msg: => String): Unit = printMessage(msg)
+
+      override protected def logWarning(msg: => String): Unit = 
printMessage(s"Warning: $msg")
+
+      override def doSubmit(args: Array[String]): Unit = {
+        try {
+          super.doSubmit(args)
+        } catch {
+          case e: SparkUserAppException =>
+            exitFn(e.exitCode)
+          case e: SparkException =>
+            printErrorAndExit(e.getMessage())
+        }
+      }
+
     }
+
+    submit.doSubmit(args)
   }
 
   /**
@@ -962,17 +970,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
     res == SparkLauncher.NO_RESOURCE
   }
 
-  /**
-   * Merge a sequence of comma-separated file lists, some of which may be null 
to indicate
-   * no files, into a single comma-separated string.
-   */
-  private[deploy] def mergeFileLists(lists: String*): String = {
-    val merged = lists.filterNot(StringUtils.isBlank)
-                      .flatMap(_.split(","))
-                      .mkString(",")
-    if (merged == "") null else merged
-  }
-
 }
 
 /** Provides utility functions to be used inside SparkSubmit. */
@@ -1000,12 +997,12 @@ private[spark] object SparkSubmitUtils {
     override def toString: String = s"$groupId:$artifactId:$version"
   }
 
-/**
- * Extracts maven coordinates from a comma-delimited string. Coordinates 
should be provided
- * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
- * @param coordinates Comma-delimited string of maven coordinates
- * @return Sequence of Maven coordinates
- */
+  /**
+   * Extracts maven coordinates from a comma-delimited string. Coordinates 
should be provided
+   * in the format `groupId:artifactId:version` or 
`groupId/artifactId:version`.
+   * @param coordinates Comma-delimited string of maven coordinates
+   * @return Sequence of Maven coordinates
+   */
   def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
     coordinates.split(",").map { p =>
       val splits = p.replace("/", ":").split(":")
@@ -1304,6 +1301,13 @@ private[spark] object SparkSubmitUtils {
     rule
   }
 
+  def parseSparkConfProperty(pair: String): (String, String) = {
+    pair.split("=", 2).toSeq match {
+      case Seq(k, v) => (k, v)
+      case _ => throw new SparkException(s"Spark config without '=': $pair")
+    }
+  }
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 8e70705..0733fdb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -29,7 +29,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 import scala.util.Try
 
+import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
+import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
@@ -40,7 +42,7 @@ import org.apache.spark.util.Utils
  * The env argument is used for testing.
  */
 private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, 
String] = sys.env)
-  extends SparkSubmitArgumentsParser {
+  extends SparkSubmitArgumentsParser with Logging {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -85,8 +87,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], 
env: Map[String, S
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
-    // scalastyle:off println
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: 
$propertiesFile")
+    if (verbose) {
+      logInfo(s"Using properties file: $propertiesFile")
+    }
     Option(propertiesFile).foreach { filename =>
       val properties = Utils.getPropertiesFromFile(filename)
       properties.foreach { case (k, v) =>
@@ -95,21 +98,16 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
       // Property files may contain sensitive information, so redact before 
printing
       if (verbose) {
         Utils.redact(properties).foreach { case (k, v) =>
-          SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+          logInfo(s"Adding default property: $k=$v")
         }
       }
     }
-    // scalastyle:on println
     defaultProperties
   }
 
   // Set parameters from command line arguments
-  try {
-    parse(args.asJava)
-  } catch {
-    case e: IllegalArgumentException =>
-      SparkSubmit.printErrorAndExit(e.getMessage())
-  }
+  parse(args.asJava)
+
   // Populate `sparkProperties` map from properties file
   mergeDefaultSparkProperties()
   // Remove keys that don't start with "spark." from `sparkProperties`.
@@ -141,7 +139,7 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
     sparkProperties.foreach { case (k, v) =>
       if (!k.startsWith("spark.")) {
         sparkProperties -= k
-        SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+        logWarning(s"Ignoring non-spark config property: $k=$v")
       }
     }
   }
@@ -215,10 +213,10 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
             }
           } catch {
             case _: Exception =>
-              SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR 
$primaryResource")
+              error(s"Cannot load main class from JAR $primaryResource")
           }
         case _ =>
-          SparkSubmit.printErrorAndExit(
+          error(
             s"Cannot load main class from JAR $primaryResource with URI 
$uriScheme. " +
             "Please specify a class through --class.")
       }
@@ -248,6 +246,7 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
       case SUBMIT => validateSubmitArguments()
       case KILL => validateKillArguments()
       case REQUEST_STATUS => validateStatusRequestArguments()
+      case PRINT_VERSION =>
     }
   }
 
@@ -256,62 +255,61 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
       printUsageAndExit(-1)
     }
     if (primaryResource == null) {
-      SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or 
Python or R file)")
+      error("Must specify a primary resource (JAR or Python or R file)")
     }
     if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
-      SparkSubmit.printErrorAndExit("No main class set in JAR; please specify 
one with --class")
+      error("No main class set in JAR; please specify one with --class")
     }
     if (driverMemory != null
         && Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) 
{
-      SparkSubmit.printErrorAndExit("Driver Memory must be a positive number")
+      error("Driver memory must be a positive number")
     }
     if (executorMemory != null
         && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 
0) {
-      SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive 
number")
+      error("Executor memory must be a positive number")
     }
     if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
+      error("Executor cores must be a positive number")
     }
     if (totalExecutorCores != null && 
Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Total executor cores must be a positive 
number")
+      error("Total executor cores must be a positive number")
     }
     if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Number of executors must be a positive 
number")
+      error("Number of executors must be a positive number")
     }
     if (pyFiles != null && !isPython) {
-      SparkSubmit.printErrorAndExit("--py-files given but primary resource is 
not a Python script")
+      error("--py-files given but primary resource is not a Python script")
     }
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || 
env.contains("YARN_CONF_DIR")
       if (!hasHadoopEnv && !Utils.isTesting) {
-        throw new Exception(s"When running with master '$master' " +
+        error(s"When running with master '$master' " +
           "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the 
environment.")
       }
     }
 
     if (proxyUser != null && principal != null) {
-      SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal 
can be provided.")
+      error("Only one of --proxy-user or --principal can be provided.")
     }
   }
 
   private def validateKillArguments(): Unit = {
     if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      SparkSubmit.printErrorAndExit(
-        "Killing submissions is only supported in standalone or Mesos mode!")
+      error("Killing submissions is only supported in standalone or Mesos 
mode!")
     }
     if (submissionToKill == null) {
-      SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
+      error("Please specify a submission to kill.")
     }
   }
 
   private def validateStatusRequestArguments(): Unit = {
     if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      SparkSubmit.printErrorAndExit(
+      error(
         "Requesting submission statuses is only supported in standalone or 
Mesos mode!")
     }
     if (submissionToRequestStatusFor == null) {
-      SparkSubmit.printErrorAndExit("Please specify a submission to request 
status for.")
+      error("Please specify a submission to request status for.")
     }
   }
 
@@ -368,7 +366,7 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 
       case DEPLOY_MODE =>
         if (value != "client" && value != "cluster") {
-          SparkSubmit.printErrorAndExit("--deploy-mode must be either 
\"client\" or \"cluster\"")
+          error("--deploy-mode must be either \"client\" or \"cluster\"")
         }
         deployMode = value
 
@@ -405,14 +403,14 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
       case KILL_SUBMISSION =>
         submissionToKill = value
         if (action != null) {
-          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and 
$KILL.")
+          error(s"Action cannot be both $action and $KILL.")
         }
         action = KILL
 
       case STATUS =>
         submissionToRequestStatusFor = value
         if (action != null) {
-          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and 
$REQUEST_STATUS.")
+          error(s"Action cannot be both $action and $REQUEST_STATUS.")
         }
         action = REQUEST_STATUS
 
@@ -444,7 +442,7 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
         repositories = value
 
       case CONF =>
-        val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
+        val (confName, confValue) = 
SparkSubmitUtils.parseSparkConfProperty(value)
         sparkProperties(confName) = confValue
 
       case PROXY_USER =>
@@ -463,15 +461,15 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
         verbose = true
 
       case VERSION =>
-        SparkSubmit.printVersionAndExit()
+        action = SparkSubmitAction.PRINT_VERSION
 
       case USAGE_ERROR =>
         printUsageAndExit(1)
 
       case _ =>
-        throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
+        error(s"Unexpected argument '$opt'.")
     }
-    true
+    action != SparkSubmitAction.PRINT_VERSION
   }
 
   /**
@@ -482,7 +480,7 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
    */
   override protected def handleUnknown(opt: String): Boolean = {
     if (opt.startsWith("-")) {
-      SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+      error(s"Unrecognized option '$opt'.")
     }
 
     primaryResource =
@@ -501,20 +499,18 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
   }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit 
= {
-    // scalastyle:off println
-    val outStream = SparkSubmit.printStream
     if (unknownParam != null) {
-      outStream.println("Unknown/unsupported param " + unknownParam)
+      logInfo("Unknown/unsupported param " + unknownParam)
     }
     val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
       """Usage: spark-submit [options] <app jar | python file | R file> [app 
arguments]
         |Usage: spark-submit --kill [submission ID] --master [spark://...]
         |Usage: spark-submit --status [submission ID] --master [spark://...]
         |Usage: spark-submit run-example [options] example-class [example 
args]""".stripMargin)
-    outStream.println(command)
+    logInfo(command)
 
     val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
-    outStream.println(
+    logInfo(
       s"""
         |Options:
         |  --master MASTER_URL         spark://host:port, mesos://host:port, 
yarn,
@@ -596,12 +592,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
     )
 
     if (SparkSubmit.isSqlShell(mainClass)) {
-      outStream.println("CLI options:")
-      outStream.println(getSqlShellOptions())
+      logInfo("CLI options:")
+      logInfo(getSqlShellOptions())
     }
-    // scalastyle:on println
 
-    SparkSubmit.exitFn(exitCode)
+    throw new SparkUserAppException(exitCode)
   }
 
   /**
@@ -655,4 +650,7 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
       System.setErr(currentErr)
     }
   }
+
+  private def error(msg: String): Unit = throw new SparkException(msg)
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 3f71237..8d6a2b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -25,7 +25,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, 
Utils}
+import org.apache.spark.util._
 
 /**
  * Utility object for launching driver programs such that they share fate with 
the Worker process.
@@ -93,7 +93,7 @@ object DriverWrapper extends Logging {
     val jars = {
       val jarsProp = sys.props.get("spark.jars").orNull
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-        SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+        DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
       } else {
         jarsProp
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala 
b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
index d739016..4b6602b 100644
--- a/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
@@ -33,24 +33,14 @@ private[spark] trait CommandLineUtils {
   private[spark] var printStream: PrintStream = System.err
 
   // scalastyle:off println
-
-  private[spark] def printWarning(str: String): Unit = 
printStream.println("Warning: " + str)
+  private[spark] def printMessage(str: String): Unit = printStream.println(str)
+  // scalastyle:on println
 
   private[spark] def printErrorAndExit(str: String): Unit = {
-    printStream.println("Error: " + str)
-    printStream.println("Run with --help for usage help or --verbose for debug 
output")
+    printMessage("Error: " + str)
+    printMessage("Run with --help for usage help or --verbose for debug 
output")
     exitFn(1)
   }
 
-  // scalastyle:on println
-
-  private[spark] def parseSparkConfProperty(pair: String): (String, String) = {
-    pair.split("=", 2).toSeq match {
-      case Seq(k, v) => (k, v)
-      case _ => printErrorAndExit(s"Spark config without '=': $pair")
-        throw new SparkException(s"Spark config without '=': $pair")
-    }
-  }
-
   def main(args: Array[String]): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java 
b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 2225591..6a1a38c 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -109,7 +109,7 @@ public class SparkLauncherSuite extends BaseSuite {
       .addSparkArg(opts.CONF,
         String.format("%s=-Dfoo=ShouldBeOverriddenBelow", 
SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
       .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
-        "-Dfoo=bar -Dtest.appender=childproc")
+        "-Dfoo=bar -Dtest.appender=console")
       .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, 
System.getProperty("java.class.path"))
       .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
       .setMainClass(SparkLauncherTestApp.class.getName())
@@ -192,6 +192,41 @@ public class SparkLauncherSuite extends BaseSuite {
     }
   }
 
+  @Test
+  public void testInProcessLauncherDoesNotKillJvm() throws Exception {
+    SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
+    List<String[]> wrongArgs = Arrays.asList(
+      new String[] { "--unknown" },
+      new String[] { opts.DEPLOY_MODE, "invalid" });
+
+    for (String[] args : wrongArgs) {
+      InProcessLauncher launcher = new InProcessLauncher()
+        .setAppResource(SparkLauncher.NO_RESOURCE);
+      switch (args.length) {
+        case 2:
+          launcher.addSparkArg(args[0], args[1]);
+          break;
+
+        case 1:
+          launcher.addSparkArg(args[0]);
+          break;
+
+        default:
+          fail("FIXME: invalid test.");
+      }
+
+      SparkAppHandle handle = launcher.startApplication();
+      waitFor(handle);
+      assertEquals(SparkAppHandle.State.FAILED, handle.getState());
+    }
+
+    // Run --version, which is useless as a use case, but should succeed and 
not exit the JVM.
+    // The expected state is "LOST" since "--version" doesn't report state 
back to the handle.
+    SparkAppHandle handle = new 
InProcessLauncher().addSparkArg(opts.VERSION).startApplication();
+    waitFor(handle);
+    assertEquals(SparkAppHandle.State.LOST, handle.getState());
+  }
+
   public static class SparkLauncherTestApp {
 
     public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 0d7c342..7451e07 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.scheduler.EventLoggingListener
 import org.apache.spark.util.{CommandLineUtils, ResetSystemProperties, Utils}
 
@@ -109,6 +110,8 @@ class SparkSubmitSuite
   private val emptyIvySettings = File.createTempFile("ivy", ".xml")
   FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)
 
+  private val submit = new SparkSubmit()
+
   override def beforeEach() {
     super.beforeEach()
   }
@@ -128,13 +131,16 @@ class SparkSubmitSuite
   }
 
   test("handle binary specified but not class") {
-    testPrematureExit(Array("foo.jar"), "No main class")
+    val jar = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+    testPrematureExit(Array(jar.toString()), "No main class")
   }
 
   test("handles arguments with --key=val") {
     val clArgs = Seq(
       "--jars=one.jar,two.jar,three.jar",
-      "--name=myApp")
+      "--name=myApp",
+      "--class=org.FooBar",
+      SparkLauncher.NO_RESOURCE)
     val appArgs = new SparkSubmitArguments(clArgs)
     appArgs.jars should include regex (".*one.jar,.*two.jar,.*three.jar")
     appArgs.name should be ("myApp")
@@ -182,7 +188,7 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, conf, _) = prepareSubmitEnvironment(appArgs)
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
 
     appArgs.deployMode should be ("client")
     conf.get("spark.submit.deployMode") should be ("client")
@@ -192,11 +198,11 @@ class SparkSubmitSuite
       "--master", "yarn",
       "--deploy-mode", "cluster",
       "--conf", "spark.submit.deployMode=client",
-      "-class", "org.SomeClass",
+      "--class", "org.SomeClass",
       "thejar.jar"
     )
     val appArgs1 = new SparkSubmitArguments(clArgs1)
-    val (_, _, conf1, _) = prepareSubmitEnvironment(appArgs1)
+    val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1)
 
     appArgs1.deployMode should be ("cluster")
     conf1.get("spark.submit.deployMode") should be ("cluster")
@@ -210,7 +216,7 @@ class SparkSubmitSuite
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     appArgs2.deployMode should be (null)
 
-    val (_, _, conf2, _) = prepareSubmitEnvironment(appArgs2)
+    val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     appArgs2.deployMode should be ("client")
     conf2.get("spark.submit.deployMode") should be ("client")
   }
@@ -233,7 +239,7 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, conf, mainClass) = 
prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = 
submit.prepareSubmitEnvironment(appArgs)
     val childArgsStr = childArgs.mkString(" ")
     childArgsStr should include ("--class org.SomeClass")
     childArgsStr should include ("--arg arg1 --arg arg2")
@@ -276,7 +282,7 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, conf, mainClass) = 
prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = 
submit.prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (4)
@@ -322,7 +328,7 @@ class SparkSubmitSuite
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     appArgs.useRest = useRest
-    val (childArgs, classpath, conf, mainClass) = 
prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = 
submit.prepareSubmitEnvironment(appArgs)
     val childArgsStr = childArgs.mkString(" ")
     if (useRest) {
       childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
@@ -359,7 +365,7 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, conf, mainClass) = 
prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = 
submit.prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (1)
@@ -381,7 +387,7 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, conf, mainClass) = 
prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = 
submit.prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (1)
@@ -403,7 +409,7 @@ class SparkSubmitSuite
       "/home/thejar.jar",
       "arg1")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, conf, mainClass) = 
prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = 
submit.prepareSubmitEnvironment(appArgs)
 
     val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
     childArgsMap.get("--primary-java-resource") should be 
(Some("file:/home/thejar.jar"))
@@ -428,7 +434,7 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, conf, mainClass) = prepareSubmitEnvironment(appArgs)
+    val (_, _, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
     conf.get("spark.executor.memory") should be ("5g")
     conf.get("spark.master") should be ("yarn")
     conf.get("spark.submit.deployMode") should be ("cluster")
@@ -441,12 +447,12 @@ class SparkSubmitSuite
 
     val clArgs1 = Seq("--class", "org.apache.spark.repl.Main", "spark-shell")
     val appArgs1 = new SparkSubmitArguments(clArgs1)
-    val (_, _, conf1, _) = prepareSubmitEnvironment(appArgs1)
+    val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1)
     conf1.get(UI_SHOW_CONSOLE_PROGRESS) should be (true)
 
     val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
     val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val (_, _, conf2, _) = prepareSubmitEnvironment(appArgs2)
+    val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
   }
 
@@ -625,7 +631,7 @@ class SparkSubmitSuite
       "--files", files,
       "thejar.jar")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
     appArgs.jars should be (Utils.resolveURIs(jars))
     appArgs.files should be (Utils.resolveURIs(files))
     conf.get("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
@@ -640,7 +646,7 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
+    val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     appArgs2.files should be (Utils.resolveURIs(files))
     appArgs2.archives should fullyMatch regex 
("file:/archive1,file:.*#archive3")
     conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
@@ -656,7 +662,7 @@ class SparkSubmitSuite
       "mister.py"
     )
     val appArgs3 = new SparkSubmitArguments(clArgs3)
-    val (_, _, conf3, _) = SparkSubmit.prepareSubmitEnvironment(appArgs3)
+    val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
     appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
     conf3.get("spark.submit.pyFiles") should be (
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
@@ -708,7 +714,7 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
     conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
     conf.get("spark.files") should be(Utils.resolveURIs(files))
 
@@ -725,7 +731,7 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
+    val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
     conf2.get("spark.yarn.dist.archives") should 
be(Utils.resolveURIs(archives))
 
@@ -740,7 +746,7 @@ class SparkSubmitSuite
       "mister.py"
     )
     val appArgs3 = new SparkSubmitArguments(clArgs3)
-    val (_, _, conf3, _) = SparkSubmit.prepareSubmitEnvironment(appArgs3)
+    val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
     conf3.get("spark.submit.pyFiles") should be(
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
 
@@ -757,7 +763,7 @@ class SparkSubmitSuite
       "hdfs:///tmp/mister.py"
     )
     val appArgs4 = new SparkSubmitArguments(clArgs4)
-    val (_, _, conf4, _) = SparkSubmit.prepareSubmitEnvironment(appArgs4)
+    val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4)
     // Should not format python path for yarn cluster mode
     conf4.get("spark.submit.pyFiles") should 
be(Utils.resolveURIs(remotePyFiles))
   }
@@ -778,17 +784,17 @@ class SparkSubmitSuite
   }
 
   test("SPARK_CONF_DIR overrides spark-defaults.conf") {
-    forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
+    forConfDir(Map("spark.executor.memory" -> "3g")) { path =>
       val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
       val args = Seq(
         "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
         "--name", "testApp",
         "--master", "local",
         unusedJar.toString)
-      val appArgs = new SparkSubmitArguments(args, Map("SPARK_CONF_DIR" -> path))
+      val appArgs = new SparkSubmitArguments(args, env = Map("SPARK_CONF_DIR" -> path))
       assert(appArgs.propertiesFile != null)
       assert(appArgs.propertiesFile.startsWith(path))
-      appArgs.executorMemory should be ("2.3g")
+      appArgs.executorMemory should be ("3g")
     }
   }
 
@@ -809,6 +815,9 @@ class SparkSubmitSuite
     val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
     val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
 
+    val tempPyFile = File.createTempFile("tmpApp", ".py")
+    tempPyFile.deleteOnExit()
+
     val args = Seq(
       "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
       "--name", "testApp",
@@ -818,10 +827,10 @@ class SparkSubmitSuite
       "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
       "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
       "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
-      jar2.toString)
+      tempPyFile.toURI().toString())
 
     val appArgs = new SparkSubmitArguments(args)
-    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
     conf.get("spark.yarn.dist.jars").split(",").toSet should be
       (Set(jar1.toURI.toString, jar2.toURI.toString))
     conf.get("spark.yarn.dist.files").split(",").toSet should be
@@ -947,7 +956,7 @@ class SparkSubmitSuite
       )
 
     val appArgs = new SparkSubmitArguments(args)
-    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
 
     // All the resources should still be remote paths, so that YARN client will not upload again.
     conf.get("spark.yarn.dist.jars") should be (tmpJarPath)
@@ -1007,7 +1016,7 @@ class SparkSubmitSuite
     ) ++ forceDownloadArgs ++ Seq(s"s3a://$mainResource")
 
     val appArgs = new SparkSubmitArguments(args)
-    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
 
     val jars = conf.get("spark.yarn.dist.jars").split(",").toSet
 
@@ -1058,7 +1067,7 @@ class SparkSubmitSuite
       "hello")
 
     val exception = intercept[SparkException] {
-      SparkSubmit.main(args)
+      submit.doSubmit(args)
     }
 
     assert(exception.getMessage() === "hello")
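
Editor's note: the hunks above switch the suite from the SparkSubmit companion object to an
instance ("submit"), so failures show up as exceptions rather than JVM exits. A minimal sketch
of that calling pattern follows; the arguments are placeholders, and SparkSubmit is
Spark-internal (external applications would use the launcher library instead):

    import org.apache.spark.SparkException
    import org.apache.spark.deploy.SparkSubmit

    val submit = new SparkSubmit()
    try {
      // Placeholder arguments; doSubmit() parses them and runs the submission.
      submit.doSubmit(Array("--class", "com.example.MyApp", "/tmp/missing.jar"))
    } catch {
      // Errors are reported as exceptions (see the intercept[SparkException] test above)
      // instead of terminating the JVM.
      case e: SparkException => println(s"submit failed: ${e.getMessage}")
    }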

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index e505bc0..54c168a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -445,7 +445,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
       "--class", mainClass,
       mainJar) ++ appArgs
     val args = new SparkSubmitArguments(commandLineArgs)
-    val (_, _, sparkConf, _) = SparkSubmit.prepareSubmitEnvironment(args)
+    val (_, _, sparkConf, _) = new SparkSubmit().prepareSubmitEnvironment(args)
     new RestSubmissionClient("spark://host:port").constructSubmitRequest(
       mainJar, mainClass, appArgs, sparkConf.getAll.toMap, Map.empty)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java
index 44e69fc..4e02843 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java
@@ -139,7 +139,7 @@ public abstract class AbstractLauncher<T extends AbstractLauncher> {
   public T addSparkArg(String arg) {
     SparkSubmitOptionParser validator = new ArgumentValidator(false);
     validator.parse(Arrays.asList(arg));
-    builder.sparkArgs.add(arg);
+    builder.userArgs.add(arg);
     return self();
   }
 
@@ -187,8 +187,8 @@ public abstract class AbstractLauncher<T extends AbstractLauncher> {
       }
     } else {
       validator.parse(Arrays.asList(name, value));
-      builder.sparkArgs.add(name);
-      builder.sparkArgs.add(value);
+      builder.userArgs.add(name);
+      builder.userArgs.add(value);
     }
     return self();
   }
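
Editor's note: with this change, values passed to addSparkArg() are collected as user args and
merged into the spark-submit command when the application is built. A usage sketch of the two
overloads touched above (the configuration value is chosen arbitrarily):

    import org.apache.spark.launcher.SparkLauncher

    val launcher = new SparkLauncher()
      .addSparkArg("--verbose")                         // single flag, no value
      .addSparkArg("--conf", "spark.ui.enabled=false")  // name/value pair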

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java
index 6d726b4..688e1f7 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java
@@ -89,10 +89,18 @@ public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
     }
 
     Class<?> sparkSubmit;
+    // SPARK-22941: first try the new SparkSubmit interface that has better error handling,
+    // but fall back to the old interface in case someone is mixing & matching launcher and
+    // Spark versions.
     try {
-      sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
-    } catch (Exception e) {
-      throw new IOException("Cannot find SparkSubmit; make sure necessary jars are available.", e);
+      sparkSubmit = cl.loadClass("org.apache.spark.deploy.InProcessSparkSubmit");
+    } catch (Exception e1) {
+      try {
+        sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
+      } catch (Exception e2) {
+        throw new IOException("Cannot find SparkSubmit; make sure necessary jars are available.",
+          e2);
+      }
     }
 
     Method main;
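
Editor's note: the lookup above prefers the new InProcessSparkSubmit entry point and falls back
to the old class for mixed launcher/Spark versions; from the caller's side nothing changes. A
hedged sketch of in-process use, with placeholder resource and class names:

    import org.apache.spark.launcher.{InProcessLauncher, SparkAppHandle}

    val handle: SparkAppHandle = new InProcessLauncher()
      .setMaster("local[2]")
      .setAppResource("/tmp/my-app.jar")   // placeholder
      .setMainClass("com.example.MyApp")   // placeholder
      .startApplication()
    // A bad submission now surfaces as an error to this caller rather than
    // killing the host JVM.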

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index e0ef22d..5cb6457 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -88,8 +88,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
       SparkLauncher.NO_RESOURCE);
   }
 
-  final List<String> sparkArgs;
-  private final boolean isAppResourceReq;
+  final List<String> userArgs;
+  private final List<String> parsedArgs;
+  private final boolean requiresAppResource;
   private final boolean isExample;
 
   /**
@@ -99,17 +100,27 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
    */
   private boolean allowsMixedArguments;
 
+  /**
+   * This constructor is used when creating a user-configurable launcher. It allows the
+   * spark-submit argument list to be modified after creation.
+   */
   SparkSubmitCommandBuilder() {
-    this.sparkArgs = new ArrayList<>();
-    this.isAppResourceReq = true;
+    this.requiresAppResource = true;
     this.isExample = false;
+    this.parsedArgs = new ArrayList<>();
+    this.userArgs = new ArrayList<>();
   }
 
+  /**
+   * This constructor is used when invoking spark-submit; it parses and validates arguments
+   * provided by the user on the command line.
+   */
   SparkSubmitCommandBuilder(List<String> args) {
     this.allowsMixedArguments = false;
-    this.sparkArgs = new ArrayList<>();
+    this.parsedArgs = new ArrayList<>();
     boolean isExample = false;
     List<String> submitArgs = args;
+    this.userArgs = Collections.emptyList();
 
     if (args.size() > 0) {
       switch (args.get(0)) {
@@ -131,21 +142,21 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
       }
 
       this.isExample = isExample;
-      OptionParser parser = new OptionParser();
+      OptionParser parser = new OptionParser(true);
       parser.parse(submitArgs);
-      this.isAppResourceReq = parser.isAppResourceReq;
-    }  else {
+      this.requiresAppResource = parser.requiresAppResource;
+    } else {
       this.isExample = isExample;
-      this.isAppResourceReq = false;
+      this.requiresAppResource = false;
     }
   }
 
   @Override
   public List<String> buildCommand(Map<String, String> env)
       throws IOException, IllegalArgumentException {
-    if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
+    if (PYSPARK_SHELL.equals(appResource) && requiresAppResource) {
       return buildPySparkShellCommand(env);
-    } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
+    } else if (SPARKR_SHELL.equals(appResource) && requiresAppResource) {
       return buildSparkRCommand(env);
     } else {
       return buildSparkSubmitCommand(env);
@@ -154,9 +165,19 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
 
   List<String> buildSparkSubmitArgs() {
     List<String> args = new ArrayList<>();
-    SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
+    OptionParser parser = new OptionParser(false);
+    final boolean requiresAppResource;
+
+    // If the user args array is not empty, we need to parse it to detect exactly what
+    // the user is trying to run, so that checks below are correct.
+    if (!userArgs.isEmpty()) {
+      parser.parse(userArgs);
+      requiresAppResource = parser.requiresAppResource;
+    } else {
+      requiresAppResource = this.requiresAppResource;
+    }
 
-    if (!allowsMixedArguments && isAppResourceReq) {
+    if (!allowsMixedArguments && requiresAppResource) {
       checkArgument(appResource != null, "Missing application resource.");
     }
 
@@ -208,15 +229,16 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
       args.add(join(",", pyFiles));
     }
 
-    if (isAppResourceReq) {
-      checkArgument(!isExample || mainClass != null, "Missing example class name.");
+    if (isExample) {
+      checkArgument(mainClass != null, "Missing example class name.");
     }
+
     if (mainClass != null) {
       args.add(parser.CLASS);
       args.add(mainClass);
     }
 
-    args.addAll(sparkArgs);
+    args.addAll(parsedArgs);
     if (appResource != null) {
       args.add(appResource);
     }
@@ -399,7 +421,12 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
 
   private class OptionParser extends SparkSubmitOptionParser {
 
-    boolean isAppResourceReq = true;
+    boolean requiresAppResource = true;
+    private final boolean errorOnUnknownArgs;
+
+    OptionParser(boolean errorOnUnknownArgs) {
+      this.errorOnUnknownArgs = errorOnUnknownArgs;
+    }
 
     @Override
     protected boolean handle(String opt, String value) {
@@ -443,23 +470,23 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
           break;
         case KILL_SUBMISSION:
         case STATUS:
-          isAppResourceReq = false;
-          sparkArgs.add(opt);
-          sparkArgs.add(value);
+          requiresAppResource = false;
+          parsedArgs.add(opt);
+          parsedArgs.add(value);
           break;
         case HELP:
         case USAGE_ERROR:
-          isAppResourceReq = false;
-          sparkArgs.add(opt);
+          requiresAppResource = false;
+          parsedArgs.add(opt);
           break;
         case VERSION:
-          isAppResourceReq = false;
-          sparkArgs.add(opt);
+          requiresAppResource = false;
+          parsedArgs.add(opt);
           break;
         default:
-          sparkArgs.add(opt);
+          parsedArgs.add(opt);
           if (value != null) {
-            sparkArgs.add(value);
+            parsedArgs.add(value);
           }
           break;
       }
@@ -483,12 +510,13 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
         mainClass = className;
         appResource = SparkLauncher.NO_RESOURCE;
         return false;
-      } else {
+      } else if (errorOnUnknownArgs) {
         checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
         checkState(appResource == null, "Found unrecognized argument but resource is already set.");
         appResource = opt;
         return false;
       }
+      return true;
     }
 
     @Override
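
Editor's note: because unknown arguments are only rejected when errorOnUnknownArgs is set, and
user args are now parsed lazily in buildSparkSubmitArgs(), special modes that need no application
resource can be requested through the launcher library, as the commit notes for --version. A
sketch, assuming SPARK_HOME is configured so spark-submit can be located:

    import org.apache.spark.launcher.SparkLauncher

    val proc = new SparkLauncher()
      .addSparkArg("--version")   // no app resource required for this flag
      .launch()                   // runs spark-submit as a child process
    proc.waitFor()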

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b37b4d5..a87fa68 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,12 +36,17 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-22941][core] Do not exit JVM when submit fails with in-process launcher.
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.printWarning"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.parseSparkConfProperty"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.printVersionAndExit"),
+
     // [SPARK-23412][ML] Add cosine distance measure to BisectingKmeans
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasDistanceMeasure.org$apache$spark$ml$param$shared$HasDistanceMeasure$_setter_$distanceMeasure_="),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasDistanceMeasure.getDistanceMeasure"),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasDistanceMeasure.distanceMeasure"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.BisectingKMeansModel#SaveLoadV1_0.load"),
-    
+
     // [SPARK-20659] Remove StorageStatus, or make it private
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkExecutorInfo.totalOffHeapStorageMemory"),
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkExecutorInfo.usedOffHeapStorageMemory"),

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index aa378c9..ccf33e8 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.mesos
 
 import java.util.concurrent.CountDownLatch
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.mesos.ui.MesosClusterUI
 import org.apache.spark.deploy.rest.mesos.MesosRestServer
@@ -100,7 +100,13 @@ private[mesos] object MesosClusterDispatcher
     Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
     Utils.initDaemon(log)
     val conf = new SparkConf
-    val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
+    val dispatcherArgs = try {
+      new MesosClusterDispatcherArguments(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage())
+        null
+    }
     conf.setMaster(dispatcherArgs.masterUrl)
     conf.setAppName(dispatcherArgs.name)
     dispatcherArgs.zookeeperUrl.foreach { z =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb82047/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
index 096bb4e..267a428 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -21,6 +21,7 @@ import scala.annotation.tailrec
 import scala.collection.mutable
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.util.{IntParam, Utils}
 
 private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
@@ -95,9 +96,8 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
       parse(tail)
 
     case ("--conf") :: value :: tail =>
-      val pair = MesosClusterDispatcher.
-        parseSparkConfProperty(value)
-        confProperties(pair._1) = pair._2
+      val (k, v) = SparkSubmitUtils.parseSparkConfProperty(value)
+      confProperties(k) = v
       parse(tail)
 
     case ("--help") :: tail =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
