Repository: spark Updated Branches: refs/heads/master 2014e7a78 -> 1e8233541
[SPARK-21842][MESOS] Support Kerberos ticket renewal and creation in Mesos ## What changes were proposed in this pull request? tl;dr: Add a class, `MesosHadoopDelegationTokenManager` that updates delegation tokens on a schedule on the behalf of Spark Drivers. Broadcast renewed credentials to the executors. ## The problem We recently added Kerberos support to Mesos-based Spark jobs as well as Secrets support to the Mesos Dispatcher (SPARK-16742, SPARK-20812, respectively). However, the delegation tokens have a defined expiration. This poses a problem for long running Spark jobs (e.g. Spark Streaming applications). YARN has a solution for this where a thread is scheduled to renew the tokens when they reach 75% of their way to expiration. It then writes the tokens to HDFS for the executors to find (uses a monotonically increasing suffix). ## This solution We replace the current method in `CoarseGrainedSchedulerBackend` which used to discard the token renewal time with a protected method `fetchHadoopDelegationTokens`. Now the individual cluster backends are responsible for overriding this method to fetch and manage token renewal. The delegation tokens themselves are still part of the `CoarseGrainedSchedulerBackend` as before. In the case of Mesos, renewed Credentials are broadcast to the executors. This maintains all transfer of Credentials within Spark (as opposed to Spark-to-HDFS). It also does not require any writing of Credentials to disk. It also does not require any GC of old files. ## How was this patch tested? Manually against a Kerberized HDFS cluster. Thank you for the reviews. Author: ArtRand <ar...@soe.ucsc.edu> Closes #19272 from ArtRand/spark-21842-450-kerberos-ticket-renewal. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e823354 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e823354 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e823354 Branch: refs/heads/master Commit: 1e82335413bc2384073ead0d6d581c862036d0f5 Parents: 2014e7a Author: ArtRand <ar...@soe.ucsc.edu> Authored: Wed Nov 15 15:53:05 2017 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Wed Nov 15 15:53:05 2017 -0800 ---------------------------------------------------------------------- .../apache/spark/deploy/SparkHadoopUtil.scala | 28 +++- .../security/HadoopDelegationTokenManager.scala | 3 + .../executor/CoarseGrainedExecutorBackend.scala | 9 +- .../cluster/CoarseGrainedClusterMessage.scala | 3 + .../cluster/CoarseGrainedSchedulerBackend.scala | 30 +--- .../cluster/mesos/MesosClusterManager.scala | 2 +- .../MesosCoarseGrainedSchedulerBackend.scala | 19 ++- .../MesosHadoopDelegationTokenManager.scala | 157 +++++++++++++++++++ .../yarn/security/AMCredentialRenewer.scala | 21 +-- 9 files changed, 228 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 1fa10ab..17c7319 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -140,13 +140,24 @@ class SparkHadoopUtil extends Logging { if (!new File(keytabFilename).exists()) { throw new SparkException(s"Keytab file: ${keytabFilename} does not exist") } else { - logInfo("Attempting to login to Kerberos" + - s" using principal: 
${principalName} and keytab: ${keytabFilename}") + logInfo("Attempting to login to Kerberos " + + s"using principal: ${principalName} and keytab: ${keytabFilename}") UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } } /** + * Add or overwrite current user's credentials with serialized delegation tokens, + * also confirms correct hadoop configuration is set. + */ + private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) { + UserGroupInformation.setConfiguration(newConfiguration(sparkConf)) + val creds = deserialize(tokens) + logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}") + addCurrentUserCredentials(creds) + } + + /** * Returns a function that can be called to find Hadoop FileSystem bytes read. If * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will * return the bytes read on r since t. @@ -463,6 +474,19 @@ object SparkHadoopUtil { } /** + * Given an expiration date (e.g. for Hadoop Delegation Tokens) return a the date + * when a given fraction of the duration until the expiration date has passed. + * Formula: current time + (fraction * (time until expiration)) + * @param expirationDate Drop-dead expiration date + * @param fraction fraction of the time until expiration return + * @return Date when the fraction of the time until expiration has passed + */ + private[spark] def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = { + val ct = System.currentTimeMillis + (ct + (fraction * (expirationDate - ct))).toLong + } + + /** * Returns a Configuration object with Spark configuration applied on top. Unlike * the instance method, this will always return a Configuration instance, and not a * cluster manager-specific type. 
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 483d0de..116a686 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -109,6 +109,8 @@ private[spark] class HadoopDelegationTokenManager( * Writes delegation tokens to creds. Delegation tokens are fetched from all registered * providers. * + * @param hadoopConf hadoop Configuration + * @param creds Credentials that will be updated in place (overwritten) * @return Time after which the fetched delegation tokens should be renewed. */ def obtainDelegationTokens( @@ -125,3 +127,4 @@ private[spark] class HadoopDelegationTokenManager( }.foldLeft(Long.MaxValue)(math.min) } } + http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index d27362a..acefc9d 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -123,6 +123,10 @@ private[spark] class CoarseGrainedExecutorBackend( executor.stop() } }.start() + + case UpdateDelegationTokens(tokenBytes) => + logInfo(s"Received tokens of ${tokenBytes.length} bytes") + SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) 
} override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -219,9 +223,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { SparkHadoopUtil.get.startCredentialUpdater(driverConf) } - cfg.hadoopDelegationCreds.foreach { hadoopCreds => - val creds = SparkHadoopUtil.get.deserialize(hadoopCreds) - SparkHadoopUtil.get.addCurrentUserCredentials(creds) + cfg.hadoopDelegationCreds.foreach { tokens => + SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf) } val env = SparkEnv.createExecutorEnv( http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 5d65731..e8b7fc0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -54,6 +54,9 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage with RegisterExecutorResponse + case class UpdateDelegationTokens(tokens: Array[Byte]) + extends CoarseGrainedClusterMessage + // Executors to driver case class RegisterExecutor( executorId: String, http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 424e43b..22d9c4c 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -24,11 +24,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Future -import org.apache.hadoop.security.UserGroupInformation - import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ @@ -99,12 +95,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 - // hadoop token manager used by some sub-classes (e.g. Mesos) - def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None - - // Hadoop delegation tokens to be sent to the executors. 
- val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds() - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -159,6 +149,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(newDelegationTokens) => + executorDataMap.values.foreach { ed => + ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens)) + } } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -236,7 +231,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val reply = SparkAppConfig( sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), - hadoopDelegationCreds) + fetchHadoopDelegationTokens()) context.reply(reply) } @@ -686,18 +681,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp true } - protected def getHadoopDelegationCreds(): Option[Array[Byte]] = { - if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) { - hadoopDelegationTokenManager.map { manager => - val creds = UserGroupInformation.getCurrentUser.getCredentials - val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - manager.obtainDelegationTokens(hadoopConf, creds) - SparkHadoopUtil.get.serialize(creds) - } - } else { - None - } - } + protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None } } private[spark] object CoarseGrainedSchedulerBackend { http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala ---------------------------------------------------------------------- diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala index 911a085..da71f8f 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.SparkContext import org.apache.spark.internal.config._ import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 104ed01..c392061 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -22,15 +22,16 @@ import java.util.{Collections, List => JList} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.locks.ReentrantLock -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} -import org.apache.mesos.SchedulerDriver import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.Future +import 
org.apache.hadoop.security.UserGroupInformation +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} +import org.apache.mesos.SchedulerDriver + import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState} import org.apache.spark.deploy.mesos.config._ -import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf @@ -58,8 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with org.apache.mesos.Scheduler with MesosSchedulerUtils { - override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = - Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration)) + private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager = + new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint) // Blacklist a slave after this many failures private val MAX_SLAVE_FAILURES = 2 @@ -772,6 +773,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( offer.getHostname } } + + override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { + if (UserGroupInformation.isSecurityEnabled) { + Some(hadoopDelegationTokenManager.getTokens()) + } else { + None + } + } } private class Slave(val hostname: String) { http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala new file mode 
100644 index 0000000..325dc17 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. 
+ * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( + conf: SparkConf, + hadoopConfig: Configuration, + driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = + new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { + try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) + } catch { + case e: Exception => + logError(s"Failed to fetch Hadoop delegation tokens $e") + throw e + } + } + + private val keytabFile: Option[String] = conf.get(config.KEYTAB) + + scheduleTokenRenewal() + + private def scheduleTokenRenewal(): Unit = { + if (keytabFile.isDefined) { + require(principal != null, "Principal is required for Keytab-based authentication") + logInfo(s"Using keytab: ${keytabFile.get} and principal $principal") + } else { + logInfo("Using ticket cache for Kerberos authentication, no token renewal.") + return + } + + def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { + 
logInfo("Credentials have expired, creating new ones now.") + runnable.run() + } else { + logInfo(s"Scheduling login from keytab in $remainingTime millis.") + credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } + } + + val credentialRenewerRunnable = + new Runnable { + override def run(): Unit = { + try { + getNewDelegationTokens() + broadcastDelegationTokens(tokens) + } catch { + case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying again in an hour", e) + credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS) + return + } + scheduleRenewal(this) + } + } + scheduleRenewal(credentialRenewerRunnable) + } + + private def getNewDelegationTokens(): Unit = { + logInfo(s"Attempting to login to KDC with principal ${principal}") + // Get new delegation tokens by logging in with a new UGI inspired by AMCredentialRenewer.scala + // Don't protect against keytabFile being empty because it's guarded above. + val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.get) + logInfo("Successfully logged into KDC") + val tempCreds = ugi.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] { + override def run(): Long = { + tokenManager.obtainDelegationTokens(hadoopConf, tempCreds) + } + }) + + val currTime = System.currentTimeMillis() + timeOfNextRenewal = if (nextRenewalTime <= currTime) { + logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier than " + + s"current time ($currTime), which is unexpected, please check your credential renewal " + + "related configurations in the target services.") + currTime + } else { + SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75) + } + logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms") + + // Add the temp credentials back to the original ones. 
+ UserGroupInformation.getCurrentUser.addCredentials(tempCreds) + // update tokens for late or dynamically added executors + tokens = SparkHadoopUtil.get.serialize(tempCreds) + } + + private def broadcastDelegationTokens(tokens: Array[Byte]) = { + logInfo("Sending new tokens to all executors") + driverEndpoint.send(UpdateDelegationTokens(tokens)) + } + + def getTokens(): Array[Byte] = { + tokens + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala index 68a2e9e..6134757 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn.security import java.security.PrivilegedExceptionAction -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -25,6 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging @@ -58,9 +59,8 @@ private[yarn] class AMCredentialRenewer( private var lastCredentialsFileSuffix = 0 - private val credentialRenewer = - Executors.newSingleThreadScheduledExecutor( - 
ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + private val credentialRenewerThread: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread") private val hadoopUtil = YarnSparkHadoopUtil.get @@ -70,7 +70,7 @@ private[yarn] class AMCredentialRenewer( private val freshHadoopConf = hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme) - @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME) + @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME) /** * Schedule a login from the keytab and principal set using the --principal and --keytab @@ -95,7 +95,7 @@ private[yarn] class AMCredentialRenewer( runnable.run() } else { logInfo(s"Scheduling login from keytab in $remainingTime millis.") - credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) } } @@ -111,7 +111,7 @@ private[yarn] class AMCredentialRenewer( // Log the error and try to write new tokens back in an hour logWarning("Failed to write out new credentials to HDFS, will try again in an " + "hour! If this happens too often tasks will fail.", e) - credentialRenewer.schedule(this, 1, TimeUnit.HOURS) + credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS) return } scheduleRenewal(this) @@ -195,8 +195,9 @@ private[yarn] class AMCredentialRenewer( } else { // Next valid renewal time is about 75% of credential renewal time, and update time is // slightly later than valid renewal time (80% of renewal time). 
- timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong - ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong + timeOfNextRenewal = + SparkHadoopUtil.getDateOfNextUpdate(nearestNextRenewalTime, 0.75) + SparkHadoopUtil.getDateOfNextUpdate(nearestNextRenewalTime, 0.8) } // Add the temp credentials back to the original ones. @@ -232,6 +233,6 @@ private[yarn] class AMCredentialRenewer( } def stop(): Unit = { - credentialRenewer.shutdown() + credentialRenewerThread.shutdown() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org