Repository: spark
Updated Branches:
  refs/heads/master af3b81607 -> 68dde3481


[SPARK-23781][CORE] Merge token renewer functionality into 
HadoopDelegationTokenManager.

This avoids having two classes to deal with tokens; now the above
class is a one-stop shop for dealing with delegation tokens. The
YARN backend extends that class instead of doing composition like
before, resulting in a bit less code there too.

The renewer functionality is basically the same code that used to
be in YARN's AMCredentialRenewer. That is also the reason why the
public API of HadoopDelegationTokenManager is a little bit odd;
the YARN AM has some odd requirements for how this all should be
initialized, and the weirdness is needed currently to support that.

Tested:
- YARN with stress app for DT renewal
- Mesos and K8S with basic kerberos tests (both tgt and keytab)

Closes #22624 from vanzin/SPARK-23781.

Authored-by: Marcelo Vanzin <van...@cloudera.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68dde348
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68dde348
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68dde348

Branch: refs/heads/master
Commit: 68dde3481ea458b0b8deeec2f99233c2d4c1e056
Parents: af3b816
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Oct 31 13:00:10 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Wed Oct 31 13:00:10 2018 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |   4 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  14 -
 .../security/HadoopDelegationTokenManager.scala | 278 ++++++++++++++-----
 .../HadoopFSDelegationTokenProvider.scala       |   5 +-
 .../apache/spark/internal/config/package.scala  |   4 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |  40 ++-
 .../HadoopDelegationTokenManagerSuite.scala     | 142 +++-------
 .../spark/deploy/k8s/KubernetesConf.scala       |   3 +-
 .../hadooputils/HadoopKerberosLogin.scala       |  10 +-
 ...KubernetesHadoopDelegationTokenManager.scala |  35 +--
 .../MesosCoarseGrainedSchedulerBackend.scala    |  19 +-
 .../MesosHadoopDelegationTokenManager.scala     | 160 -----------
 .../spark/deploy/yarn/ApplicationMaster.scala   |  24 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   2 +-
 .../org/apache/spark/deploy/yarn/config.scala   |   4 -
 .../yarn/security/AMCredentialRenewer.scala     | 177 ------------
 .../YARNHadoopDelegationTokenManager.scala      |  48 ++--
 .../cluster/YarnSchedulerBackend.scala          |   5 +-
 .../YARNHadoopDelegationTokenManagerSuite.scala |   5 +-
 19 files changed, 355 insertions(+), 624 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 5166543..8537c53 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -731,7 +731,9 @@ private[spark] object SparkConf extends Logging {
     KEYTAB.key -> Seq(
       AlternateConfig("spark.yarn.keytab", "3.0")),
     PRINCIPAL.key -> Seq(
-      AlternateConfig("spark.yarn.principal", "3.0"))
+      AlternateConfig("spark.yarn.principal", "3.0")),
+    KERBEROS_RELOGIN_PERIOD.key -> Seq(
+      AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 78a7cf6..5979151 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -414,20 +414,6 @@ object SparkHadoopUtil {
   def get: SparkHadoopUtil = instance
 
   /**
-   * Given an expiration date for the current set of credentials, calculate 
the time when new
-   * credentials should be created.
-   *
-   * @param expirationDate Drop-dead expiration date
-   * @param conf Spark configuration
-   * @return Timestamp when new credentials should be created.
-   */
-  private[spark] def nextCredentialRenewalTime(expirationDate: Long, conf: 
SparkConf): Long = {
-    val ct = System.currentTimeMillis
-    val ratio = conf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
-    (ct + (ratio * (expirationDate - ct))).toLong
-  }
-
-  /**
    * Returns a Configuration object with Spark configuration applied on top. 
Unlike
    * the instance method, this will always return a Configuration instance, 
and not a
    * cluster manager-specific type.

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index ab8d8d9..10cd874 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -17,76 +17,158 @@
 
 package org.apache.spark.deploy.security
 
+import java.io.File
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.ThreadUtils
 
 /**
- * Manages all the registered HadoopDelegationTokenProviders and offer APIs 
for other modules to
- * obtain delegation tokens and their renewal time. By default 
[[HadoopFSDelegationTokenProvider]],
- * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will 
be loaded in if not
- * explicitly disabled.
+ * Manager for delegation tokens in a Spark application.
+ *
+ * This manager has two modes of operation:
+ *
+ * 1.  When configured with a principal and a keytab, it will make sure 
long-running apps can run
+ * without interruption while accessing secured services. It periodically logs 
in to the KDC with
+ * user-provided credentials, and contacts all the configured secure services 
to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * Because the Hadoop UGI API does not expose the TTL of the TGT, a 
configuration controls how often
+ * to check that a relogin is necessary. This is done reasonably often since 
the check is a no-op
+ * when the relogin is not yet needed. The check period can be overridden in 
the configuration.
  *
- * Also, each HadoopDelegationTokenProvider is controlled by
- * spark.security.credentials.{service}.enabled, and will not be loaded if 
this config is set to
- * false. For example, Hive's delegation token provider 
[[HiveDelegationTokenProvider]] can be
- * enabled/disabled by the configuration 
spark.security.credentials.hive.enabled.
+ * New delegation tokens are created once 75% of the renewal interval of the 
original tokens has
+ * elapsed. The new tokens are sent to the Spark driver endpoint once it's 
registered with the AM.
+ * The driver is tasked with distributing the tokens to other processes that 
might need them.
  *
- * @param sparkConf Spark configuration
- * @param hadoopConf Hadoop configuration
- * @param fileSystems Delegation tokens will be fetched for these Hadoop 
filesystems.
+ * 2. When operating without an explicit principal and keytab, token renewal 
will not be available.
+ * Starting the manager will distribute an initial set of delegation tokens to 
the provided Spark
+ * driver, but the app will not get new tokens when those expire.
+ *
+ * It can also be used just to create delegation tokens, by calling the 
`obtainDelegationTokens`
+ * method. This option does not require calling the `start` method, but leaves 
it up to the
+ * caller to distribute the tokens that were generated.
  */
 private[spark] class HadoopDelegationTokenManager(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    fileSystems: Configuration => Set[FileSystem])
-  extends Logging {
+    protected val sparkConf: SparkConf,
+    protected val hadoopConf: Configuration) extends Logging {
 
   private val deprecatedProviderEnabledConfigs = List(
     "spark.yarn.security.tokens.%s.enabled",
     "spark.yarn.security.credentials.%s.enabled")
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
 
-  // Maintain all the registered delegation token providers
-  private val delegationTokenProviders = getDelegationTokenProviders
+  private val principal = sparkConf.get(PRINCIPAL).orNull
+  private val keytab = sparkConf.get(KEYTAB).orNull
+
+  require((principal == null) == (keytab == null),
+    "Both principal and keytab must be defined, or neither.")
+  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at 
$keytab.")
+
+  private val delegationTokenProviders = loadProviders()
   logDebug("Using the following builtin delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
-  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop 
filesystem */
-  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
-    this(
-      sparkConf,
-      hadoopConf,
-      hadoopConf => 
Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
+  private var renewalExecutor: ScheduledExecutorService = _
+  private val driverRef = new AtomicReference[RpcEndpointRef]()
+
+  /** Set the endpoint used to send tokens to the driver. */
+  def setDriverRef(ref: RpcEndpointRef): Unit = {
+    driverRef.set(ref)
   }
 
-  private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
-    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
-      safeCreateProvider(new HiveDelegationTokenProvider) ++
-      safeCreateProvider(new HBaseDelegationTokenProvider)
+  /** @return Whether delegation token renewal is enabled. */
+  def renewalEnabled: Boolean = principal != null
 
-    // Filter out providers for which 
spark.security.credentials.{service}.enabled is false.
-    providers
-      .filter { p => isServiceEnabled(p.serviceName) }
-      .map { p => (p.serviceName, p) }
-      .toMap
+  /**
+   * Start the token renewer. Requires a principal and keytab. Upon start, the 
renewer will:
+   *
+   * - log in the configured principal, and set up a task to keep that user's 
ticket renewed
+   * - obtain delegation tokens from all available providers
+   * - send the tokens to the driver, if it's already registered
+   * - schedule a periodic task to update the tokens when needed.
+   *
+   * @return The newly logged in user.
+   */
+  def start(): UserGroupInformation = {
+    require(renewalEnabled, "Token renewal must be enabled to start the 
renewer.")
+    renewalExecutor =
+      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
+    val ugi = doLogin()
+
+    val tgtRenewalTask = new Runnable() {
+      override def run(): Unit = {
+        ugi.checkTGTAndReloginFromKeytab()
+      }
+    }
+    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
+    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, 
tgtRenewalPeriod,
+      TimeUnit.SECONDS)
+
+    val creds = obtainTokensAndScheduleRenewal(ugi)
+    ugi.addCredentials(creds)
+
+    val driver = driverRef.get()
+    if (driver != null) {
+      val tokens = SparkHadoopUtil.get.serialize(creds)
+      driver.send(UpdateDelegationTokens(tokens))
+    }
+
+    // Transfer the original user's tokens to the new user, since it may 
contain needed tokens
+    // (such as those used to connect to YARN). Explicitly avoid overwriting 
tokens that already
+    // exist in the current user's credentials, since those were freshly 
obtained above
+    // (see SPARK-23361).
+    val existing = ugi.getCredentials()
+    existing.mergeAll(originalCreds)
+    ugi.addCredentials(existing)
+    ugi
   }
 
-  private def safeCreateProvider(
-      createFn: => HadoopDelegationTokenProvider): 
Option[HadoopDelegationTokenProvider] = {
-    try {
-      Some(createFn)
-    } catch {
-      case t: Throwable =>
-        logDebug(s"Failed to load built in provider.", t)
-        None
+  def stop(): Unit = {
+    if (renewalExecutor != null) {
+      renewalExecutor.shutdown()
     }
   }
 
-  def isServiceEnabled(serviceName: String): Boolean = {
+  /**
+   * Fetch new delegation tokens for configured services, storing them in the 
given credentials.
+   * Tokens are fetched for the current logged in user.
+   *
+   * @param creds Credentials object where to store the delegation tokens.
+   * @return The time by which the tokens must be renewed.
+   */
+  def obtainDelegationTokens(creds: Credentials): Long = {
+    delegationTokenProviders.values.flatMap { provider =>
+      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
+        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
+      } else {
+        logDebug(s"Service ${provider.serviceName} does not require a token." +
+          s" Check your configuration to see if security is disabled or not.")
+        None
+      }
+    }.foldLeft(Long.MaxValue)(math.min)
+  }
+
+  // Visible for testing.
+  def isProviderLoaded(serviceName: String): Boolean = {
+    delegationTokenProviders.contains(serviceName)
+  }
+
+  protected def isServiceEnabled(serviceName: String): Boolean = {
     val key = providerEnabledConfig.format(serviceName)
 
     deprecatedProviderEnabledConfigs.foreach { pattern =>
@@ -110,32 +192,104 @@ private[spark] class HadoopDelegationTokenManager(
   }
 
   /**
-   * Get delegation token provider for the specified service.
+   * List of file systems for which to obtain delegation tokens. The base 
implementation
+   * returns just the default file system in the given Hadoop configuration.
    */
-  def getServiceDelegationTokenProvider(service: String): 
Option[HadoopDelegationTokenProvider] = {
-    delegationTokenProviders.get(service)
+  protected def fileSystemsToAccess(): Set[FileSystem] = {
+    Set(FileSystem.get(hadoopConf))
+  }
+
+  private def scheduleRenewal(delay: Long): Unit = {
+    val _delay = math.max(0, delay)
+    logInfo(s"Scheduling login from keytab in 
${UIUtils.formatDuration(delay)}.")
+
+    val renewalTask = new Runnable() {
+      override def run(): Unit = {
+        updateTokensTask()
+      }
+    }
+    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
   }
 
   /**
-   * Writes delegation tokens to creds.  Delegation tokens are fetched from 
all registered
-   * providers.
-   *
-   * @param hadoopConf hadoop Configuration
-   * @param creds Credentials that will be updated in place (overwritten)
-   * @return Time after which the fetched delegation tokens should be renewed.
+   * Periodic task to login to the KDC and create new delegation tokens. 
Re-schedules itself
+   * to fetch the next set of tokens when needed.
    */
-  def obtainDelegationTokens(
-      hadoopConf: Configuration,
-      creds: Credentials): Long = {
-    delegationTokenProviders.values.flatMap { provider =>
-      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
-        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
+  private def updateTokensTask(): Unit = {
+    try {
+      val freshUGI = doLogin()
+      val creds = obtainTokensAndScheduleRenewal(freshUGI)
+      val tokens = SparkHadoopUtil.get.serialize(creds)
+
+      val driver = driverRef.get()
+      if (driver != null) {
+        logInfo("Updating delegation tokens.")
+        driver.send(UpdateDelegationTokens(tokens))
       } else {
-        logDebug(s"Service ${provider.serviceName} does not require a token." +
-          s" Check your configuration to see if security is disabled or not.")
-        None
+        // This shouldn't really happen, since the driver should register way 
before tokens expire.
+        logWarning("Delegation tokens close to expiration but no driver has 
registered yet.")
+        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
       }
-    }.foldLeft(Long.MaxValue)(math.min)
+    } catch {
+      case e: Exception =>
+        val delay = 
TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
+        logWarning(s"Failed to update tokens, will try again in 
${UIUtils.formatDuration(delay)}!" +
+          " If this happens too often tasks will fail.", e)
+        scheduleRenewal(delay)
+    }
+  }
+
+  /**
+   * Obtain new delegation tokens from the available providers. Schedules a 
new task to fetch
+   * new tokens before the new set expires.
+   *
+   * @return Credentials containing the new tokens.
+   */
+  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): 
Credentials = {
+    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
+      override def run(): Credentials = {
+        val creds = new Credentials()
+        val nextRenewal = obtainDelegationTokens(creds)
+
+        // Calculate the time when new credentials should be created, based on 
the configured
+        // ratio.
+        val now = System.currentTimeMillis
+        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
+        val delay = (ratio * (nextRenewal - now)).toLong
+        scheduleRenewal(delay)
+        creds
+      }
+    })
+  }
+
+  private def doLogin(): UserGroupInformation = {
+    logInfo(s"Attempting to login to KDC using principal: $principal")
+    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
keytab)
+    logInfo("Successfully logged into KDC.")
+    ugi
+  }
+
+  private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
+    val providers = Seq(new 
HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
+      safeCreateProvider(new HiveDelegationTokenProvider) ++
+      safeCreateProvider(new HBaseDelegationTokenProvider)
+
+    // Filter out providers for which 
spark.security.credentials.{service}.enabled is false.
+    providers
+      .filter { p => isServiceEnabled(p.serviceName) }
+      .map { p => (p.serviceName, p) }
+      .toMap
   }
-}
 
+  private def safeCreateProvider(
+      createFn: => HadoopDelegationTokenProvider): 
Option[HadoopDelegationTokenProvider] = {
+    try {
+      Some(createFn)
+    } catch {
+      case t: Throwable =>
+        logDebug(s"Failed to load built in provider.", t)
+        None
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 21ca669..767b552 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 
-private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: 
Configuration => Set[FileSystem])
+private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => 
Set[FileSystem])
     extends HadoopDelegationTokenProvider with Logging {
 
   // This tokenRenewalInterval will be set in the first call to 
obtainDelegationTokens.
@@ -44,8 +44,7 @@ private[deploy] class 
HadoopFSDelegationTokenProvider(fileSystems: Configuration
       hadoopConf: Configuration,
       sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
-
-    val fsToGetTokens = fileSystems(hadoopConf)
+    val fsToGetTokens = fileSystems()
     val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), 
fsToGetTokens, creds)
 
     // Get the token renewal interval if it is not set. It will only be called 
once.

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 356cf9e..034e5eb 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -179,6 +179,10 @@ package object config {
     .doc("Name of the Kerberos principal.")
     .stringConf.createOptional
 
+  private[spark] val KERBEROS_RELOGIN_PERIOD = 
ConfigBuilder("spark.kerberos.relogin.period")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("1m")
+
   private[spark] val EXECUTOR_INSTANCES = 
ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index de7c0d8..329158a 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -18,13 +18,17 @@
 package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
 
+import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, 
TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -95,6 +99,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  // Current set of delegation tokens to send to executors.
+  private val delegationTokens = new AtomicReference[Array[Byte]]()
+
+  // The token manager used to create security tokens.
+  private var delegationTokenManager: Option[HadoopDelegationTokenManager] = 
None
+
   private val reviveThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
@@ -152,6 +162,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
         }
 
       case UpdateDelegationTokens(newDelegationTokens) =>
+        SparkHadoopUtil.get.addDelegationTokens(newDelegationTokens, conf)
+        delegationTokens.set(newDelegationTokens)
         executorDataMap.values.foreach { ed =>
           ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
         }
@@ -230,7 +242,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
         val reply = SparkAppConfig(
           sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey(),
-          fetchHadoopDelegationTokens())
+          Option(delegationTokens.get()))
         context.reply(reply)
     }
 
@@ -390,6 +402,21 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
     // TODO (prashant) send conf instead of properties
     driverEndpoint = createDriverEndpointRef(properties)
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      delegationTokenManager = createTokenManager()
+      delegationTokenManager.foreach { dtm =>
+        dtm.setDriverRef(driverEndpoint)
+        val creds = if (dtm.renewalEnabled) {
+          dtm.start().getCredentials()
+        } else {
+          val creds = UserGroupInformation.getCurrentUser().getCredentials()
+          dtm.obtainDelegationTokens(creds)
+          creds
+        }
+        delegationTokens.set(SparkHadoopUtil.get.serialize(creds))
+      }
+    }
   }
 
   protected def createDriverEndpointRef(
@@ -416,6 +443,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   override def stop() {
     reviveThread.shutdownNow()
     stopExecutors()
+    delegationTokenManager.foreach(_.stop())
     try {
       if (driverEndpoint != null) {
         driverEndpoint.askSync[Boolean](StopDriver)
@@ -684,7 +712,13 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     true
   }
 
-  protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
+  /**
+   * Create the delegation token manager to be used for the application. This 
method is called
+   * once during the start of the scheduler backend (so after the object has 
already been
+   * fully constructed), only if security is enabled in the Hadoop 
configuration.
+   */
+  protected def createTokenManager(): Option[HadoopDelegationTokenManager] = 
None
+
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 2849a10..e0e630e 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -21,94 +21,36 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.security.Credentials
-import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.util.Utils
 
-class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
-  private var delegationTokenManager: HadoopDelegationTokenManager = null
-  private var sparkConf: SparkConf = null
-  private var hadoopConf: Configuration = null
+class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
+  private val hadoopConf = new Configuration()
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
-    sparkConf = new SparkConf()
-    hadoopConf = new Configuration()
-  }
-
-  test("Correctly load default credential providers") {
-    delegationTokenManager = new HadoopDelegationTokenManager(
-      sparkConf,
-      hadoopConf,
-      hadoopFSsToAccess)
-
-    delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") 
should not be (None)
-    delegationTokenManager.getServiceDelegationTokenProvider("hbase") should 
not be (None)
-    delegationTokenManager.getServiceDelegationTokenProvider("hive") should 
not be (None)
-    delegationTokenManager.getServiceDelegationTokenProvider("bogus") should 
be (None)
+  test("default configuration") {
+    val manager = new HadoopDelegationTokenManager(new SparkConf(false), 
hadoopConf)
+    assert(manager.isProviderLoaded("hadoopfs"))
+    assert(manager.isProviderLoaded("hbase"))
+    assert(manager.isProviderLoaded("hive"))
   }
 
   test("disable hive credential provider") {
-    sparkConf.set("spark.security.credentials.hive.enabled", "false")
-    delegationTokenManager = new HadoopDelegationTokenManager(
-      sparkConf,
-      hadoopConf,
-      hadoopFSsToAccess)
-
-    delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") 
should not be (None)
-    delegationTokenManager.getServiceDelegationTokenProvider("hbase") should 
not be (None)
-    delegationTokenManager.getServiceDelegationTokenProvider("hive") should be 
(None)
+    val sparkConf = new 
SparkConf(false).set("spark.security.credentials.hive.enabled", "false")
+    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
+    assert(manager.isProviderLoaded("hadoopfs"))
+    assert(manager.isProviderLoaded("hbase"))
+    assert(!manager.isProviderLoaded("hive"))
   }
 
   test("using deprecated configurations") {
-    sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
-    sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
-    delegationTokenManager = new HadoopDelegationTokenManager(
-      sparkConf,
-      hadoopConf,
-      hadoopFSsToAccess)
-
-    delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") 
should be (None)
-    delegationTokenManager.getServiceDelegationTokenProvider("hive") should be 
(None)
-    delegationTokenManager.getServiceDelegationTokenProvider("hbase") should 
not be (None)
-  }
-
-  test("verify no credentials are obtained") {
-    delegationTokenManager = new HadoopDelegationTokenManager(
-      sparkConf,
-      hadoopConf,
-      hadoopFSsToAccess)
-    val creds = new Credentials()
-
-    // Tokens cannot be obtained from HDFS, Hive, HBase in unit tests.
-    delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
-    val tokens = creds.getAllTokens
-    tokens.size() should be (0)
-  }
-
-  test("obtain tokens For HiveMetastore") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
-    // thrift picks up on port 0 and bails out, without trying to talk to 
endpoint
-    hadoopConf.set("hive.metastore.uris", "http://localhost:0")
-
-    val hiveCredentialProvider = new HiveDelegationTokenProvider()
-    val credentials = new Credentials()
-    hiveCredentialProvider.obtainDelegationTokens(hadoopConf, sparkConf, 
credentials)
-
-    credentials.getAllTokens.size() should be (0)
-  }
-
-  test("Obtain tokens For HBase") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("hbase.security.authentication", "kerberos")
-
-    val hbaseTokenProvider = new HBaseDelegationTokenProvider()
-    val creds = new Credentials()
-    hbaseTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
-
-    creds.getAllTokens.size should be (0)
+    val sparkConf = new SparkConf(false)
+      .set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
+      .set("spark.yarn.security.credentials.hive.enabled", "false")
+    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
+    assert(!manager.isProviderLoaded("hadoopfs"))
+    assert(manager.isProviderLoaded("hbase"))
+    assert(!manager.isProviderLoaded("hive"))
   }
 
   test("SPARK-23209: obtain tokens when Hive classes are not available") {
@@ -123,43 +65,41 @@ class HadoopDelegationTokenManagerSuite extends 
SparkFunSuite with Matchers {
           throw new ClassNotFoundException(name)
         }
 
-        if (name.startsWith("java") || name.startsWith("scala")) {
-          currentLoader.loadClass(name)
-        } else {
-          val classFileName = name.replaceAll("\\.", "/") + ".class"
-          val in = currentLoader.getResourceAsStream(classFileName)
-          if (in != null) {
-            val bytes = IOUtils.toByteArray(in)
-            defineClass(name, bytes, 0, bytes.length)
-          } else {
-            throw new ClassNotFoundException(name)
-          }
+        val prefixBlacklist = Seq("java", "scala", "com.sun.", "sun.")
+        if (prefixBlacklist.exists(name.startsWith(_))) {
+          return currentLoader.loadClass(name)
         }
+
+        val found = findLoadedClass(name)
+        if (found != null) {
+          return found
+        }
+
+        val classFileName = name.replaceAll("\\.", "/") + ".class"
+        val in = currentLoader.getResourceAsStream(classFileName)
+        if (in != null) {
+          val bytes = IOUtils.toByteArray(in)
+          return defineClass(name, bytes, 0, bytes.length)
+        }
+
+        throw new ClassNotFoundException(name)
       }
     }
 
-    try {
-      Thread.currentThread().setContextClassLoader(noHive)
+    Utils.withContextClassLoader(noHive) {
       val test = 
noHive.loadClass(NoHiveTest.getClass.getName().stripSuffix("$"))
       test.getMethod("runTest").invoke(null)
-    } finally {
-      Thread.currentThread().setContextClassLoader(currentLoader)
     }
   }
-
-  private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): 
Set[FileSystem] = {
-    Set(FileSystem.get(hadoopConf))
-  }
 }
 
 /** Test code for SPARK-23209 to avoid using too much reflection above. */
-private object NoHiveTest extends Matchers {
+private object NoHiveTest {
 
   def runTest(): Unit = {
     try {
-      val manager = new HadoopDelegationTokenManager(new SparkConf(), new 
Configuration(),
-        _ => Set())
-      manager.getServiceDelegationTokenProvider("hive") should be (None)
+      val manager = new HadoopDelegationTokenManager(new SparkConf(), new 
Configuration())
+      require(!manager.isProviderLoaded("hive"))
     } catch {
       case e: Throwable =>
         // Throw a better exception in case the test fails, since there may be 
a lot of nesting.

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 3e30ab2..066547d 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Constants._
 import 
org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.ConfigEntry
 
 
@@ -79,7 +78,7 @@ private[spark] case class KubernetesConf[T <: 
KubernetesRoleSpecificConf](
   def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
 
   def tokenManager(conf: SparkConf, hConf: Configuration): 
KubernetesHadoopDelegationTokenManager =
-    new KubernetesHadoopDelegationTokenManager(new 
HadoopDelegationTokenManager(conf, hConf))
+    new KubernetesHadoopDelegationTokenManager(conf, hConf)
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
index 67a5849..0022d8f 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
@@ -38,16 +38,14 @@ private[spark] object HadoopKerberosLogin {
       submissionSparkConf: SparkConf,
       kubernetesResourceNamePrefix: String,
       tokenManager: KubernetesHadoopDelegationTokenManager): 
KerberosConfigSpec = {
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf)
     // The JobUserUGI will be taken fom the Local Ticket Cache or via 
keytab+principal
     // The login happens in the SparkSubmit so login logic is not necessary to 
include
     val jobUserUGI = tokenManager.getCurrentUser
     val originalCredentials = jobUserUGI.getCredentials
-    val (tokenData, renewalInterval) = tokenManager.getDelegationTokens(
-      originalCredentials,
-      submissionSparkConf,
-      hadoopConf)
-    require(tokenData.nonEmpty, "Did not obtain any delegation tokens")
+    tokenManager.obtainDelegationTokens(originalCredentials)
+
+    val tokenData = SparkHadoopUtil.get.serialize(originalCredentials)
+
     val initialTokenDataKeyName = KERBEROS_SECRET_KEY
     val newSecretName = 
s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
     val secretDT =

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
index 135e2c4..3e98d58 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
@@ -18,45 +18,20 @@
 package org.apache.spark.deploy.k8s.security
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.internal.Logging
 
 /**
- * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens
- * on the behalf of the Kubernetes submission client. The new credentials
- * (called Tokens when they are serialized) are stored in Secrets accessible
- * to the driver and executors, when new Tokens are received they overwrite 
the current Secrets.
+ * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
  */
 private[spark] class KubernetesHadoopDelegationTokenManager(
-    tokenManager: HadoopDelegationTokenManager) extends Logging {
+    _sparkConf: SparkConf,
+    _hadoopConf: Configuration)
+  extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
 
-  // HadoopUGI Util methods
   def getCurrentUser: UserGroupInformation = 
UserGroupInformation.getCurrentUser
-  def getShortUserName: String = getCurrentUser.getShortUserName
-  def getFileSystem(hadoopConf: Configuration): FileSystem = 
FileSystem.get(hadoopConf)
   def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
-  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): 
UserGroupInformation =
-    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
-  def serializeCreds(creds: Credentials): Array[Byte] = 
SparkHadoopUtil.get.serialize(creds)
-  def nextRT(rt: Long, conf: SparkConf): Long = 
SparkHadoopUtil.nextCredentialRenewalTime(rt, conf)
 
-  def getDelegationTokens(
-      creds: Credentials,
-      conf: SparkConf,
-      hadoopConf: Configuration): (Array[Byte], Long) = {
-    try {
-      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
-      logDebug(s"Initialized tokens")
-      (serializeCreds(creds), nextRT(rt, conf))
-    } catch {
-      case e: Exception =>
-        logError(s"Failed to fetch Hadoop delegation tokens $e")
-        throw e
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index bac0246..f586665 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -26,12 +26,12 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.SchedulerDriver
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, 
SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
@@ -60,9 +60,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
     with org.apache.mesos.Scheduler with MesosSchedulerUtils {
 
-  private lazy val hadoopDelegationTokenManager: 
MesosHadoopDelegationTokenManager =
-    new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, 
driverEndpoint)
-
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2
 
@@ -678,7 +675,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     launcherBackend.close()
   }
 
-  private def stopSchedulerBackend() {
+  private def stopSchedulerBackend(): Unit = {
     // Make sure we're not launching tasks during shutdown
     stateLock.synchronized {
       if (stopCalled) {
@@ -777,6 +774,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     }
   }
 
+  override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
+  }
+
   private def numExecutors(): Int = {
     slaves.values.map(_.taskIDs.size).sum
   }
@@ -789,14 +790,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       offer.getHostname
     }
   }
-
-  override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
-    if (UserGroupInformation.isSecurityEnabled) {
-      Some(hadoopDelegationTokenManager.getTokens())
-    } else {
-      None
-    }
-  }
 }
 
 private class Slave(val hostname: String) {

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
deleted file mode 100644
index a1bf4f0..0000000
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.rpc.RpcEndpointRef
-import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
-import org.apache.spark.ui.UIUtils
-import org.apache.spark.util.ThreadUtils
-
-
-/**
- * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation 
tokens on the behalf
- * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
- * and similarly will renew the Credentials when 75% of the renewal interval 
has passed.
- * The principal difference is that instead of writing the new credentials to 
HDFS and
- * incrementing the timestamp of the file, the new credentials (called Tokens 
when they are
- * serialized) are broadcast to all running executors. On the executor side, 
when new Tokens are
- * received they overwrite the current credentials.
- */
-private[spark] class MesosHadoopDelegationTokenManager(
-    conf: SparkConf,
-    hadoopConfig: Configuration,
-    driverEndpoint: RpcEndpointRef)
-  extends Logging {
-
-  require(driverEndpoint != null, "DriverEndpoint is not initialized")
-
-  private val credentialRenewerThread: ScheduledExecutorService =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
-
-  private val tokenManager: HadoopDelegationTokenManager =
-    new HadoopDelegationTokenManager(conf, hadoopConfig)
-
-  private val principal: String = conf.get(config.PRINCIPAL).orNull
-
-  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
-    try {
-      val creds = UserGroupInformation.getCurrentUser.getCredentials
-      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
-      logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
-      (SparkHadoopUtil.get.serialize(creds), 
SparkHadoopUtil.nextCredentialRenewalTime(rt, conf))
-    } catch {
-      case e: Exception =>
-        logError(s"Failed to fetch Hadoop delegation tokens $e")
-        throw e
-    }
-  }
-
-  private val keytabFile: Option[String] = conf.get(config.KEYTAB)
-
-  scheduleTokenRenewal()
-
-  private def scheduleTokenRenewal(): Unit = {
-    if (keytabFile.isDefined) {
-      require(principal != null, "Principal is required for Keytab-based 
authentication")
-      logInfo(s"Using keytab: ${keytabFile.get} and principal $principal")
-    } else {
-      logInfo("Using ticket cache for Kerberos authentication, no token 
renewal.")
-      return
-    }
-
-    def scheduleRenewal(runnable: Runnable): Unit = {
-      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
-      if (remainingTime <= 0) {
-        logInfo("Credentials have expired, creating new ones now.")
-        runnable.run()
-      } else {
-        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
-        credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
-      }
-    }
-
-    val credentialRenewerRunnable =
-      new Runnable {
-        override def run(): Unit = {
-          try {
-            getNewDelegationTokens()
-            broadcastDelegationTokens(tokens)
-          } catch {
-            case e: Exception =>
-              // Log the error and try to write new tokens back in an hour
-              val delay = 
TimeUnit.SECONDS.toMillis(conf.get(config.CREDENTIALS_RENEWAL_RETRY_WAIT))
-              logWarning(
-                s"Couldn't broadcast tokens, trying again in 
${UIUtils.formatDuration(delay)}", e)
-              credentialRenewerThread.schedule(this, delay, 
TimeUnit.MILLISECONDS)
-              return
-          }
-          scheduleRenewal(this)
-        }
-      }
-    scheduleRenewal(credentialRenewerRunnable)
-  }
-
-  private def getNewDelegationTokens(): Unit = {
-    logInfo(s"Attempting to login to KDC with principal ${principal}")
-    // Get new delegation tokens by logging in with a new UGI inspired by 
AMCredentialRenewer.scala
-    // Don't protect against keytabFile being empty because it's guarded above.
-    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
keytabFile.get)
-    logInfo("Successfully logged into KDC")
-    val tempCreds = ugi.getCredentials
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-    val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] {
-      override def run(): Long = {
-        tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
-      }
-    })
-
-    val currTime = System.currentTimeMillis()
-    timeOfNextRenewal = if (nextRenewalTime <= currTime) {
-      logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier 
than " +
-        s"current time ($currTime), which is unexpected, please check your 
credential renewal " +
-        "related configurations in the target services.")
-      currTime
-    } else {
-      SparkHadoopUtil.nextCredentialRenewalTime(nextRenewalTime, conf)
-    }
-    logInfo(s"Time of next renewal is in ${timeOfNextRenewal - 
System.currentTimeMillis()} ms")
-
-    // Add the temp credentials back to the original ones.
-    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    // update tokens for late or dynamically added executors
-    tokens = SparkHadoopUtil.get.serialize(tempCreds)
-  }
-
-  private def broadcastDelegationTokens(tokens: Array[Byte]) = {
-    logInfo("Sending new tokens to all executors")
-    driverEndpoint.send(UpdateDelegationTokens(tokens))
-  }
-
-  def getTokens(): Array[Byte] = {
-    tokens
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f94e3f..c1f3211 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -41,7 +41,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
+import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.MetricsSystem
@@ -99,20 +99,18 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     }
   }
 
-  private val credentialRenewer: Option[AMCredentialRenewer] = 
sparkConf.get(KEYTAB).map { _ =>
-    new AMCredentialRenewer(sparkConf, yarnConf)
+  private val tokenManager: Option[YARNHadoopDelegationTokenManager] = {
+    sparkConf.get(KEYTAB).map { _ =>
+      new YARNHadoopDelegationTokenManager(sparkConf, yarnConf)
+    }
   }
 
-  private val ugi = credentialRenewer match {
-    case Some(cr) =>
+  private val ugi = tokenManager match {
+    case Some(tm) =>
       // Set the context class loader so that the token renewer has access to 
jars distributed
       // by the user.
-      val currentLoader = Thread.currentThread().getContextClassLoader()
-      Thread.currentThread().setContextClassLoader(userClassLoader)
-      try {
-        cr.start()
-      } finally {
-        Thread.currentThread().setContextClassLoader(currentLoader)
+      Utils.withContextClassLoader(userClassLoader) {
+        tm.start()
       }
 
     case _ =>
@@ -380,7 +378,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
           userClassThread.interrupt()
         }
         if (!inShutdown) {
-          credentialRenewer.foreach(_.stop())
+          tokenManager.foreach(_.stop())
         }
       }
     }
@@ -440,7 +438,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
       securityMgr,
       localResources)
 
-    credentialRenewer.foreach(_.setDriverRef(driverRef))
+    tokenManager.foreach(_.setDriverRef(driverRef))
 
     // Initialize the AM endpoint *after* the allocator has been initialized. 
This ensures
     // that when the driver sends an initial executor request (e.g. after an 
AM restart),

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 49b7f62..6240f7b 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -310,7 +310,7 @@ private[spark] class Client(
   private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
     val credentials = UserGroupInformation.getCurrentUser().getCredentials()
     val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, 
hadoopConf)
-    credentialManager.obtainDelegationTokens(hadoopConf, credentials)
+    credentialManager.obtainDelegationTokens(credentials)
 
     // When using a proxy user, copy the delegation tokens to the user's 
credentials. Avoid
     // that for regular users, since in those case the user already has access 
to the TGT,

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index f2ed555..b257d8f 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -325,10 +325,6 @@ package object config {
     .stringConf
     .createOptional
 
-  private[spark] val KERBEROS_RELOGIN_PERIOD = 
ConfigBuilder("spark.yarn.kerberos.relogin.period")
-    .timeConf(TimeUnit.SECONDS)
-    .createWithDefaultString("1m")
-
   // The list of cache-related config entries. This is used by Client and the 
AM to clean
   // up the environment so that these settings do not appear on the web UI.
   private[yarn] val CACHE_CONFIGS = Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
deleted file mode 100644
index bc8d47d..0000000
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.yarn.security
-
-import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
-import java.util.concurrent.atomic.AtomicReference
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.rpc.RpcEndpointRef
-import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
-import org.apache.spark.ui.UIUtils
-import org.apache.spark.util.ThreadUtils
-
-/**
- * A manager tasked with periodically updating delegation tokens needed by the 
application.
- *
- * This manager is meant to make sure long-running apps (such as Spark 
Streaming apps) can run
- * without interruption while accessing secured services. It periodically logs 
in to the KDC with
- * user-provided credentials, and contacts all the configured secure services 
to obtain delegation
- * tokens to be distributed to the rest of the application.
- *
- * This class will manage the kerberos login, by renewing the TGT when needed. 
Because the UGI API
- * does not expose the TTL of the TGT, a configuration controls how often to 
check that a relogin is
- * necessary. This is done reasonably often since the check is a no-op when 
the relogin is not yet
- * needed. The check period can be overridden in the configuration.
- *
- * New delegation tokens are created once 75% of the renewal interval of the 
original tokens has
- * elapsed. The new tokens are sent to the Spark driver endpoint once it's 
registered with the AM.
- * The driver is tasked with distributing the tokens to other processes that 
might need them.
- */
-private[yarn] class AMCredentialRenewer(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration) extends Logging {
-
-  private val principal = sparkConf.get(PRINCIPAL).get
-  private val keytab = sparkConf.get(KEYTAB).get
-  private val credentialManager = new 
YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
-
-  private val renewalExecutor: ScheduledExecutorService =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh 
Thread")
-
-  private val driverRef = new AtomicReference[RpcEndpointRef]()
-
-  private val renewalTask = new Runnable() {
-    override def run(): Unit = {
-      updateTokensTask()
-    }
-  }
-
-  def setDriverRef(ref: RpcEndpointRef): Unit = {
-    driverRef.set(ref)
-  }
-
-  /**
-   * Start the token renewer. Upon start, the renewer will:
-   *
-   * - log in the configured user, and set up a task to keep that user's 
ticket renewed
-   * - obtain delegation tokens from all available providers
-   * - schedule a periodic task to update the tokens when needed.
-   *
-   * @return The newly logged in user.
-   */
-  def start(): UserGroupInformation = {
-    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
-    val ugi = doLogin()
-
-    val tgtRenewalTask = new Runnable() {
-      override def run(): Unit = {
-        ugi.checkTGTAndReloginFromKeytab()
-      }
-    }
-    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
-    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, 
tgtRenewalPeriod,
-      TimeUnit.SECONDS)
-
-    val creds = obtainTokensAndScheduleRenewal(ugi)
-    ugi.addCredentials(creds)
-
-    // Transfer the original user's tokens to the new user, since that's 
needed to connect to
-    // YARN. Explicitly avoid overwriting tokens that already exist in the 
current user's
-    // credentials, since those were freshly obtained above (see SPARK-23361).
-    val existing = ugi.getCredentials()
-    existing.mergeAll(originalCreds)
-    ugi.addCredentials(existing)
-
-    ugi
-  }
-
-  def stop(): Unit = {
-    renewalExecutor.shutdown()
-  }
-
-  private def scheduleRenewal(delay: Long): Unit = {
-    val _delay = math.max(0, delay)
-    logInfo(s"Scheduling login from keytab in 
${UIUtils.formatDuration(delay)}.")
-    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
-  }
-
-  /**
-   * Periodic task to login to the KDC and create new delegation tokens. 
Re-schedules itself
-   * to fetch the next set of tokens when needed.
-   */
-  private def updateTokensTask(): Unit = {
-    try {
-      val freshUGI = doLogin()
-      val creds = obtainTokensAndScheduleRenewal(freshUGI)
-      val tokens = SparkHadoopUtil.get.serialize(creds)
-
-      val driver = driverRef.get()
-      if (driver != null) {
-        logInfo("Updating delegation tokens.")
-        driver.send(UpdateDelegationTokens(tokens))
-      } else {
-        // This shouldn't really happen, since the driver should register way 
before tokens expire
-        // (or the AM should time out the application).
-        logWarning("Delegation tokens close to expiration but no driver has 
registered yet.")
-        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
-      }
-    } catch {
-      case e: Exception =>
-        val delay = 
TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
-        logWarning(s"Failed to update tokens, will try again in 
${UIUtils.formatDuration(delay)}!" +
-          " If this happens too often tasks will fail.", e)
-        scheduleRenewal(delay)
-    }
-  }
-
-  /**
-   * Obtain new delegation tokens from the available providers. Schedules a 
new task to fetch
-   * new tokens before the new set expires.
-   *
-   * @return Credentials containing the new tokens.
-   */
-  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): 
Credentials = {
-    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
-      override def run(): Credentials = {
-        val creds = new Credentials()
-        val nextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, 
creds)
-
-        val timeToWait = 
SparkHadoopUtil.nextCredentialRenewalTime(nextRenewal, sparkConf) -
-          System.currentTimeMillis()
-        scheduleRenewal(timeToWait)
-        creds
-      }
-    })
-  }
-
-  private def doLogin(): UserGroupInformation = {
-    logInfo(s"Attempting to login to KDC using principal: $principal")
-    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
keytab)
-    logInfo("Successfully logged into KDC.")
-    ugi
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
index 26a2e5d..2d9a3f0 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -22,12 +22,13 @@ import java.util.ServiceLoader
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.security.Credentials
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
 /**
@@ -36,27 +37,25 @@ import org.apache.spark.util.Utils
  * in [[HadoopDelegationTokenManager]].
  */
 private[yarn] class YARNHadoopDelegationTokenManager(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration) extends Logging {
+    _sparkConf: SparkConf,
+    _hadoopConf: Configuration)
+  extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
 
-  private val delegationTokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf,
-    conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
-
-  // public for testing
-  val credentialProviders = getCredentialProviders
+  private val credentialProviders = {
+    ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
+      .asScala
+      .toList
+      .filter { p => isServiceEnabled(p.serviceName) }
+      .map { p => (p.serviceName, p) }
+      .toMap
+  }
   if (credentialProviders.nonEmpty) {
     logDebug("Using the following YARN-specific credential providers: " +
       s"${credentialProviders.keys.mkString(", ")}.")
   }
 
-  /**
-   * Writes delegation tokens to creds.  Delegation tokens are fetched from all registered
-   * providers.
-   *
-   * @return Time after which the fetched delegation tokens should be renewed.
-   */
-  def obtainDelegationTokens(hadoopConf: Configuration, creds: Credentials): Long = {
-    val superInterval = delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
+  override def obtainDelegationTokens(creds: Credentials): Long = {
+    val superInterval = super.obtainDelegationTokens(creds)
 
     credentialProviders.values.flatMap { provider =>
       if (provider.credentialsRequired(hadoopConf)) {
@@ -69,18 +68,13 @@ private[yarn] class YARNHadoopDelegationTokenManager(
     }.foldLeft(superInterval)(math.min)
   }
 
-  private def getCredentialProviders: Map[String, ServiceCredentialProvider] = {
-    val providers = loadCredentialProviders
-
-    providers.
-      filter { p => delegationTokenManager.isServiceEnabled(p.serviceName) }
-      .map { p => (p.serviceName, p) }
-      .toMap
+  // For testing.
+  override def isProviderLoaded(serviceName: String): Boolean = {
+    credentialProviders.contains(serviceName) || super.isProviderLoaded(serviceName)
   }
 
-  private def loadCredentialProviders: List[ServiceCredentialProvider] = {
-    ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
-      .asScala
-      .toList
+  override protected def fileSystemsToAccess(): Set[FileSystem] = {
+    YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, hadoopConf)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 63bea3e..67c36aa 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -19,16 +19,14 @@ package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.atomic.{AtomicBoolean}
 
-import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -270,7 +268,6 @@ private[spark] abstract class YarnSchedulerBackend(
       case u @ UpdateDelegationTokens(tokens) =>
         // Add the tokens to the current user and send a message to the scheduler so that it
         // notifies all registered executors of the new tokens.
-        SparkHadoopUtil.get.addDelegationTokens(tokens, sc.conf)
         driverEndpoint.send(u)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/68dde348/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
index 9fa749b..98315e4 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -19,11 +19,10 @@ package org.apache.spark.deploy.yarn.security
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
-import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 
-class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
+class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite {
   private var credentialManager: YARNHadoopDelegationTokenManager = null
   private var sparkConf: SparkConf = null
   private var hadoopConf: Configuration = null
@@ -36,7 +35,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers
 
   test("Correctly loads credential providers") {
     credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
-    credentialManager.credentialProviders.get("yarn-test") should not be (None)
+    assert(credentialManager.isProviderLoaded("yarn-test"))
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to