Repository: spark
Updated Branches:
  refs/heads/master 592cfeab9 -> 3073344a2


[SPARK-21840][CORE] Add trait that allows conf to be directly set in application.

Currently SparkSubmit uses system properties to propagate configuration to
applications. This makes it hard to implement features such as SPARK-11035,
which would allow multiple applications to be started in the same JVM. The
current code would cause the config data from multiple apps to get mixed
up.

This change introduces a new trait, currently internal to Spark, that allows
the app configuration to be passed directly to the application, without
having to use system properties. The current "call main() method" behavior
is maintained as an implementation of this new trait. This will be useful
to allow multiple cluster mode apps to be submitted from the same JVM.
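
For illustration only (not part of this patch), an implementation of the new
trait might look like the sketch below. The class name is hypothetical, and
because the trait is currently private[spark], a real implementation has to
live inside Spark itself:

    package org.apache.spark.deploy

    import org.apache.spark.SparkConf

    // Receives the submitted configuration directly, so nothing is written to
    // system properties and several applications can share one JVM safely.
    class InProcessExampleApp extends SparkApplication {
      override def start(args: Array[String], conf: SparkConf): Unit = {
        val appName = conf.get("spark.app.name", "example")
        println(s"Starting $appName with args: ${args.mkString(" ")}")
      }
    }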

As part of this, SparkSubmit was modified to collect all configuration
directly into a SparkConf instance. Most of the changes are to tests so
they use SparkConf instead of an opaque map.
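
In essence, runMain now chooses the entry point through the trait instead of
reflectively invoking main() after copying the config into system properties;
the dispatch (paraphrased from the diff below) is roughly:

    val app: SparkApplication =
      if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
        mainClass.newInstance().asInstanceOf[SparkApplication]
      } else {
        new JavaMainApplication(mainClass)  // legacy "call main()" behavior
      }
    app.start(childArgs.toArray, sparkConf)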

Tested with existing and added unit tests.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19519 from vanzin/SPARK-21840.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3073344a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3073344a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3073344a

Branch: refs/heads/master
Commit: 3073344a2551fb198d63f2114a519ab97904cb55
Parents: 592cfea
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Thu Oct 26 15:50:27 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Oct 26 15:50:27 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkApplication.scala  |  55 +++++
 .../org/apache/spark/deploy/SparkSubmit.scala   | 160 +++++++-------
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 213 +++++++++++--------
 .../deploy/rest/StandaloneRestSubmitSuite.scala |   4 +-
 4 files changed, 257 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3073344a/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala b/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala
new file mode 100644
index 0000000..118b460
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.lang.reflect.Modifier
+
+import org.apache.spark.SparkConf
+
+/**
+ * Entry point for a Spark application. Implementations must provide a no-argument constructor.
+ */
+private[spark] trait SparkApplication {
+
+  def start(args: Array[String], conf: SparkConf): Unit
+
+}
+
+/**
+ * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
+ *
+ * Configuration is propagated to the application via system properties, so running multiple
+ * of these in the same JVM may lead to undefined behavior due to configuration leaks.
+ */
+private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
+
+  override def start(args: Array[String], conf: SparkConf): Unit = {
+    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
+    if (!Modifier.isStatic(mainMethod.getModifiers)) {
+      throw new IllegalStateException("The main method in the given main class must be static")
+    }
+
+    val sysProps = conf.getAll.toMap
+    sysProps.foreach { case (k, v) =>
+      sys.props(k) = v
+    }
+
+    mainMethod.invoke(null, args)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3073344a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b7e6d0e..73b956e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,7 +158,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
    */
   @tailrec
   private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
-    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
+    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
 
     def doRunMain(): Unit = {
       if (args.proxyUser != null) {
@@ -167,7 +167,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
         try {
           proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
             override def run(): Unit = {
-              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+              runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
             }
           })
         } catch {
@@ -185,7 +185,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
             }
         }
       } else {
-        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+        runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
       }
     }
 
@@ -235,11 +235,11 @@ object SparkSubmit extends CommandLineUtils with Logging {
   private[deploy] def prepareSubmitEnvironment(
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
-      : (Seq[String], Seq[String], Map[String, String], String) = {
+      : (Seq[String], Seq[String], SparkConf, String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
-    val sysProps = new HashMap[String, String]()
+    val sparkConf = new SparkConf()
     var childMainClass = ""
 
     // Set the cluster manager
@@ -337,7 +337,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }
     }
 
-    val sparkConf = new SparkConf(false)
     args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
     val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
     val targetDir = Utils.createTempDir()
@@ -351,8 +350,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
          // for later use; e.g. in spark sql, the isolated class loader used to talk
          // to HiveMetastore will use these settings. They will be set as Java system
           // properties and then loaded by SparkConf
-          sysProps.put("spark.yarn.keytab", args.keytab)
-          sysProps.put("spark.yarn.principal", args.principal)
+          sparkConf.set(KEYTAB, args.keytab)
+          sparkConf.set(PRINCIPAL, args.principal)
           UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
         }
       }
@@ -364,23 +363,24 @@ object SparkSubmit extends CommandLineUtils with Logging {
     args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
     args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
 
+    // This security manager will not need an auth secret, but set a dummy value in case
+    // spark.authenticate is enabled, otherwise an exception is thrown.
+    lazy val downloadConf = sparkConf.clone().set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+    lazy val secMgr = new SecurityManager(downloadConf)
+
     // In client mode, download remote files.
     var localPrimaryResource: String = null
     var localJars: String = null
     var localPyFiles: String = null
     if (deployMode == CLIENT) {
-      // This security manager will not need an auth secret, but set a dummy value in case
-      // spark.authenticate is enabled, otherwise an exception is thrown.
-      sparkConf.set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
-      val secMgr = new SecurityManager(sparkConf)
       localPrimaryResource = Option(args.primaryResource).map {
-        downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFile(_, targetDir, downloadConf, hadoopConf, secMgr)
       }.orNull
       localJars = Option(args.jars).map {
-        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr)
       }.orNull
       localPyFiles = Option(args.pyFiles).map {
-        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr)
       }.orNull
     }
 
@@ -409,7 +409,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
             if (file.exists()) {
               file.toURI.toString
             } else {
-              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+              downloadFile(resource, targetDir, downloadConf, hadoopConf, secMgr)
             }
           case _ => uri.toString
         }
@@ -449,7 +449,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
         args.files = mergeFileLists(args.files, args.pyFiles)
       }
       if (localPyFiles != null) {
-        sysProps("spark.submit.pyFiles") = localPyFiles
+        sparkConf.set("spark.submit.pyFiles", localPyFiles)
       }
     }
 
@@ -515,69 +515,69 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }
 
     // Special flag to avoid deprecation warnings at the client
-    sysProps("SPARK_SUBMIT") = "true"
+    sys.props("SPARK_SUBMIT") = "true"
 
     // A list of rules to map each argument to system properties or command-line options in
     // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
 
       // All cluster managers
-      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
+      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        sysProp = "spark.submit.deployMode"),
-      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
-      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
+        confKey = "spark.submit.deployMode"),
+      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
-        sysProp = "spark.driver.memory"),
+        confKey = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        sysProp = "spark.driver.extraClassPath"),
+        confKey = "spark.driver.extraClassPath"),
       OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        sysProp = "spark.driver.extraJavaOptions"),
+        confKey = "spark.driver.extraJavaOptions"),
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        sysProp = "spark.driver.extraLibraryPath"),
+        confKey = "spark.driver.extraLibraryPath"),
 
       // Propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
       OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
-        sysProp = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+        confKey = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
       OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
-        CLUSTER, sysProp = "spark.jars.excludes"),
+        CLUSTER, confKey = "spark.jars.excludes"),
 
       // Yarn only
-      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
+      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
-        sysProp = "spark.executor.instances"),
-      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
-      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
-      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
-      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
-      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
-      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),
+        confKey = "spark.executor.instances"),
+      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
+      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
+      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
+      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
+      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
+      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),
 
       // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
-        sysProp = "spark.executor.cores"),
+        confKey = "spark.executor.cores"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
-        sysProp = "spark.executor.memory"),
+        confKey = "spark.executor.memory"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
-        sysProp = "spark.cores.max"),
+        confKey = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
-        sysProp = "spark.files"),
-      OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
-      OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
+        confKey = "spark.files"),
+      OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
+      OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
       OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
-        sysProp = "spark.driver.memory"),
+        confKey = "spark.driver.memory"),
       OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
-        sysProp = "spark.driver.cores"),
+        confKey = "spark.driver.cores"),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
-        sysProp = "spark.driver.supervise"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
+        confKey = "spark.driver.supervise"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
 
       // An internal option used only for spark-shell to add user jars to repl's classloader,
       // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
       // remote jars, so adding a new option to only specify local jars for spark-shell internally.
-      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
+      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
     )
 
     // In client mode, launch the application main class directly
@@ -610,24 +610,24 @@ object SparkSubmit extends CommandLineUtils with Logging {
           (deployMode & opt.deployMode) != 0 &&
           (clusterManager & opt.clusterManager) != 0) {
         if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
-        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
+        if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
       }
     }
 
     // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
     if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
-      sysProps(UI_SHOW_CONSOLE_PROGRESS.key) = "true"
+      sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
     }
 
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python and R files, the primary resource is already distributed as a regular file
     if (!isYarnCluster && !args.isPython && !args.isR) {
-      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
+      var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
       if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
-      sysProps.put("spark.jars", jars.mkString(","))
+      sparkConf.set("spark.jars", jars.mkString(","))
     }
 
     // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
@@ -653,12 +653,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
     // Let YARN know it's a pyspark app, so it distributes needed libraries.
     if (clusterManager == YARN) {
       if (args.isPython) {
-        sysProps.put("spark.yarn.isPython", "true")
+        sparkConf.set("spark.yarn.isPython", "true")
       }
     }
 
     if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
-      setRMPrincipal(sysProps)
+      setRMPrincipal(sparkConf)
     }
 
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
@@ -689,7 +689,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
         // Second argument is main class
         childArgs += (args.primaryResource, "")
         if (args.pyFiles != null) {
-          sysProps("spark.submit.pyFiles") = args.pyFiles
+          sparkConf.set("spark.submit.pyFiles", args.pyFiles)
         }
       } else if (args.isR) {
         // Second argument is main class
@@ -704,12 +704,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
     // Load any properties specified through --conf and the default properties file
     for ((k, v) <- args.sparkProperties) {
-      sysProps.getOrElseUpdate(k, v)
+      sparkConf.setIfMissing(k, v)
     }
 
     // Ignore invalid spark.driver.host in cluster modes.
     if (deployMode == CLUSTER) {
-      sysProps -= "spark.driver.host"
+      sparkConf.remove("spark.driver.host")
     }
 
     // Resolve paths in certain spark properties
@@ -721,15 +721,15 @@ object SparkSubmit extends CommandLineUtils with Logging {
       "spark.yarn.dist.jars")
     pathConfigs.foreach { config =>
       // Replace old URIs with resolved URIs, if they exist
-      sysProps.get(config).foreach { oldValue =>
-        sysProps(config) = Utils.resolveURIs(oldValue)
+      sparkConf.getOption(config).foreach { oldValue =>
+        sparkConf.set(config, Utils.resolveURIs(oldValue))
       }
     }
 
     // Resolve and format python file paths properly before adding them to the PYTHONPATH.
     // The resolving part is redundant in the case of --py-files, but necessary if the user
     // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
-    sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
+    sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
       val resolvedPyFiles = Utils.resolveURIs(pyFiles)
       val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
         PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
@@ -739,22 +739,22 @@ object SparkSubmit extends CommandLineUtils with Logging {
         // locally.
         resolvedPyFiles
       }
-      sysProps("spark.submit.pyFiles") = formattedPyFiles
+      sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
     }
 
-    (childArgs, childClasspath, sysProps, childMainClass)
+    (childArgs, childClasspath, sparkConf, childMainClass)
   }
 
   // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
   // renewer set to the YARN ResourceManager.  Since YARN isn't configured in Mesos mode, we
   // must trick it into thinking we're YARN.
-  private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = {
+  private def setRMPrincipal(sparkConf: SparkConf): Unit = {
     val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
     val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
     // scalastyle:off println
     printStream.println(s"Setting ${key} to ${shortUserName}")
     // scalastyle:off println
-    sysProps.put(key, shortUserName)
+    sparkConf.set(key, shortUserName)
   }
 
   /**
@@ -766,7 +766,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
   private def runMain(
       childArgs: Seq[String],
       childClasspath: Seq[String],
-      sysProps: Map[String, String],
+      sparkConf: SparkConf,
       childMainClass: String,
       verbose: Boolean): Unit = {
     // scalastyle:off println
@@ -774,14 +774,14 @@ object SparkSubmit extends CommandLineUtils with Logging {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
       // sysProps may contain sensitive information, so redact before printing
-      printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
+      printStream.println(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
       printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
       printStream.println("\n")
     }
     // scalastyle:on println
 
     val loader =
-      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
         new ChildFirstURLClassLoader(new Array[URL](0),
           Thread.currentThread.getContextClassLoader)
       } else {
@@ -794,10 +794,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
       addJarToClasspath(jar, loader)
     }
 
-    for ((key, value) <- sysProps) {
-      System.setProperty(key, value)
-    }
-
     var mainClass: Class[_] = null
 
     try {
@@ -823,14 +819,14 @@ object SparkSubmit extends CommandLineUtils with Logging {
         System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
     }
 
-    // SPARK-4170
-    if (classOf[scala.App].isAssignableFrom(mainClass)) {
-      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
-    }
-
-    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
-    if (!Modifier.isStatic(mainMethod.getModifiers)) {
-      throw new IllegalStateException("The main method in the given main class must be static")
+    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
+      mainClass.newInstance().asInstanceOf[SparkApplication]
+    } else {
+      // SPARK-4170
+      if (classOf[scala.App].isAssignableFrom(mainClass)) {
+        printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
+      }
+      new JavaMainApplication(mainClass)
     }
 
     @tailrec
@@ -844,7 +840,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }
 
     try {
-      mainMethod.invoke(null, childArgs.toArray)
+      app.start(childArgs.toArray, sparkConf)
     } catch {
       case t: Throwable =>
         findCause(t) match {
@@ -1271,4 +1267,4 @@ private case class OptionAssigner(
     clusterManager: Int,
     deployMode: Int,
     clOption: String = null,
-    sysProp: String = null)
+    confKey: String = null)

http://git-wip-us.apache.org/repos/asf/spark/blob/3073344a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b52da4c..cfbf56f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -176,10 +176,10 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, sysProps, _) = prepareSubmitEnvironment(appArgs)
+    val (_, _, conf, _) = prepareSubmitEnvironment(appArgs)
 
     appArgs.deployMode should be ("client")
-    sysProps("spark.submit.deployMode") should be ("client")
+    conf.get("spark.submit.deployMode") should be ("client")
 
     // Both cmd line and configuration are specified, cmdline option takes the priority
     val clArgs1 = Seq(
@@ -190,10 +190,10 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs1 = new SparkSubmitArguments(clArgs1)
-    val (_, _, sysProps1, _) = prepareSubmitEnvironment(appArgs1)
+    val (_, _, conf1, _) = prepareSubmitEnvironment(appArgs1)
 
     appArgs1.deployMode should be ("cluster")
-    sysProps1("spark.submit.deployMode") should be ("cluster")
+    conf1.get("spark.submit.deployMode") should be ("cluster")
 
     // Neither cmdline nor configuration are specified, client mode is the default choice
     val clArgs2 = Seq(
@@ -204,9 +204,9 @@ class SparkSubmitSuite
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     appArgs2.deployMode should be (null)
 
-    val (_, _, sysProps2, _) = prepareSubmitEnvironment(appArgs2)
+    val (_, _, conf2, _) = prepareSubmitEnvironment(appArgs2)
     appArgs2.deployMode should be ("client")
-    sysProps2("spark.submit.deployMode") should be ("client")
+    conf2.get("spark.submit.deployMode") should be ("client")
   }
 
   test("handles YARN cluster mode") {
@@ -227,7 +227,7 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
     val childArgsStr = childArgs.mkString(" ")
     childArgsStr should include ("--class org.SomeClass")
     childArgsStr should include ("--arg arg1 --arg arg2")
@@ -240,16 +240,16 @@ class SparkSubmitSuite
     classpath(2) should endWith ("two.jar")
     classpath(3) should endWith ("three.jar")
 
-    sysProps("spark.executor.memory") should be ("5g")
-    sysProps("spark.driver.memory") should be ("4g")
-    sysProps("spark.executor.cores") should be ("5")
-    sysProps("spark.yarn.queue") should be ("thequeue")
-    sysProps("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar")
-    sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
-    sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
-    sysProps("spark.app.name") should be ("beauty")
-    sysProps("spark.ui.enabled") should be ("false")
-    sysProps("SPARK_SUBMIT") should be ("true")
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.driver.memory") should be ("4g")
+    conf.get("spark.executor.cores") should be ("5")
+    conf.get("spark.yarn.queue") should be ("thequeue")
+    conf.get("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar")
+    conf.get("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
+    conf.get("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
+    conf.get("spark.app.name") should be ("beauty")
+    conf.get("spark.ui.enabled") should be ("false")
+    sys.props("SPARK_SUBMIT") should be ("true")
   }
 
   test("handles YARN client mode") {
@@ -270,7 +270,7 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (4)
@@ -278,17 +278,17 @@ class SparkSubmitSuite
     classpath(1) should endWith ("one.jar")
     classpath(2) should endWith ("two.jar")
     classpath(3) should endWith ("three.jar")
-    sysProps("spark.app.name") should be ("trill")
-    sysProps("spark.executor.memory") should be ("5g")
-    sysProps("spark.executor.cores") should be ("5")
-    sysProps("spark.yarn.queue") should be ("thequeue")
-    sysProps("spark.executor.instances") should be ("6")
-    sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
-    sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
-    sysProps("spark.yarn.dist.jars") should include
+    conf.get("spark.app.name") should be ("trill")
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.executor.cores") should be ("5")
+    conf.get("spark.yarn.queue") should be ("thequeue")
+    conf.get("spark.executor.instances") should be ("6")
+    conf.get("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
+    conf.get("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
+    conf.get("spark.yarn.dist.jars") should include
       regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
-    sysProps("SPARK_SUBMIT") should be ("true")
-    sysProps("spark.ui.enabled") should be ("false")
+    conf.get("spark.ui.enabled") should be ("false")
+    sys.props("SPARK_SUBMIT") should be ("true")
   }
 
   test("handles standalone cluster mode") {
@@ -316,7 +316,7 @@ class SparkSubmitSuite
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     appArgs.useRest = useRest
-    val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
     val childArgsStr = childArgs.mkString(" ")
     if (useRest) {
       childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
@@ -327,17 +327,18 @@ class SparkSubmitSuite
       mainClass should be ("org.apache.spark.deploy.Client")
     }
     classpath should have size 0
-    sysProps should have size 9
-    sysProps.keys should contain ("SPARK_SUBMIT")
-    sysProps.keys should contain ("spark.master")
-    sysProps.keys should contain ("spark.app.name")
-    sysProps.keys should contain ("spark.jars")
-    sysProps.keys should contain ("spark.driver.memory")
-    sysProps.keys should contain ("spark.driver.cores")
-    sysProps.keys should contain ("spark.driver.supervise")
-    sysProps.keys should contain ("spark.ui.enabled")
-    sysProps.keys should contain ("spark.submit.deployMode")
-    sysProps("spark.ui.enabled") should be ("false")
+    sys.props("SPARK_SUBMIT") should be ("true")
+
+    val confMap = conf.getAll.toMap
+    confMap.keys should contain ("spark.master")
+    confMap.keys should contain ("spark.app.name")
+    confMap.keys should contain ("spark.jars")
+    confMap.keys should contain ("spark.driver.memory")
+    confMap.keys should contain ("spark.driver.cores")
+    confMap.keys should contain ("spark.driver.supervise")
+    confMap.keys should contain ("spark.ui.enabled")
+    confMap.keys should contain ("spark.submit.deployMode")
+    conf.get("spark.ui.enabled") should be ("false")
   }
 
   test("handles standalone client mode") {
@@ -352,14 +353,14 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (1)
     classpath(0) should endWith ("thejar.jar")
-    sysProps("spark.executor.memory") should be ("5g")
-    sysProps("spark.cores.max") should be ("5")
-    sysProps("spark.ui.enabled") should be ("false")
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.cores.max") should be ("5")
+    conf.get("spark.ui.enabled") should be ("false")
   }
 
   test("handles mesos client mode") {
@@ -374,14 +375,14 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
+    val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
     classpath should have length (1)
     classpath(0) should endWith ("thejar.jar")
-    sysProps("spark.executor.memory") should be ("5g")
-    sysProps("spark.cores.max") should be ("5")
-    sysProps("spark.ui.enabled") should be ("false")
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.cores.max") should be ("5")
+    conf.get("spark.ui.enabled") should be ("false")
   }
 
   test("handles confs with flag equivalents") {
@@ -394,23 +395,26 @@ class SparkSubmitSuite
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
-    sysProps("spark.executor.memory") should be ("5g")
-    sysProps("spark.master") should be ("yarn")
-    sysProps("spark.submit.deployMode") should be ("cluster")
+    val (_, _, conf, mainClass) = prepareSubmitEnvironment(appArgs)
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.master") should be ("yarn")
+    conf.get("spark.submit.deployMode") should be ("cluster")
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
   }
 
   test("SPARK-21568 ConsoleProgressBar should be enabled only in shells") {
+    // Unset from system properties since this config is defined in the root pom's test config.
+    sys.props -= UI_SHOW_CONSOLE_PROGRESS.key
+
     val clArgs1 = Seq("--class", "org.apache.spark.repl.Main", "spark-shell")
     val appArgs1 = new SparkSubmitArguments(clArgs1)
-    val (_, _, sysProps1, _) = prepareSubmitEnvironment(appArgs1)
-    sysProps1(UI_SHOW_CONSOLE_PROGRESS.key) should be ("true")
+    val (_, _, conf1, _) = prepareSubmitEnvironment(appArgs1)
+    conf1.get(UI_SHOW_CONSOLE_PROGRESS) should be (true)
 
     val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
     val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val (_, _, sysProps2, _) = prepareSubmitEnvironment(appArgs2)
-    sysProps2.keys should not contain UI_SHOW_CONSOLE_PROGRESS.key
+    val (_, _, conf2, _) = prepareSubmitEnvironment(appArgs2)
+    assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
   }
 
   test("launch simple application with spark-submit") {
@@ -585,11 +589,11 @@ class SparkSubmitSuite
       "--files", files,
       "thejar.jar")
     val appArgs = new SparkSubmitArguments(clArgs)
-    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
+    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
     appArgs.jars should be (Utils.resolveURIs(jars))
     appArgs.files should be (Utils.resolveURIs(files))
-    sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
-    sysProps("spark.files") should be (Utils.resolveURIs(files))
+    conf.get("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
+    conf.get("spark.files") should be (Utils.resolveURIs(files))
 
     // Test files and archives (Yarn)
     val clArgs2 = Seq(
@@ -600,11 +604,11 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3
+    val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
     appArgs2.files should be (Utils.resolveURIs(files))
     appArgs2.archives should be (Utils.resolveURIs(archives))
-    sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
-    sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+    conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
+    conf2.get("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
 
     // Test python files
     val clArgs3 = Seq(
@@ -615,12 +619,12 @@ class SparkSubmitSuite
       "mister.py"
     )
     val appArgs3 = new SparkSubmitArguments(clArgs3)
-    val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
+    val (_, _, conf3, _) = SparkSubmit.prepareSubmitEnvironment(appArgs3)
     appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
-    sysProps3("spark.submit.pyFiles") should be (
+    conf3.get("spark.submit.pyFiles") should be (
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
-    sysProps3(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
-    sysProps3(PYSPARK_PYTHON.key) should be ("python3.5")
+    conf3.get(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
+    conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }
 
   test("resolves config paths correctly") {
@@ -644,9 +648,9 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs = new SparkSubmitArguments(clArgs)
-    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
-    sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
-    sysProps("spark.files") should be(Utils.resolveURIs(files))
+    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
+    conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+    conf.get("spark.files") should be(Utils.resolveURIs(files))
 
     // Test files and archives (Yarn)
     val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
@@ -661,9 +665,9 @@ class SparkSubmitSuite
       "thejar.jar"
     )
     val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3
-    sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
-    sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
+    val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
+    conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
+    conf2.get("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
 
     // Test python files
     val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
@@ -676,8 +680,8 @@ class SparkSubmitSuite
       "mister.py"
     )
     val appArgs3 = new SparkSubmitArguments(clArgs3)
-    val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
-    sysProps3("spark.submit.pyFiles") should be(
+    val (_, _, conf3, _) = SparkSubmit.prepareSubmitEnvironment(appArgs3)
+    conf3.get("spark.submit.pyFiles") should be(
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
 
     // Test remote python files
@@ -693,11 +697,9 @@ class SparkSubmitSuite
       "hdfs:///tmp/mister.py"
     )
     val appArgs4 = new SparkSubmitArguments(clArgs4)
-    val sysProps4 = SparkSubmit.prepareSubmitEnvironment(appArgs4)._3
+    val (_, _, conf4, _) = SparkSubmit.prepareSubmitEnvironment(appArgs4)
     // Should not format python path for yarn cluster mode
-    sysProps4("spark.submit.pyFiles") should be(
-      Utils.resolveURIs(remotePyFiles)
-    )
+    conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
   }
 
   test("user classpath first in driver") {
@@ -771,14 +773,14 @@ class SparkSubmitSuite
       jar2.toString)
 
     val appArgs = new SparkSubmitArguments(args)
-    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
-    sysProps("spark.yarn.dist.jars").split(",").toSet should be
+    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
+    conf.get("spark.yarn.dist.jars").split(",").toSet should be
       (Set(jar1.toURI.toString, jar2.toURI.toString))
-    sysProps("spark.yarn.dist.files").split(",").toSet should be
+    conf.get("spark.yarn.dist.files").split(",").toSet should be
       (Set(file1.toURI.toString, file2.toURI.toString))
-    sysProps("spark.yarn.dist.pyFiles").split(",").toSet should be
+    conf.get("spark.yarn.dist.pyFiles").split(",").toSet should be
       (Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath))
-    sysProps("spark.yarn.dist.archives").split(",").toSet should be
+    conf.get("spark.yarn.dist.archives").split(",").toSet should be
       (Set(archive1.toURI.toString, archive2.toURI.toString))
   }
 
@@ -897,18 +899,18 @@ class SparkSubmitSuite
       )
 
     val appArgs = new SparkSubmitArguments(args)
-    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3
+    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
 
     // All the resources should still be remote paths, so that YARN client will not upload again.
-    sysProps("spark.yarn.dist.jars") should be (tmpJarPath)
-    sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
-    sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")
+    conf.get("spark.yarn.dist.jars") should be (tmpJarPath)
+    conf.get("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
+    conf.get("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")
 
     // Local repl jars should be a local path.
-    sysProps("spark.repl.local.jars") should (startWith("file:"))
+    conf.get("spark.repl.local.jars") should (startWith("file:"))
 
     // local py files should not be a URI format.
-    sysProps("spark.submit.pyFiles") should (startWith("/"))
+    conf.get("spark.submit.pyFiles") should (startWith("/"))
   }
 
   test("download remote resource if it is not supported by yarn service") {
@@ -955,9 +957,9 @@ class SparkSubmitSuite
     )
 
     val appArgs = new SparkSubmitArguments(args)
-    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3
+    val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
 
-    val jars = sysProps("spark.yarn.dist.jars").split(",").toSet
+    val jars = conf.get("spark.yarn.dist.jars").split(",").toSet
 
     // The URI of remote S3 resource should still be remote.
     assert(jars.contains(tmpS3JarPath))
@@ -996,6 +998,21 @@ class SparkSubmitSuite
     conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
     conf.set("fs.s3a.impl.disable.cache", "true")
   }
+
+  test("start SparkApplication without modifying system properties") {
+    val args = Array(
+      "--class", classOf[TestSparkApplication].getName(),
+      "--master", "local",
+      "--conf", "spark.test.hello=world",
+      "spark-internal",
+      "hello")
+
+    val exception = intercept[SparkException] {
+      SparkSubmit.main(args)
+    }
+
+    assert(exception.getMessage() === "hello")
+  }
 }
 
 object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
@@ -1115,3 +1132,17 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
 
   override def open(path: Path): FSDataInputStream = super.open(local(path))
 }
+
+class TestSparkApplication extends SparkApplication with Matchers {
+
+  override def start(args: Array[String], conf: SparkConf): Unit = {
+    assert(args.size === 1)
+    assert(args(0) === "hello")
+    assert(conf.get("spark.test.hello") === "world")
+    assert(sys.props.get("spark.test.hello") === None)
+
+    // This is how the test verifies the application was actually run.
+    throw new SparkException(args(0))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3073344a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 70887dc..490baf0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -445,9 +445,9 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
       "--class", mainClass,
       mainJar) ++ appArgs
     val args = new SparkSubmitArguments(commandLineArgs)
-    val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args)
+    val (_, _, sparkConf, _) = SparkSubmit.prepareSubmitEnvironment(args)
     new RestSubmissionClient("spark://host:port").constructSubmitRequest(
-      mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty)
+      mainJar, mainClass, appArgs, sparkConf.getAll.toMap, Map.empty)
   }
 
   /** Return the response as a submit response, or fail with error otherwise. */

