Repository: spark
Updated Branches:
  refs/heads/master f81401e1c -> e1dd03e42


[SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.

The main goal of this change is to allow multiple cluster-mode
submissions from the same JVM without having them end up with
mixed configuration. That is done by having the cluster-mode
clients extend the SparkApplication trait, which was reasonably
straightforward for the standalone and Mesos modes.
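
For reference, a minimal sketch of the trait and of a client implementing it
(the start() signature is taken from the implementations added in this diff;
any other members of the real trait, and the ExampleClusterApp class, are
illustrative only):

  import org.apache.spark.SparkConf

  trait SparkApplication {
    def start(args: Array[String], conf: SparkConf): Unit
  }

  // A cluster-mode client receives its own SparkConf instead of mutating
  // global system properties, so multiple submissions can share one JVM.
  class ExampleClusterApp extends SparkApplication {
    override def start(args: Array[String], conf: SparkConf): Unit = {
      // build and submit the application using only `args` and `conf`
    }
  }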

For YARN mode, there was a complication. YARN used a "SPARK_YARN_MODE"
system property to indirectly control behavior in a number of places,
mainly in the SparkHadoopUtil / YarnSparkHadoopUtil classes. Most of
the changes in this diff remove that property.
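
As a rough before/after of the pattern being removed (a sketch, not a
verbatim excerpt from the old code):

  // Before: a JVM-global flag decided which SparkHadoopUtil callers got back.
  val yarnMode = java.lang.Boolean.parseBoolean(
    System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))

  // After: SparkHadoopUtil.get always returns the single default instance,
  // and anything YARN-specific is requested explicitly (see the hunks below).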

Since we removed support for Hadoop 1.x, some methods that lived in
YarnSparkHadoopUtil can now live in SparkHadoopUtil. The remaining
methods don't need to be part of the class, and can be called directly
from the YarnSparkHadoopUtil object, so now there's a single
implementation of SparkHadoopUtil.
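
Call sites change accordingly; the before/after shapes below are lifted from
hunks later in this diff:

  // Before: instance methods reached through the YARN-only singleton.
  YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)
  YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()

  // After: plain functions on the YarnSparkHadoopUtil object.
  YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf)
  YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()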

Two places in the code relied on SPARK_YARN_MODE to decide whether to use
YARN-specific functionality; they now explicitly check the master set in
the configuration instead (see the sketch after this list):

* fetching the external shuffle service port, which can come from the YARN
  configuration.

* propagation of the authentication secret using Hadoop credentials. This
  was also cleaned up a little so it no longer needs as many methods in
  `SparkHadoopUtil`.
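
A sketch of the check both places now rely on (the helper name is
illustrative; the real code inlines the comparison, as the
Utils.getSparkOrYarnConfig and SecurityManager.initializeAuth hunks show):

  import org.apache.spark.SparkConf
  import org.apache.spark.launcher.SparkLauncher

  // Illustrative helper: SparkLauncher.SPARK_MASTER is "spark.master".
  def isYarnMaster(conf: SparkConf): Boolean =
    conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"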

With those out of the way, actually changing the YARN client
to extend SparkApplication was easy.

Tested with existing unit tests, and also by running YARN apps with
authentication and Kerberos both on and off in a real cluster.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19631 from vanzin/SPARK-22372.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1dd03e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1dd03e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1dd03e4

Branch: refs/heads/master
Commit: e1dd03e42c2131b167b1e80c761291e88bfdf03f
Parents: f81401e
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Mon Dec 4 11:05:03 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Dec 4 11:05:03 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/SecurityManager.scala      | 102 +++++++-------
 .../scala/org/apache/spark/SparkContext.scala   |   3 -
 .../main/scala/org/apache/spark/SparkEnv.scala  |   4 +
 .../scala/org/apache/spark/deploy/Client.scala  |   8 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  48 +------
 .../org/apache/spark/deploy/SparkSubmit.scala   |  31 ++---
 .../deploy/rest/RestSubmissionClient.scala      |  31 ++---
 .../executor/CoarseGrainedExecutorBackend.scala |  10 +-
 .../scala/org/apache/spark/util/Utils.scala     |   6 +-
 .../org/apache/spark/SecurityManagerSuite.scala |  31 ++++-
 .../apache/spark/deploy/SparkSubmitSuite.scala  |   8 +-
 .../deploy/rest/StandaloneRestSubmitSuite.scala |   2 +-
 project/MimaExcludes.scala                      |   6 +
 .../spark/deploy/yarn/ApplicationMaster.scala   |  54 ++++----
 .../org/apache/spark/deploy/yarn/Client.scala   |  65 +++++----
 .../apache/spark/deploy/yarn/YarnRMClient.scala |   2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 135 ++++++-------------
 .../yarn/security/AMCredentialRenewer.scala     |   2 +-
 .../cluster/YarnClientSchedulerBackend.scala    |   4 +-
 .../cluster/YarnClusterSchedulerBackend.scala   |   2 +-
 .../deploy/yarn/BaseYarnClusterSuite.scala      |   5 -
 .../apache/spark/deploy/yarn/ClientSuite.scala  |  31 +----
 .../spark/deploy/yarn/YarnClusterSuite.scala    |  20 ++-
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  |  49 +------
 .../YARNHadoopDelegationTokenManagerSuite.scala |  11 +-
 25 files changed, 274 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 2480e56..4c1dbe3 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
+import java.nio.charset.StandardCharsets.UTF_8
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
 import javax.net.ssl._
@@ -26,10 +27,11 @@ import javax.net.ssl._
 import com.google.common.hash.HashCodes
 import com.google.common.io.Files
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.sasl.SecretKeyHolder
 import org.apache.spark.util.Utils
 
@@ -225,7 +227,6 @@ private[spark] class SecurityManager(
   setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
   setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));
 
-  private val secretKey = generateSecretKey()
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else 
"disabled") +
     "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
     "; users  with view permissions: " + viewAcls.toString() +
@@ -417,50 +418,6 @@ private[spark] class SecurityManager(
   def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey
 
   /**
-   * Generates or looks up the secret key.
-   *
-   * The way the key is stored depends on the Spark deployment mode. Yarn
-   * uses the Hadoop UGI.
-   *
-   * For non-Yarn deployments, If the config variable is not set
-   * we throw an exception.
-   */
-  private def generateSecretKey(): String = {
-    if (!isAuthenticationEnabled) {
-      null
-    } else if (SparkHadoopUtil.get.isYarnMode) {
-      // In YARN mode, the secure cookie will be created by the driver and 
stashed in the
-      // user's credentials, where executors can get it. The check for an 
array of size 0
-      // is because of the test code in YarnSparkHadoopUtilSuite.
-      val secretKey = 
SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
-      if (secretKey == null || secretKey.length == 0) {
-        logDebug("generateSecretKey: yarn mode, secret key from credentials is 
null")
-        val rnd = new SecureRandom()
-        val length = sparkConf.getInt("spark.authenticate.secretBitLength", 
256) / JByte.SIZE
-        val secret = new Array[Byte](length)
-        rnd.nextBytes(secret)
-
-        val cookie = HashCodes.fromBytes(secret).toString()
-        SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, 
cookie)
-        cookie
-      } else {
-        new Text(secretKey).toString
-      }
-    } else {
-      // user must have set spark.authenticate.secret config
-      // For Master/Worker, auth secret is in conf; for Executors, it is in 
env variable
-      Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
-        .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) 
match {
-        case Some(value) => value
-        case None =>
-          throw new IllegalArgumentException(
-            "Error: a secret key must be specified via the " +
-              SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
-      }
-    }
-  }
-
-  /**
    * Check to see if Acls for the UI are enabled
    * @return true if UI authentication is enabled, otherwise false
    */
@@ -542,7 +499,51 @@ private[spark] class SecurityManager(
    * Gets the secret key.
    * @return the secret key as a String if authentication is enabled, 
otherwise returns null
    */
-  def getSecretKey(): String = secretKey
+  def getSecretKey(): String = {
+    if (isAuthenticationEnabled) {
+      val creds = UserGroupInformation.getCurrentUser().getCredentials()
+      Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
+        .map { bytes => new String(bytes, UTF_8) }
+        .orElse(Option(sparkConf.getenv(ENV_AUTH_SECRET)))
+        .orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
+        .getOrElse {
+          throw new IllegalArgumentException(
+            s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF 
config")
+        }
+    } else {
+      null
+    }
+  }
+
+  /**
+   * Initialize the authentication secret.
+   *
+   * If authentication is disabled, do nothing.
+   *
+   * In YARN mode, generate a new secret and store it in the current user's 
credentials.
+   *
+   * In other modes, assert that the auth secret is set in the configuration.
+   */
+  def initializeAuth(): Unit = {
+    if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
+      return
+    }
+
+    if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
+      require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
+        s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF 
config.")
+      return
+    }
+
+    val rnd = new SecureRandom()
+    val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / 
JByte.SIZE
+    val secretBytes = new Array[Byte](length)
+    rnd.nextBytes(secretBytes)
+
+    val creds = new Credentials()
+    creds.addSecretKey(SECRET_LOOKUP_KEY, secretBytes)
+    UserGroupInformation.getCurrentUser().addCredentials(creds)
+  }
 
   // Default SecurityManager only has a single secret key, so ignore appId.
   override def getSaslUser(appId: String): String = getSaslUser()
@@ -551,13 +552,12 @@ private[spark] class SecurityManager(
 
 private[spark] object SecurityManager {
 
-  val SPARK_AUTH_CONF: String = "spark.authenticate"
-  val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
+  val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
+  val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
   // This is used to set auth secret to an executor's env variable. It should 
have the same
   // value as SPARK_AUTH_SECRET_CONF set in SparkConf
   val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
 
   // key used to store the spark secret in the Hadoop UGI
-  val SECRET_LOOKUP_KEY = "sparkCookie"
-
+  val SECRET_LOOKUP_KEY = new Text("sparkCookie")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c174939..71f1e7c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -413,8 +413,6 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
 
-    if (master == "yarn" && deployMode == "client") 
System.setProperty("SPARK_YARN_MODE", "true")
-
     _listenerBus = new LiveListenerBus(_conf)
 
     // Initialize the app status store and listener before SparkEnv is created 
so that it gets
@@ -1955,7 +1953,6 @@ class SparkContext(config: SparkConf) extends Logging {
     // `SparkContext` is stopped.
     localProperties.remove()
     // Unset YARN mode system env variable, to allow switching between cluster 
types.
-    System.clearProperty("SPARK_YARN_MODE")
     SparkContext.clearActiveContext()
     logInfo("Successfully stopped SparkContext")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 2492815..72123f2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -234,6 +234,10 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf, ioEncryptionKey)
+    if (isDriver) {
+      securityManager.initializeAuth()
+    }
+
     ioEncryptionKey.foreach { _ =>
       if (!securityManager.isEncryptionEnabled()) {
         logWarning("I/O encryption enabled without RPC encryption: keys will 
be visible on the " +

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7acb5c5..d514509 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -217,8 +217,13 @@ object Client {
       println("Use ./bin/spark-submit with \"--master spark://host:port\"")
     }
     // scalastyle:on println
+    new ClientApp().start(args, new SparkConf())
+  }
+}
 
-    val conf = new SparkConf()
+private[spark] class ClientApp extends SparkApplication {
+
+  override def start(args: Array[String], conf: SparkConf): Unit = {
     val driverArgs = new ClientArguments(args)
 
     if (!conf.contains("spark.rpc.askTimeout")) {
@@ -235,4 +240,5 @@ object Client {
 
     rpcEnv.awaitTermination()
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 17c7319..e14f984 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -75,9 +75,7 @@ class SparkHadoopUtil extends Logging {
   }
 
   def transferCredentials(source: UserGroupInformation, dest: 
UserGroupInformation) {
-    for (token <- source.getTokens.asScala) {
-      dest.addToken(token)
-    }
+    dest.addCredentials(source.getCredentials())
   }
 
   /**
@@ -120,16 +118,9 @@ class SparkHadoopUtil extends Logging {
    * Add any user credentials to the job conf which are necessary for running 
on a secure Hadoop
    * cluster.
    */
-  def addCredentials(conf: JobConf) {}
-
-  def isYarnMode(): Boolean = { false }
-
-  def addSecretKeyToUserCredentials(key: String, secret: String) {}
-
-  def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
-
-  def getCurrentUserCredentials(): Credentials = {
-    UserGroupInformation.getCurrentUser().getCredentials()
+  def addCredentials(conf: JobConf): Unit = {
+    val jobCreds = conf.getCredentials()
+    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
   def addCurrentUserCredentials(creds: Credentials): Unit = {
@@ -329,17 +320,6 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
-   * Start a thread to periodically update the current user's credentials with 
new credentials so
-   * that access to secured service does not fail.
-   */
-  private[spark] def startCredentialUpdater(conf: SparkConf) {}
-
-  /**
-   * Stop the thread that does the credential updates.
-   */
-  private[spark] def stopCredentialUpdater() {}
-
-  /**
    * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
    * This is to prevent the DFSClient from using an old cached token to 
connect to the NameNode.
    */
@@ -441,14 +421,7 @@ class SparkHadoopUtil extends Logging {
 
 object SparkHadoopUtil {
 
-  private lazy val hadoop = new SparkHadoopUtil
-  private lazy val yarn = try {
-    Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
-      .newInstance()
-      .asInstanceOf[SparkHadoopUtil]
-  } catch {
-    case e: Exception => throw new SparkException("Unable to load YARN 
support", e)
-  }
+  private lazy val instance = new SparkHadoopUtil
 
   val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
 
@@ -462,16 +435,7 @@ object SparkHadoopUtil {
    */
   private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000
 
-  def get: SparkHadoopUtil = {
-    // Check each time to support changing to/from YARN
-    val yarnMode = java.lang.Boolean.parseBoolean(
-        System.getProperty("SPARK_YARN_MODE", 
System.getenv("SPARK_YARN_MODE")))
-    if (yarnMode) {
-      yarn
-    } else {
-      hadoop
-    }
-  }
+  def get: SparkHadoopUtil = instance
 
   /**
    * Given an expiration date (e.g. for Hadoop Delegation Tokens) return a the 
date

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 73b956e..cfcdce6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -92,6 +92,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
+  // Following constants are visible for testing.
+  private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
+    "org.apache.spark.deploy.yarn.YarnClusterApplication"
+  private[deploy] val REST_CLUSTER_SUBMIT_CLASS = 
classOf[RestSubmissionClientApp].getName()
+  private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = 
classOf[ClientApp].getName()
+
   // scalastyle:off println
   private[spark] def printVersionAndExit(): Unit = {
     printStream.println("""Welcome to
@@ -281,7 +287,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }
 
       // Make sure YARN is included in our build if we're trying to use it
-      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && 
!Utils.isTesting) {
+      if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && 
!Utils.isTesting) {
         printErrorAndExit(
           "Could not load YARN classes. " +
           "This copy of Spark may not have been compiled with YARN support.")
@@ -363,10 +369,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
     args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, 
hadoopConf)).orNull
     args.archives = Option(args.archives).map(resolveGlobPaths(_, 
hadoopConf)).orNull
 
-    // This security manager will not need an auth secret, but set a dummy 
value in case
-    // spark.authenticate is enabled, otherwise an exception is thrown.
-    lazy val downloadConf = 
sparkConf.clone().set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
-    lazy val secMgr = new SecurityManager(downloadConf)
+    lazy val secMgr = new SecurityManager(sparkConf)
 
     // In client mode, download remote files.
     var localPrimaryResource: String = null
@@ -374,13 +377,13 @@ object SparkSubmit extends CommandLineUtils with Logging {
     var localPyFiles: String = null
     if (deployMode == CLIENT) {
       localPrimaryResource = Option(args.primaryResource).map {
-        downloadFile(_, targetDir, downloadConf, hadoopConf, secMgr)
+        downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
       localJars = Option(args.jars).map {
-        downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
       localPyFiles = Option(args.pyFiles).map {
-        downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
     }
 
@@ -391,8 +394,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
     // For yarn client mode, since we already download them with above code, 
so we only need to
     // figure out the local path and replace the remote one.
     if (clusterManager == YARN) {
-      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
-      val secMgr = new SecurityManager(sparkConf)
       val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
 
       def shouldDownload(scheme: String): Boolean = {
@@ -409,7 +410,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
             if (file.exists()) {
               file.toURI.toString
             } else {
-              downloadFile(resource, targetDir, downloadConf, hadoopConf, 
secMgr)
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
             }
           case _ => uri.toString
         }
@@ -634,11 +635,11 @@ object SparkSubmit extends CommandLineUtils with Logging {
     // All Spark parameters are expected to be passed to the client through 
system properties.
     if (args.isStandaloneCluster) {
       if (args.useRest) {
-        childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
+        childMainClass = REST_CLUSTER_SUBMIT_CLASS
         childArgs += (args.primaryResource, args.mainClass)
       } else {
         // In legacy standalone cluster mode, use Client as a wrapper around 
the user class
-        childMainClass = "org.apache.spark.deploy.Client"
+        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
         if (args.supervise) { childArgs += "--supervise" }
         Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
         Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
@@ -663,7 +664,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
-      childMainClass = "org.apache.spark.deploy.yarn.Client"
+      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
       if (args.isPython) {
         childArgs += ("--primary-py-file", args.primaryResource)
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
@@ -684,7 +685,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
     if (isMesosCluster) {
       assert(args.useRest, "Mesos cluster mode is only supported through the 
REST submission API")
-      childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
+      childMainClass = REST_CLUSTER_SUBMIT_CLASS
       if (args.isPython) {
         // Second argument is main class
         childArgs += (args.primaryResource, "")

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 21cb941..742a958 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -32,6 +32,7 @@ import scala.util.control.NonFatal
 import com.fasterxml.jackson.core.JsonProcessingException
 
 import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, 
SparkException}
+import org.apache.spark.deploy.SparkApplication
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -398,9 +399,20 @@ private[spark] object RestSubmissionClient {
   val PROTOCOL_VERSION = "v1"
 
   /**
-   * Submit an application, assuming Spark parameters are specified through 
the given config.
-   * This is abstracted to its own method for testing purposes.
+   * Filter non-spark environment variables from any environment.
    */
+  private[rest] def filterSystemEnvironment(env: Map[String, String]): 
Map[String, String] = {
+    env.filterKeys { k =>
+      // SPARK_HOME is filtered out because it is usually wrong on the remote 
machine (SPARK-12345)
+      (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") 
||
+        k.startsWith("MESOS_")
+    }
+  }
+}
+
+private[spark] class RestSubmissionClientApp extends SparkApplication {
+
+  /** Submits a request to run the application and return the response. 
Visible for testing. */
   def run(
       appResource: String,
       mainClass: String,
@@ -417,7 +429,7 @@ private[spark] object RestSubmissionClient {
     client.createSubmission(submitRequest)
   }
 
-  def main(args: Array[String]): Unit = {
+  override def start(args: Array[String], conf: SparkConf): Unit = {
     if (args.length < 2) {
       sys.error("Usage: RestSubmissionClient [app resource] [main class] [app 
args*]")
       sys.exit(1)
@@ -425,19 +437,8 @@ private[spark] object RestSubmissionClient {
     val appResource = args(0)
     val mainClass = args(1)
     val appArgs = args.slice(2, args.length)
-    val conf = new SparkConf
-    val env = filterSystemEnvironment(sys.env)
+    val env = RestSubmissionClient.filterSystemEnvironment(sys.env)
     run(appResource, mainClass, appArgs, conf, env)
   }
 
-  /**
-   * Filter non-spark environment variables from any environment.
-   */
-  private[rest] def filterSystemEnvironment(env: Map[String, String]): 
Map[String, String] = {
-    env.filterKeys { k =>
-      // SPARK_HOME is filtered out because it is usually wrong on the remote 
machine (SPARK-12345)
-      (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") 
||
-        k.startsWith("MESOS_")
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index acefc9d..4c1f92a 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -220,7 +220,9 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
       if (driverConf.contains("spark.yarn.credentials.file")) {
         logInfo("Will periodically update credentials from: " +
           driverConf.get("spark.yarn.credentials.file"))
-        SparkHadoopUtil.get.startCredentialUpdater(driverConf)
+        Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+          .getMethod("startCredentialUpdater", classOf[SparkConf])
+          .invoke(null, driverConf)
       }
 
       cfg.hadoopDelegationCreds.foreach { tokens =>
@@ -236,7 +238,11 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new 
WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
-      SparkHadoopUtil.get.stopCredentialUpdater()
+      if (driverConf.contains("spark.yarn.credentials.file")) {
+        Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+          .getMethod("stopCredentialUpdater")
+          .invoke(null)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 930e09d..51bf916 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -50,6 +50,7 @@ import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
@@ -59,6 +60,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream, SerializerInstance}
 
@@ -2405,8 +2407,8 @@ private[spark] object Utils extends Logging {
    */
   def getSparkOrYarnConfig(conf: SparkConf, key: String, default: String): 
String = {
     val sparkValue = conf.get(key, default)
-    if (SparkHadoopUtil.get.isYarnMode) {
-      SparkHadoopUtil.get.newConfiguration(conf).get(key, sparkValue)
+    if (conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn") {
+      new 
YarnConfiguration(SparkHadoopUtil.get.newConfiguration(conf)).get(key, 
sparkValue)
     } else {
       sparkValue
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 9801b26..cf59265 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -18,7 +18,13 @@
 package org.apache.spark
 
 import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
 
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.security.GroupMappingServiceProvider
 import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
 
@@ -411,8 +417,12 @@ class SecurityManagerSuite extends SparkFunSuite with 
ResetSystemProperties {
 
   test("missing secret authentication key") {
     val conf = new SparkConf().set("spark.authenticate", "true")
+    val mgr = new SecurityManager(conf)
+    intercept[IllegalArgumentException] {
+      mgr.getSecretKey()
+    }
     intercept[IllegalArgumentException] {
-      new SecurityManager(conf)
+      mgr.initializeAuth()
     }
   }
 
@@ -430,5 +440,24 @@ class SecurityManagerSuite extends SparkFunSuite with 
ResetSystemProperties {
     assert(keyFromEnv === new SecurityManager(conf2).getSecretKey())
   }
 
+  test("secret key generation in yarn mode") {
+    val conf = new SparkConf()
+      .set(NETWORK_AUTH_ENABLED, true)
+      .set(SparkLauncher.SPARK_MASTER, "yarn")
+    val mgr = new SecurityManager(conf)
+
+    UserGroupInformation.createUserForTesting("authTest", Array()).doAs(
+      new PrivilegedExceptionAction[Unit]() {
+        override def run(): Unit = {
+          mgr.initializeAuth()
+          val creds = UserGroupInformation.getCurrentUser().getCredentials()
+          val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
+          assert(secret != null)
+          assert(new String(secret, UTF_8) === mgr.getSecretKey())
+        }
+      }
+    )
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index d0a34c5..e200755 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -235,7 +235,7 @@ class SparkSubmitSuite
     childArgsStr should include ("--class org.SomeClass")
     childArgsStr should include ("--arg arg1 --arg arg2")
     childArgsStr should include regex ("--jar .*thejar.jar")
-    mainClass should be ("org.apache.spark.deploy.yarn.Client")
+    mainClass should be (SparkSubmit.YARN_CLUSTER_SUBMIT_CLASS)
 
     // In yarn cluster mode, also adding jars to classpath
     classpath(0) should endWith ("thejar.jar")
@@ -323,11 +323,11 @@ class SparkSubmitSuite
     val childArgsStr = childArgs.mkString(" ")
     if (useRest) {
       childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
-      mainClass should be ("org.apache.spark.deploy.rest.RestSubmissionClient")
+      mainClass should be (SparkSubmit.REST_CLUSTER_SUBMIT_CLASS)
     } else {
       childArgsStr should startWith ("--supervise --memory 4g --cores 5")
       childArgsStr should include regex "launch spark://h:p .*thejar.jar 
org.SomeClass arg1 arg2"
-      mainClass should be ("org.apache.spark.deploy.Client")
+      mainClass should be (SparkSubmit.STANDALONE_CLUSTER_SUBMIT_CLASS)
     }
     classpath should have size 0
     sys.props("SPARK_SUBMIT") should be ("true")
@@ -402,7 +402,7 @@ class SparkSubmitSuite
     conf.get("spark.executor.memory") should be ("5g")
     conf.get("spark.master") should be ("yarn")
     conf.get("spark.submit.deployMode") should be ("cluster")
-    mainClass should be ("org.apache.spark.deploy.yarn.Client")
+    mainClass should be (SparkSubmit.YARN_CLUSTER_SUBMIT_CLASS)
   }
 
   test("SPARK-21568 ConsoleProgressBar should be enabled only in shells") {

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 490baf0..e505bc0 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -92,7 +92,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     conf.set("spark.app.name", "dreamer")
     val appArgs = Array("one", "two", "six")
     // main method calls this
-    val response = RestSubmissionClient.run("app-resource", "main-class", 
appArgs, conf)
+    val response = new RestSubmissionClientApp().run("app-resource", 
"main-class", appArgs, conf)
     val submitResponse = getSubmitResponse(response)
     assert(submitResponse.action === 
Utils.getFormattedClassName(submitResponse))
     assert(submitResponse.serverSparkVersion === SPARK_VERSION)

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 5b8dcd0..9be01f6 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,12 @@ object MimaExcludes {
 
   // Exclude rules for 2.3.x
   lazy val v23excludes = v22excludes ++ Seq(
+    // SPARK-22372: Make cluster submission use SparkApplication.
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getSecretKeyFromUserCredentials"),
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.isYarnMode"),
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getCurrentUserCredentials"),
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.addSecretKeyToUserCredentials"),
+
     // SPARK-18085: Better History Server scalability for many / large 
applications
     
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"),

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ca0aa0e..b2576b0 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -56,11 +56,28 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
   // TODO: Currently, task to container is computed once (TaskSetManager) - 
which need not be
   // optimal as more containers are available. Might need to handle this 
better.
 
-  private val sparkConf = new SparkConf()
-  private val yarnConf: YarnConfiguration = 
SparkHadoopUtil.get.newConfiguration(sparkConf)
-    .asInstanceOf[YarnConfiguration]
   private val isClusterMode = args.userClass != null
 
+  private val sparkConf = new SparkConf()
+  if (args.propertiesFile != null) {
+    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
+      sparkConf.set(k, v)
+    }
+  }
+
+  private val securityMgr = new SecurityManager(sparkConf)
+
+  // Set system properties for each config entry. This covers two use cases:
+  // - The default configuration stored by the SparkHadoopUtil class
+  // - The user application creating a new SparkConf in cluster mode
+  //
+  // Both cases create a new SparkConf object which reads these configs from 
system properties.
+  sparkConf.getAll.foreach { case (k, v) =>
+    sys.props(k) = v
+  }
+
+  private val yarnConf = new 
YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
+
   private val ugi = {
     val original = UserGroupInformation.getCurrentUser()
 
@@ -311,7 +328,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
             val credentialManager = new YARNHadoopDelegationTokenManager(
               sparkConf,
               yarnConf,
-              conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, 
conf))
+              conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
 
             val credentialRenewer =
               new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
@@ -323,13 +340,10 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
         credentialRenewerThread.join()
       }
 
-      // Call this to force generation of secret so it gets populated into the 
Hadoop UGI.
-      val securityMgr = new SecurityManager(sparkConf)
-
       if (isClusterMode) {
-        runDriver(securityMgr)
+        runDriver()
       } else {
-        runExecutorLauncher(securityMgr)
+        runExecutorLauncher()
       }
     } catch {
       case e: Exception =>
@@ -410,8 +424,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
       _sparkConf: SparkConf,
       _rpcEnv: RpcEnv,
       driverRef: RpcEndpointRef,
-      uiAddress: Option[String],
-      securityMgr: SecurityManager) = {
+      uiAddress: Option[String]) = {
     val appId = client.getAttemptId().getApplicationId().toString()
     val attemptId = client.getAttemptId().getAttemptId().toString()
     val historyAddress =
@@ -463,7 +476,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
       YarnSchedulerBackend.ENDPOINT_NAME)
   }
 
-  private def runDriver(securityMgr: SecurityManager): Unit = {
+  private def runDriver(): Unit = {
     addAmIpFilter(None)
     userClassThread = startUserApplication()
 
@@ -479,7 +492,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
         val driverRef = createSchedulerRef(
           sc.getConf.get("spark.driver.host"),
           sc.getConf.get("spark.driver.port"))
-        registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), 
securityMgr)
+        registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl))
         registered = true
       } else {
         // Sanity check; should never happen in normal operation, since sc 
should only be null
@@ -498,15 +511,14 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     }
   }
 
-  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
+  private def runExecutorLauncher(): Unit = {
     val hostname = Utils.localHostName
     val amCores = sparkConf.get(AM_CORES)
     rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, 
securityMgr,
       amCores, true)
     val driverRef = waitForSparkDriver()
     addAmIpFilter(Some(driverRef))
-    registerAM(sparkConf, rpcEnv, driverRef, 
sparkConf.getOption("spark.driver.appUIAddress"),
-      securityMgr)
+    registerAM(sparkConf, rpcEnv, driverRef, 
sparkConf.getOption("spark.driver.appUIAddress"))
     registered = true
 
     // In client mode the actor will stop the reporter thread.
@@ -686,6 +698,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
       // TODO(davies): add R dependencies here
     }
+
     val mainMethod = userClassLoader.loadClass(args.userClass)
       .getMethod("main", classOf[Array[String]])
 
@@ -809,15 +822,6 @@ object ApplicationMaster extends Logging {
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
-
-    // Load the properties file with the Spark configuration and set entries 
as system properties,
-    // so that user code run inside the AM also has access to them.
-    // Note: we must do this before SparkHadoopUtil instantiated
-    if (amArgs.propertiesFile != null) {
-      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) 
=>
-        sys.props(k) = v
-      }
-    }
     master = new ApplicationMaster(amArgs)
     System.exit(master.run())
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 99e7d46..3781b26 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -48,7 +48,7 @@ import 
org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
@@ -58,18 +58,14 @@ import org.apache.spark.util.{CallerContext, Utils}
 
 private[spark] class Client(
     val args: ClientArguments,
-    val hadoopConf: Configuration,
     val sparkConf: SparkConf)
   extends Logging {
 
   import Client._
   import YarnSparkHadoopUtil._
 
-  def this(clientArgs: ClientArguments, spConf: SparkConf) =
-    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
-
   private val yarnClient = YarnClient.createYarnClient
-  private val yarnConf = new YarnConfiguration(hadoopConf)
+  private val hadoopConf = new 
YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
 
   private val isClusterMode = sparkConf.get("spark.submit.deployMode", 
"client") == "cluster"
 
@@ -125,7 +121,7 @@ private[spark] class Client(
   private val credentialManager = new YARNHadoopDelegationTokenManager(
     sparkConf,
     hadoopConf,
-    conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
+    conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
 
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
@@ -134,8 +130,6 @@ private[spark] class Client(
   def stop(): Unit = {
     launcherBackend.close()
     yarnClient.stop()
-    // Unset YARN mode system env variable, to allow switching between cluster 
types.
-    System.clearProperty("SPARK_YARN_MODE")
   }
 
   /**
@@ -152,7 +146,7 @@ private[spark] class Client(
       // Setup the credentials before doing anything else,
       // so we have don't have issues at any point.
       setupCredentials()
-      yarnClient.init(yarnConf)
+      yarnClient.init(hadoopConf)
       yarnClient.start()
 
       logInfo("Requesting a new application from cluster with %d NodeManagers"
@@ -398,7 +392,7 @@ private[spark] class Client(
       if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
         currentUser.addCredentials(credentials)
       }
-      logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
+      logDebug(SparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
     }
 
     // If we use principal and keytab to login, also credentials can be 
renewed some time
@@ -758,12 +752,14 @@ private[spark] class Client(
       // Save the YARN configuration into a separate file that will be 
overlayed on top of the
       // cluster's Hadoop conf.
       confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE))
-      yarnConf.writeXml(confStream)
+      hadoopConf.writeXml(confStream)
       confStream.closeEntry()
 
-      // Save Spark configuration to a file in the archive.
+      // Save Spark configuration to a file in the archive, but filter out the 
app's secret.
       val props = new Properties()
-      sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+      sparkConf.getAll.foreach { case (k, v) =>
+        props.setProperty(k, v)
+      }
       // Override spark.yarn.key to point to the location in distributed cache 
which will be used
       // by AM.
       Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) 
}
@@ -786,8 +782,7 @@ private[spark] class Client(
       pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
-    populateClasspath(args, yarnConf, sparkConf, env, 
sparkConf.get(DRIVER_CLASS_PATH))
-    env("SPARK_YARN_MODE") = "true"
+    populateClasspath(args, hadoopConf, sparkConf, env, 
sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
     env("SPARK_USER") = 
UserGroupInformation.getCurrentUser().getShortUserName()
     if (loginFromKeytab) {
@@ -861,6 +856,7 @@ private[spark] class Client(
       } else {
         Nil
       }
+
     val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
     val localResources = prepareLocalResources(appStagingDirPath, 
pySparkArchives)
 
@@ -991,7 +987,11 @@ private[spark] class Client(
     logDebug("YARN AM launch context:")
     logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
     logDebug("    env:")
-    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
+    if (log.isDebugEnabled) {
+      Utils.redact(sparkConf, launchEnv.toSeq).foreach { case (k, v) =>
+        logDebug(s"        $k -> $v")
+      }
+    }
     logDebug("    resources:")
     localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
     logDebug("    command:")
@@ -1185,24 +1185,6 @@ private[spark] class Client(
 
 private object Client extends Logging {
 
-  def main(argStrings: Array[String]) {
-    if (!sys.props.contains("SPARK_SUBMIT")) {
-      logWarning("WARNING: This client is deprecated and will be removed in a 
" +
-        "future version of Spark. Use ./bin/spark-submit with \"--master 
yarn\"")
-    }
-
-    // Set an env variable indicating we are running in YARN mode.
-    // Note that any env variable with the SPARK_ prefix gets propagated to 
all (remote) processes
-    System.setProperty("SPARK_YARN_MODE", "true")
-    val sparkConf = new SparkConf
-    // SparkSubmit would use yarn cache to distribute files & jars in yarn 
mode,
-    // so remove them from sparkConf here for yarn mode.
-    sparkConf.remove("spark.jars")
-    sparkConf.remove("spark.files")
-    val args = new ClientArguments(argStrings)
-    new Client(args, sparkConf).run()
-  }
-
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
 
@@ -1506,3 +1488,16 @@ private object Client extends Logging {
   }
 
 }
+
+private[spark] class YarnClusterApplication extends SparkApplication {
+
+  override def start(args: Array[String], conf: SparkConf): Unit = {
+    // SparkSubmit would use yarn cache to distribute files & jars in yarn 
mode,
+    // so remove them from sparkConf here for yarn mode.
+    conf.remove("spark.jars")
+    conf.remove("spark.files")
+
+    new Client(new ClientArguments(args), conf).run()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 72f4d27..c1ae12a 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -92,7 +92,7 @@ private[spark] class YarnRMClient extends Logging {
 
   /** Returns the attempt ID. */
   def getAttemptId(): ApplicationAttemptId = {
-    YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
+    YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
   }
 
   /** Returns the configuration for the AmIpFilter to add to the Spark UI. */

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 9c1472c..f406fab 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,21 +17,14 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.regex.Matcher
-import java.util.regex.Pattern
+import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.{JobConf, Master}
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, 
Priority}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -43,87 +36,10 @@ import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.util.Utils
 
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-class YarnSparkHadoopUtil extends SparkHadoopUtil {
+object YarnSparkHadoopUtil {
 
   private var credentialUpdater: CredentialUpdater = _
 
-  override def transferCredentials(source: UserGroupInformation, dest: 
UserGroupInformation) {
-    dest.addCredentials(source.getCredentials())
-  }
-
-  // Note that all params which start with SPARK are propagated all the way 
through, so if in yarn
-  // mode, this MUST be set to true.
-  override def isYarnMode(): Boolean = { true }
-
-  // Return an appropriate (subclass) of Configuration. Creating a config 
initializes some Hadoop
-  // subsystems. Always create a new config, don't reuse yarnConf.
-  override def newConfiguration(conf: SparkConf): Configuration = {
-    val hadoopConf = new YarnConfiguration(super.newConfiguration(conf))
-    hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE)
-    hadoopConf
-  }
-
-  // Add any user credentials to the job conf which are necessary for running 
on a secure Hadoop
-  // cluster
-  override def addCredentials(conf: JobConf) {
-    val jobCreds = conf.getCredentials()
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
-  }
-
-  override def addSecretKeyToUserCredentials(key: String, secret: String) {
-    val creds = new Credentials()
-    creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))
-    addCurrentUserCredentials(creds)
-  }
-
-  override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
-    val credentials = getCurrentUserCredentials()
-    if (credentials != null) credentials.getSecretKey(new Text(key)) else null
-  }
-
-  private[spark] override def startCredentialUpdater(sparkConf: SparkConf): 
Unit = {
-    val hadoopConf = newConfiguration(sparkConf)
-    val credentialManager = new YARNHadoopDelegationTokenManager(
-      sparkConf,
-      hadoopConf,
-      conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
-    credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, 
credentialManager)
-    credentialUpdater.start()
-  }
-
-  private[spark] override def stopCredentialUpdater(): Unit = {
-    if (credentialUpdater != null) {
-      credentialUpdater.stop()
-      credentialUpdater = null
-    }
-  }
-
-  private[spark] def getContainerId: ContainerId = {
-    val containerIdString = 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
-    ConverterUtils.toContainerId(containerIdString)
-  }
-
-  /** The filesystems for which YARN should fetch delegation tokens. */
-  private[spark] def hadoopFSsToAccess(
-      sparkConf: SparkConf,
-      hadoopConf: Configuration): Set[FileSystem] = {
-    val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
-      .map(new Path(_).getFileSystem(hadoopConf))
-      .toSet
-
-    val stagingFS = sparkConf.get(STAGING_DIR)
-      .map(new Path(_).getFileSystem(hadoopConf))
-      .getOrElse(FileSystem.get(hadoopConf))
-
-    filesystemsToAccess + stagingFS
-  }
-}
-
-object YarnSparkHadoopUtil {
   // Additional memory overhead
   // 10% was arrived at experimentally. In the interest of minimizing memory 
waste while covering
   // the common cases. Memory overhead tends to grow with container size.
@@ -137,14 +53,6 @@ object YarnSparkHadoopUtil {
   // request types (like map/reduce in hadoop for example)
   val RM_REQUEST_PRIORITY = Priority.newInstance(1)
 
-  def get: YarnSparkHadoopUtil = {
-    val yarnMode = java.lang.Boolean.parseBoolean(
-      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if (!yarnMode) {
-      throw new SparkException("YarnSparkHadoopUtil is not available in 
non-YARN mode!")
-    }
-    SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
-  }
   /**
    * Add a path variable to the given environment map.
    * If the map already contains this key, append the value to the existing 
value instead.
@@ -277,5 +185,42 @@ object YarnSparkHadoopUtil {
         securityMgr.getModifyAclsGroups)
     )
   }
-}
 
+  def getContainerId: ContainerId = {
+    val containerIdString = 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+    ConverterUtils.toContainerId(containerIdString)
+  }
+
+  /** The filesystems for which YARN should fetch delegation tokens. */
+  def hadoopFSsToAccess(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Set[FileSystem] = {
+    val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
+      .map(new Path(_).getFileSystem(hadoopConf))
+      .toSet
+
+    val stagingFS = sparkConf.get(STAGING_DIR)
+      .map(new Path(_).getFileSystem(hadoopConf))
+      .getOrElse(FileSystem.get(hadoopConf))
+
+    filesystemsToAccess + stagingFS
+  }
+
+  def startCredentialUpdater(sparkConf: SparkConf): Unit = {
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val credentialManager = new YARNHadoopDelegationTokenManager(
+      sparkConf,
+      hadoopConf,
+      conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
+    credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager)
+    credentialUpdater.start()
+  }
+
+  def stopCredentialUpdater(): Unit = {
+    if (credentialUpdater != null) {
+      credentialUpdater.stop()
+      credentialUpdater = null
+    }
+  }
+
+}
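
For illustration only (not part of the patch), a minimal sketch of how code inside the YARN module is now expected to reach these helpers directly on the YarnSparkHadoopUtil object, with no SPARK_YARN_MODE check and no YarnSparkHadoopUtil.get indirection:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    val sparkConf = new SparkConf()
    // The Hadoop configuration is derived from the SparkConf; no YARN-specific subclass is needed.
    val hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf)

    // Filesystems for which delegation tokens should be fetched (the staging dir is always included).
    val tokenFileSystems = YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, hadoopConf)

    // Only meaningful inside a YARN container, where CONTAINER_ID is set in the environment.
    val containerId = YarnSparkHadoopUtil.getContainerId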

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
index 6134757..eaf2cff 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -62,7 +62,7 @@ private[yarn] class AMCredentialRenewer(
   private val credentialRenewerThread: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
 
-  private val hadoopUtil = YarnSparkHadoopUtil.get
+  private val hadoopUtil = SparkHadoopUtil.get
 
   private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
   private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index b722cc4..0c6206e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -66,7 +66,7 @@ private[spark] class YarnClientSchedulerBackend(
     // reads the credentials from HDFS, just like the executors and updates its own credentials
     // cache.
     if (conf.contains("spark.yarn.credentials.file")) {
-      YarnSparkHadoopUtil.get.startCredentialUpdater(conf)
+      YarnSparkHadoopUtil.startCredentialUpdater(conf)
     }
     monitorThread = asyncMonitorApplication()
     monitorThread.start()
@@ -153,7 +153,7 @@ private[spark] class YarnClientSchedulerBackend(
     client.reportLauncherState(SparkAppHandle.State.FINISHED)
 
     super.stop()
-    YarnSparkHadoopUtil.get.stopCredentialUpdater()
+    YarnSparkHadoopUtil.stopCredentialUpdater()
     client.stop()
     logInfo("Stopped")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index e2d477b..62bf981 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -41,7 +41,7 @@ private[spark] class YarnClusterSchedulerBackend(
     var driverLogs: Option[Map[String, String]] = None
     try {
       val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
-      val containerId = YarnSparkHadoopUtil.get.getContainerId
+      val containerId = YarnSparkHadoopUtil.getContainerId
 
       val httpAddress = System.getenv(Environment.NM_HOST.name()) +
         ":" + System.getenv(Environment.NM_HTTP_PORT.name())

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 9c3b18e..ac67f21 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -62,18 +62,14 @@ abstract class BaseYarnClusterSuite
   protected var hadoopConfDir: File = _
   private var logConfDir: File = _
 
-  var oldSystemProperties: Properties = null
-
   def newYarnConfig(): YarnConfiguration
 
   override def beforeAll() {
     super.beforeAll()
-    oldSystemProperties = SerializationUtils.clone(System.getProperties)
 
     tempDir = Utils.createTempDir()
     logConfDir = new File(tempDir, "log4j")
     logConfDir.mkdir()
-    System.setProperty("SPARK_YARN_MODE", "true")
 
     val logConfFile = new File(logConfDir, "log4j.properties")
     Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8)
@@ -124,7 +120,6 @@ abstract class BaseYarnClusterSuite
     try {
       yarnCluster.stop()
     } finally {
-      System.setProperties(oldSystemProperties)
       super.afterAll()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 6cf6842..9d5f5eb 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -24,7 +24,6 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap => MutableHashMap}
 
-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
@@ -36,34 +35,18 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterAll, Matchers}
+import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
-class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
-  with ResetSystemProperties {
+class ClientSuite extends SparkFunSuite with Matchers {
 
   import Client._
 
   var oldSystemProperties: Properties = null
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    oldSystemProperties = SerializationUtils.clone(System.getProperties)
-    System.setProperty("SPARK_YARN_MODE", "true")
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      System.setProperties(oldSystemProperties)
-      oldSystemProperties = null
-    } finally {
-      super.afterAll()
-    }
-  }
-
   test("default Yarn application classpath") {
     getDefaultYarnApplicationClasspath should be(Fixtures.knownDefYarnAppCP)
   }
@@ -185,7 +168,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   }
 
   test("configuration and args propagate through 
createApplicationSubmissionContext") {
-    val conf = new Configuration()
     // When parsing tags, duplicates and leading/trailing whitespace should be removed.
     // Spaces between non-comma strings should be preserved as single tags. Empty strings may or
     // may not be removed depending on the version of Hadoop being used.
@@ -200,7 +182,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
     val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
 
-    val client = new Client(args, conf, sparkConf)
+    val client = new Client(args, sparkConf)
     client.createApplicationSubmissionContext(
       new YarnClientApplication(getNewApplicationResponse, appContext),
       containerLaunchContext)
@@ -407,15 +389,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
 
   private def createClient(
       sparkConf: SparkConf,
-      conf: Configuration = new Configuration(),
       args: Array[String] = Array()): Client = {
     val clientArgs = new ClientArguments(args)
-    spy(new Client(clientArgs, conf, sparkConf))
+    spy(new Client(clientArgs, sparkConf))
   }
 
   private def classpath(client: Client): Array[String] = {
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, client.hadoopConf, client.sparkConf, env)
+    populateClasspath(null, new Configuration(), client.sparkConf, env)
     classpath(env)
   }
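
A minimal sketch of the simplified constructor this suite now exercises (the settings are hypothetical; the Hadoop Configuration is built internally from the SparkConf rather than passed in):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    val sparkConf = new SparkConf()
      .setMaster("yarn")
      .set("spark.submit.deployMode", "cluster")  // hypothetical values for the sketch
    val clientArgs = new ClientArguments(Array.empty[String])
    val client = new Client(clientArgs, sparkConf)  // no explicit Configuration argument anymore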
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d5de190..ab0005d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -115,8 +115,13 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       ))
   }
 
-  test("run Spark in yarn-cluster mode with using SparkHadoopUtil.conf") {
-    testYarnAppUseSparkHadoopUtilConf()
+  test("yarn-cluster should respect conf overrides in SparkHadoopUtil 
(SPARK-16414)") {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(false,
+      mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
+      appArgs = Seq("key=value", result.getAbsolutePath()),
+      extraConf = Map("spark.hadoop.key" -> "value"))
+    checkResult(finalState, result)
   }
 
   test("run Spark in yarn-client mode with additional jar") {
@@ -216,15 +221,6 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  private def testYarnAppUseSparkHadoopUtilConf(): Unit = {
-    val result = File.createTempFile("result", null, tempDir)
-    val finalState = runSpark(false,
-      mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
-      appArgs = Seq("key=value", result.getAbsolutePath()),
-      extraConf = Map("spark.hadoop.key" -> "value"))
-    checkResult(finalState, result)
-  }
-
   private def testWithAddJar(clientMode: Boolean): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
     val driverResult = File.createTempFile("driver", null, tempDir)
@@ -424,7 +420,7 @@ private object YarnClusterDriver extends Logging with Matchers {
             s"Driver logs contain sensitive info (${SECRET_PASSWORD}): 
\n${log} "
           )
         }
-        val containerId = YarnSparkHadoopUtil.get.getContainerId
+        val containerId = YarnSparkHadoopUtil.getContainerId
         val user = Utils.getCurrentUserName()
         assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index a057618..f21353a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -71,14 +71,10 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with 
Matchers with Logging
 
   test("Yarn configuration override") {
     val key = "yarn.nodemanager.hostname"
-    val default = new YarnConfiguration()
-
     val sparkConf = new SparkConf()
       .set("spark.hadoop." + key, "someHostName")
-    val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
-
-    yarnConf.getClass() should be (classOf[YarnConfiguration])
-    yarnConf.get(key) should not be default.get(key)
+    val yarnConf = new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(sparkConf))
+    yarnConf.get(key) should be ("someHostName")
   }
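
The rewritten test relies on newConfiguration copying every "spark.hadoop.*" entry from the SparkConf into the returned Hadoop Configuration with the prefix stripped. A small sketch of that behavior (key and value are made up for illustration):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val sparkConf = new SparkConf().set("spark.hadoop.fs.defaultFS", "hdfs://example-nn:8020")
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
    // "spark.hadoop." is stripped, leaving a plain Hadoop property.
    assert(hadoopConf.get("fs.defaultFS") == "hdfs://example-nn:8020")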
 
 
@@ -145,45 +141,4 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
 
   }
 
-  test("check different hadoop utils based on env variable") {
-    try {
-      System.setProperty("SPARK_YARN_MODE", "true")
-      assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil])
-      System.setProperty("SPARK_YARN_MODE", "false")
-      assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil])
-    } finally {
-      System.clearProperty("SPARK_YARN_MODE")
-    }
-  }
-
-
-
-  // This test needs to live here because it depends on isYarnMode returning true, which can only
-  // happen in the YARN module.
-  test("security manager token generation") {
-    try {
-      System.setProperty("SPARK_YARN_MODE", "true")
-      val initial = SparkHadoopUtil.get
-        .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
-      assert(initial === null || initial.length === 0)
-
-      val conf = new SparkConf()
-        .set(SecurityManager.SPARK_AUTH_CONF, "true")
-        .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
-      val sm = new SecurityManager(conf)
-
-      val generated = SparkHadoopUtil.get
-        .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
-      assert(generated != null)
-      val genString = new Text(generated).toString()
-      assert(genString != "unused")
-      assert(sm.getSecretKey() === genString)
-    } finally {
-      // removeSecretKey() was only added in Hadoop 2.6, so instead we just set the secret
-      // to an empty string.
-      SparkHadoopUtil.get.addSecretKeyToUserCredentials(SecurityManager.SECRET_LOOKUP_KEY, "")
-      System.clearProperty("SPARK_YARN_MODE")
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1dd03e4/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
index c918998..3c7cdc0 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -31,24 +31,15 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-
-    System.setProperty("SPARK_YARN_MODE", "true")
-
     sparkConf = new SparkConf()
     hadoopConf = new Configuration()
   }
 
-  override def afterAll(): Unit = {
-    super.afterAll()
-
-    System.clearProperty("SPARK_YARN_MODE")
-  }
-
   test("Correctly loads credential providers") {
     credentialManager = new YARNHadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
+      conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
 
     credentialManager.credentialProviders.get("yarn-test") should not be (None)
   }
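
For context, a hedged sketch of wiring the token manager against additional filesystems. The configuration keys below are assumed to be the ones backing the FILESYSTEMS_TO_ACCESS and STAGING_DIR entries referenced earlier, and the URIs are placeholders:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
    import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager

    val sparkConf = new SparkConf()
      .set("spark.yarn.access.hadoopFileSystems", "hdfs://other-namenode:8020")              // assumed key
      .set("spark.yarn.stagingDir", "hdfs://active-namenode:8020/user/alice/.sparkStaging")  // assumed key
    val hadoopConf = new Configuration()

    // Same constructor shape as in the test above.
    val tokenManager = new YARNHadoopDelegationTokenManager(
      sparkConf,
      hadoopConf,
      conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))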

