This is an automated email from the ASF dual-hosted git repository. irashid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 669e8a1 [SPARK-25689][YARN] Make driver, not AM, manage delegation tokens. 669e8a1 is described below commit 669e8a155987995a1a5d49a96b88c05f39e41723 Author: Marcelo Vanzin <van...@cloudera.com> AuthorDate: Mon Jan 7 14:40:08 2019 -0600 [SPARK-25689][YARN] Make driver, not AM, manage delegation tokens. This change modifies the behavior of the delegation token code when running on YARN, so that the driver controls the renewal, in both client and cluster mode. For that, a few different things were changed: * The AM code only runs code that needs DTs when DTs are available. In a way, this restores the AM behavior to what it was pre-SPARK-23361, but keeping the fix added in that bug. Basically, all the AM code is run in a "UGI.doAs()" block; but code that needs to talk to HDFS (basically the distributed cache handling code) was delayed to the point where the driver is up and running, and thus when valid delegation tokens are available. * SparkSubmit / ApplicationMaster now handle user login, not the token manager. The previous AM code was relying on the token manager to keep the user logged in when keytabs are used. This required some odd APIs in the token manager and the AM so that the right UGI was exposed and used in the right places. After this change, the logged in user is handled separately from the token manager, so the API was cleaned up, and, as explained above, the whole AM runs under the logged in user, which also helps with simplifying some more code. * Distributed cache configs are sent separately to the AM. Because of the delayed initialization of the cached resources in the AM, it became easier to write the cache config to a separate properties file instead of bundling it with the rest of the Spark config. This also avoids having to modify the SparkConf to hide things from the UI. * Finally, the AM doesn't manage the token manager anymore. 
The above changes allow the token manager to be completely handled by the driver's scheduler backend code also in YARN mode (whether client or cluster), making it similar to other RMs. To maintain the fix added in SPARK-23361 also in client mode, the AM now sends an extra message to the driver on initialization to fetch delegation tokens; and although it might not really be needed, the driver also keeps the running AM updated when new tokens are created. Tested in a kerberized cluster with the same tests used to validate SPARK-23361, in both client and cluster mode. Also tested with a non-kerberized cluster. Closes #23338 from vanzin/SPARK-25689. Authored-by: Marcelo Vanzin <van...@cloudera.com> Signed-off-by: Imran Rashid <iras...@cloudera.com> --- .../security/HadoopDelegationTokenManager.scala | 110 ++++++----------- .../security/HiveDelegationTokenProvider.scala | 16 ++- .../cluster/CoarseGrainedClusterMessage.scala | 3 + .../cluster/CoarseGrainedSchedulerBackend.scala | 40 ++++-- .../HadoopDelegationTokenManagerSuite.scala | 8 +- .../features/KerberosConfDriverFeatureStep.scala | 2 +- .../k8s/KubernetesClusterSchedulerBackend.scala | 7 +- .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 7 +- .../spark/deploy/yarn/ApplicationMaster.scala | 135 ++++++++++----------- .../deploy/yarn/ApplicationMasterArguments.scala | 5 + .../org/apache/spark/deploy/yarn/Client.scala | 100 ++++++++------- .../apache/spark/deploy/yarn/YarnRMClient.scala | 8 +- .../org/apache/spark/deploy/yarn/config.scala | 10 -- .../YARNHadoopDelegationTokenManager.scala | 7 +- .../cluster/YarnClientSchedulerBackend.scala | 6 + .../scheduler/cluster/YarnSchedulerBackend.scala | 17 ++- .../YARNHadoopDelegationTokenManagerSuite.scala | 2 +- 17 files changed, 246 insertions(+), 237 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 
f7e3dde..d97857a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -21,7 +21,6 @@ import java.io.File import java.net.URI import java.security.PrivilegedExceptionAction import java.util.concurrent.{ScheduledExecutorService, TimeUnit} -import java.util.concurrent.atomic.AtomicReference import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem @@ -39,32 +38,24 @@ import org.apache.spark.util.ThreadUtils /** * Manager for delegation tokens in a Spark application. * - * This manager has two modes of operation: - * - * 1. When configured with a principal and a keytab, it will make sure long-running apps can run - * without interruption while accessing secured services. It periodically logs in to the KDC with - * user-provided credentials, and contacts all the configured secure services to obtain delegation - * tokens to be distributed to the rest of the application. - * - * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often - * to check that a relogin is necessary. This is done reasonably often since the check is a no-op - * when the relogin is not yet needed. The check period can be overridden in the configuration. + * When configured with a principal and a keytab, this manager will make sure long-running apps can + * run without interruption while accessing secured services. It periodically logs in to the KDC + * with user-provided credentials, and contacts all the configured secure services to obtain + * delegation tokens to be distributed to the rest of the application. * * New delegation tokens are created once 75% of the renewal interval of the original tokens has - * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM. 
- * The driver is tasked with distributing the tokens to other processes that might need them. + * elapsed. The new tokens are sent to the Spark driver endpoint. The driver is tasked with + * distributing the tokens to other processes that might need them. * - * 2. When operating without an explicit principal and keytab, token renewal will not be available. - * Starting the manager will distribute an initial set of delegation tokens to the provided Spark - * driver, but the app will not get new tokens when those expire. - * - * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens` - * method. This option does not require calling the `start` method, but leaves it up to the - * caller to distribute the tokens that were generated. + * This class can also be used just to create delegation tokens, by calling the + * `obtainDelegationTokens` method. This option does not require calling the `start` method nor + * providing a driver reference, but leaves it up to the caller to distribute the tokens that were + * generated. */ private[spark] class HadoopDelegationTokenManager( protected val sparkConf: SparkConf, - protected val hadoopConf: Configuration) extends Logging { + protected val hadoopConf: Configuration, + protected val schedulerRef: RpcEndpointRef) extends Logging { private val deprecatedProviderEnabledConfigs = List( "spark.yarn.security.tokens.%s.enabled", @@ -85,60 +76,44 @@ private[spark] class HadoopDelegationTokenManager( s"${delegationTokenProviders.keys.mkString(", ")}.") private var renewalExecutor: ScheduledExecutorService = _ - private val driverRef = new AtomicReference[RpcEndpointRef]() - - /** Set the endpoint used to send tokens to the driver. */ - def setDriverRef(ref: RpcEndpointRef): Unit = { - driverRef.set(ref) - } /** @return Whether delegation token renewal is enabled. */ def renewalEnabled: Boolean = principal != null /** - * Start the token renewer. Requires a principal and keytab. 
Upon start, the renewer will: + * Start the token renewer. Requires a principal and keytab. Upon start, the renewer will + * obtain delegation tokens for all configured services and send them to the driver, and + * set up tasks to periodically get fresh tokens as needed. * - * - log in the configured principal, and set up a task to keep that user's ticket renewed - * - obtain delegation tokens from all available providers - * - send the tokens to the driver, if it's already registered - * - schedule a periodic task to update the tokens when needed. + * This method requires that a keytab has been provided to Spark, and will try to keep the + * logged in user's TGT valid while this manager is active. * - * @return The newly logged in user. + * @return New set of delegation tokens created for the configured principal. */ - def start(): UserGroupInformation = { + def start(): Array[Byte] = { require(renewalEnabled, "Token renewal must be enabled to start the renewer.") + require(schedulerRef != null, "Token renewal requires a scheduler endpoint.") renewalExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") - val originalCreds = UserGroupInformation.getCurrentUser().getCredentials() - val ugi = doLogin() - - val tgtRenewalTask = new Runnable() { - override def run(): Unit = { - ugi.checkTGTAndReloginFromKeytab() + val ugi = UserGroupInformation.getCurrentUser() + if (ugi.isFromKeytab()) { + // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop 3.x, + // it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added in + // HADOOP-9567). This task will make sure that the user stays logged in regardless of that + // configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if the TGT does + // not need to be renewed yet. 
+ val tgtRenewalTask = new Runnable() { + override def run(): Unit = { + ugi.checkTGTAndReloginFromKeytab() + } } + val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD) + renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod, + TimeUnit.SECONDS) } - val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD) - renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod, - TimeUnit.SECONDS) - val creds = obtainTokensAndScheduleRenewal(ugi) - ugi.addCredentials(creds) - - val driver = driverRef.get() - if (driver != null) { - val tokens = SparkHadoopUtil.get.serialize(creds) - driver.send(UpdateDelegationTokens(tokens)) - } - - // Transfer the original user's tokens to the new user, since it may contain needed tokens - // (such as those user to connect to YARN). Explicitly avoid overwriting tokens that already - // exist in the current user's credentials, since those were freshly obtained above - // (see SPARK-23361). - val existing = ugi.getCredentials() - existing.mergeAll(originalCreds) - ugi.addCredentials(existing) - ugi + updateTokensTask() } def stop(): Unit = { @@ -218,27 +193,22 @@ private[spark] class HadoopDelegationTokenManager( * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself * to fetch the next set of tokens when needed. */ - private def updateTokensTask(): Unit = { + private def updateTokensTask(): Array[Byte] = { try { val freshUGI = doLogin() val creds = obtainTokensAndScheduleRenewal(freshUGI) val tokens = SparkHadoopUtil.get.serialize(creds) - val driver = driverRef.get() - if (driver != null) { - logInfo("Updating delegation tokens.") - driver.send(UpdateDelegationTokens(tokens)) - } else { - // This shouldn't really happen, since the driver should register way before tokens expire. 
- logWarning("Delegation tokens close to expiration but no driver has registered yet.") - SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf) - } + logInfo("Updating delegation tokens.") + schedulerRef.send(UpdateDelegationTokens(tokens)) + tokens } catch { case e: Exception => val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)) logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" + " If this happens too often tasks will fail.", e) scheduleRenewal(delay) + null } } diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala index 90f7051..4ca0136 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala @@ -67,11 +67,17 @@ private[spark] class HiveDelegationTokenProvider // Other modes (such as client with or without keytab, or cluster mode with keytab) do not need // a delegation token, since there's a valid kerberos TGT for the right user available to the // driver, which is the only process that connects to the HMS. - val deployMode = sparkConf.get("spark.submit.deployMode", "client") - UserGroupInformation.isSecurityEnabled && + // + // Note that this means Hive tokens are not re-created periodically by the token manager. + // This is because HMS connections are only performed by the Spark driver, and the driver + // either has a TGT, in which case it does not need tokens, or it has a token created + // elsewhere, in which case it cannot create new ones. The check for an existing token avoids + // printing an exception to the logs in the latter case. 
+ val currentToken = UserGroupInformation.getCurrentUser().getCredentials().getToken(tokenAlias) + currentToken == null && UserGroupInformation.isSecurityEnabled && hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty && (SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser()) || - (deployMode == "cluster" && !sparkConf.contains(KEYTAB))) + (!Utils.isClientMode(sparkConf) && !sparkConf.contains(KEYTAB))) } override def obtainDelegationTokens( @@ -98,7 +104,7 @@ private[spark] class HiveDelegationTokenProvider val hive2Token = new Token[DelegationTokenIdentifier]() hive2Token.decodeFromUrlString(tokenStr) logDebug(s"Get Token from hive metastore: ${hive2Token.toString}") - creds.addToken(new Text("hive.server2.delegation.token"), hive2Token) + creds.addToken(tokenAlias, hive2Token) } None @@ -134,4 +140,6 @@ private[spark] class HiveDelegationTokenProvider case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) } } + + private def tokenAlias: Text = new Text("hive.server2.delegation.token") } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index e8b7fc0..9e768c2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -104,6 +104,9 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterClusterManager(am: RpcEndpointRef) extends CoarseGrainedClusterMessage + // Used by YARN's client mode AM to retrieve the current set of delegation tokens. 
+ object RetrieveDelegationTokens extends CoarseGrainedClusterMessage + // Request executors by specifying the new total number of executors desired // This includes executors already pending or running case class RequestExecutors( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 329158a..98ed2ff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -162,11 +162,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } case UpdateDelegationTokens(newDelegationTokens) => - SparkHadoopUtil.get.addDelegationTokens(newDelegationTokens, conf) - delegationTokens.set(newDelegationTokens) - executorDataMap.values.foreach { ed => - ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens)) - } + updateDelegationTokens(newDelegationTokens) case RemoveExecutor(executorId, reason) => // We will remove the executor's state and cannot restore it. 
However, the connection @@ -404,17 +400,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp driverEndpoint = createDriverEndpointRef(properties) if (UserGroupInformation.isSecurityEnabled()) { - delegationTokenManager = createTokenManager() + delegationTokenManager = createTokenManager(driverEndpoint) delegationTokenManager.foreach { dtm => - dtm.setDriverRef(driverEndpoint) - val creds = if (dtm.renewalEnabled) { - dtm.start().getCredentials() + val tokens = if (dtm.renewalEnabled) { + dtm.start() } else { val creds = UserGroupInformation.getCurrentUser().getCredentials() dtm.obtainDelegationTokens(creds) - creds + SparkHadoopUtil.get.serialize(creds) + } + if (tokens != null) { + delegationTokens.set(tokens) } - delegationTokens.set(SparkHadoopUtil.get.serialize(creds)) } } } @@ -716,8 +713,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Create the delegation token manager to be used for the application. This method is called * once during the start of the scheduler backend (so after the object has already been * fully constructed), only if security is enabled in the Hadoop configuration. + * + * @param schedulerRef RPC endpoint for the scheduler, where updated delegation tokens should be + * sent. */ - protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None + protected def createTokenManager( + schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = None + + /** + * Called when a new set of delegation tokens is sent to the driver. Child classes can override + * this method but should always call this implementation, which handles token distribution to + * executors. 
+ */ + protected def updateDelegationTokens(tokens: Array[Byte]): Unit = { + SparkHadoopUtil.get.addDelegationTokens(tokens, conf) + delegationTokens.set(tokens) + executorDataMap.values.foreach { ed => + ed.executorEndpoint.send(UpdateDelegationTokens(tokens)) + } + } + + protected def currentDelegationTokens: Array[Byte] = delegationTokens.get() } diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index def9e62..af7d44b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -27,7 +27,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { private val hadoopConf = new Configuration() test("default configuration") { - val manager = new HadoopDelegationTokenManager(new SparkConf(false), hadoopConf) + val manager = new HadoopDelegationTokenManager(new SparkConf(false), hadoopConf, null) assert(manager.isProviderLoaded("hadoopfs")) assert(manager.isProviderLoaded("hbase")) assert(manager.isProviderLoaded("hive")) @@ -36,7 +36,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { test("disable hive credential provider") { val sparkConf = new SparkConf(false).set("spark.security.credentials.hive.enabled", "false") - val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf) + val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null) assert(manager.isProviderLoaded("hadoopfs")) assert(manager.isProviderLoaded("hbase")) assert(!manager.isProviderLoaded("hive")) @@ -47,7 +47,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { val sparkConf = new SparkConf(false) .set("spark.yarn.security.tokens.hadoopfs.enabled", "false") .set("spark.yarn.security.credentials.hive.enabled", "false") - val manager = new 
HadoopDelegationTokenManager(sparkConf, hadoopConf) + val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null) assert(!manager.isProviderLoaded("hadoopfs")) assert(manager.isProviderLoaded("hbase")) assert(!manager.isProviderLoaded("hive")) @@ -99,7 +99,7 @@ private object NoHiveTest { def runTest(): Unit = { try { - val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration()) + val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration(), null) require(!manager.isProviderLoaded("hive")) } catch { case e: Throwable => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala index 721d7e9..a77e8d4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala @@ -91,7 +91,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri private lazy val delegationTokens: Array[Byte] = { if (keytab.isEmpty && existingSecretName.isEmpty) { val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf, - SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf)) + SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null) val creds = UserGroupInformation.getCurrentUser().getCredentials() tokenManager.obtainDelegationTokens(creds) // If no tokens and no secrets are stored in the credentials, make sure nothing is returned, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index cd29897..e285e20 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -26,7 +26,7 @@ import org.apache.spark.SparkContext import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.rpc.{RpcAddress, RpcEnv} +import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} import org.apache.spark.util.{ThreadUtils, Utils} @@ -147,8 +147,9 @@ private[spark] class KubernetesClusterSchedulerBackend( new KubernetesDriverEndpoint(sc.env.rpcEnv, properties) } - override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { - Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration)) + override protected def createTokenManager( + schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = { + Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef)) } private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index d017451..03cd258 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -36,7 +36,7 @@ import org.apache.spark.internal.config import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient -import org.apache.spark.rpc.RpcEndpointAddress +import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef} import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -772,8 +772,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } } - override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { - Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration)) + override protected def createTokenManager( + schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = { + Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef)) } private def numExecutors(): Int = { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8dbdac1..1ece7bd 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -30,6 +30,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.{StringUtils => ComStrUtils} import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ @@ -41,7 +42,6 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil 
import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.metrics.MetricsSystem @@ -58,6 +58,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. + private val appAttemptId = YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId() private val isClusterMode = args.userClass != null private val sparkConf = new SparkConf() @@ -99,25 +100,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } } - private val tokenManager: Option[YARNHadoopDelegationTokenManager] = { - sparkConf.get(KEYTAB).map { _ => - new YARNHadoopDelegationTokenManager(sparkConf, yarnConf) - } - } - - private val ugi = tokenManager match { - case Some(tm) => - // Set the context class loader so that the token renewer has access to jars distributed - // by the user. - Utils.withContextClassLoader(userClassLoader) { - tm.start() - } - - case _ => - SparkHadoopUtil.get.createSparkUser() - } - - private val client = doAsUser { new YarnRMClient() } + private val client = new YarnRMClient() // Default to twice the number of executors (twice the maximum number of executors if dynamic // allocation is enabled), with a minimum of 3. @@ -174,11 +157,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends // In cluster mode, used to tell the AM when the user's SparkContext has been initialized. private val sparkContextPromise = Promise[SparkContext]() - // Load the list of localized files set by the client. 
This is used when launching executors, - // and is loaded here so that these configs don't pollute the Web UI's environment page in - // cluster mode. - private val localResources = doAsUser { + /** + * Load the list of localized files set by the client, used when launching executors. This should + * be called in a context where the needed credentials to access HDFS are available. + */ + private def prepareLocalResources(): Map[String, LocalResource] = { logInfo("Preparing Local resources") + val distCacheConf = new SparkConf(false) + if (args.distCacheConf != null) { + Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) => + distCacheConf.set(k, v) + } + } + val resources = HashMap[String, LocalResource]() def setupDistributedCache( @@ -199,11 +190,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends resources(fileName) = amJarRsrc } - val distFiles = sparkConf.get(CACHED_FILES) - val fileSizes = sparkConf.get(CACHED_FILES_SIZES) - val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS) - val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES) - val resTypes = sparkConf.get(CACHED_FILES_TYPES) + val distFiles = distCacheConf.get(CACHED_FILES) + val fileSizes = distCacheConf.get(CACHED_FILES_SIZES) + val timeStamps = distCacheConf.get(CACHED_FILES_TIMESTAMPS) + val visibilities = distCacheConf.get(CACHED_FILES_VISIBILITIES) + val resTypes = distCacheConf.get(CACHED_FILES_TYPES) for (i <- 0 to distFiles.size - 1) { val resType = LocalResourceType.valueOf(resTypes(i)) @@ -212,7 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } // Distribute the conf archive to executors. 
- sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path => + distCacheConf.get(CACHED_CONF_ARCHIVE).foreach { path => val uri = new URI(path) val fs = FileSystem.get(uri, yarnConf) val status = fs.getFileStatus(new Path(uri)) @@ -225,33 +216,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends LocalResourceVisibility.PRIVATE.name()) } - // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy). - CACHE_CONFIGS.foreach { e => - sparkConf.remove(e) - sys.props.remove(e.key) - } - resources.toMap } - def getAttemptId(): ApplicationAttemptId = { - client.getAttemptId() - } - final def run(): Int = { - doAsUser { - runImpl() - } - exitCode - } - - private def runImpl(): Unit = { try { - val appAttemptId = client.getAttemptId() - - var attemptID: Option[String] = None - - if (isClusterMode) { + val attemptID = if (isClusterMode) { // Set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box System.setProperty("spark.ui.port", "0") @@ -264,7 +234,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode. 
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) - attemptID = Option(appAttemptId.getAttemptId.toString) + Option(appAttemptId.getAttemptId.toString) + } else { + None } new CallerContext( @@ -277,7 +249,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1 ShutdownHookManager.addShutdownHook(priority) { () => val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf) - val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts + val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts if (!finished) { // The default state of ApplicationMaster is failed if it is invoked by shut down hook. @@ -322,6 +294,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends logWarning("Exception during stopping of the metric system: ", e) } } + + exitCode } /** @@ -377,9 +351,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends logDebug("shutting down user thread") userClassThread.interrupt() } - if (!inShutdown) { - tokenManager.foreach(_.stop()) - } } } } @@ -405,8 +376,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends port: Int, _sparkConf: SparkConf, uiAddress: Option[String]): Unit = { - val appId = client.getAttemptId().getApplicationId().toString() - val attemptId = client.getAttemptId().getAttemptId().toString() + val appId = appAttemptId.getApplicationId().toString() + val attemptId = appAttemptId.getAttemptId().toString() val historyAddress = ApplicationMaster .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId) @@ -415,9 +386,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = { - val appId = client.getAttemptId().getApplicationId().toString() + // In client mode, the AM 
may be restarting after delegation tokens have reached their TTL. So + // always contact the driver to get the current set of valid tokens, so that local resources can + // be initialized below. + if (!isClusterMode) { + val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens) + if (tokens != null) { + SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf) + } + } + + val appId = appAttemptId.getApplicationId().toString() val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + val localResources = prepareLocalResources() // Before we initialize the allocator, let's log the information about how executors will // be run up front, to avoid printing this out for every single executor being launched. @@ -433,13 +415,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends allocator = client.createAllocator( yarnConf, _sparkConf, + appAttemptId, driverUrl, driverRef, securityMgr, localResources) - tokenManager.foreach(_.setDriverRef(driverRef)) - // Initialize the AM endpoint *after* the allocator has been initialized. This ensures // that when the driver sends an initial executor request (e.g. after an AM restart), // the allocator is ready to service requests. 
@@ -755,6 +736,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends case None => logWarning("Container allocator is not ready to find executor loss reasons yet.") } + + case UpdateDelegationTokens(tokens) => + SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -767,12 +751,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } } - private def doAsUser[T](fn: => T): T = { - ugi.doAs(new PrivilegedExceptionAction[T]() { - override def run: T = fn - }) - } - } object ApplicationMaster extends Logging { @@ -793,7 +771,24 @@ object ApplicationMaster extends Logging { SignalUtils.registerLogger(log) val amArgs = new ApplicationMasterArguments(args) master = new ApplicationMaster(amArgs) - System.exit(master.run()) + + val ugi = master.sparkConf.get(PRINCIPAL) match { + case Some(principal) => + val originalCreds = UserGroupInformation.getCurrentUser().getCredentials() + SparkHadoopUtil.get.loginUserFromKeytab(principal, master.sparkConf.get(KEYTAB).orNull) + val newUGI = UserGroupInformation.getCurrentUser() + // Transfer the original user's tokens to the new user, since it may contain needed tokens + // (such as those used to connect to YARN). 
+ newUGI.addCredentials(originalCreds) + newUGI + + case _ => + SparkHadoopUtil.get.createSparkUser() + } + + ugi.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = System.exit(master.run()) + }) } private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { @@ -801,7 +796,7 @@ object ApplicationMaster extends Logging { } private[spark] def getAttemptId(): ApplicationAttemptId = { - master.getAttemptId + master.appAttemptId } private[spark] def getHistoryServerAddress( diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index cc76a7c8f..c10206c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -26,6 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) { var primaryRFile: String = null var userArgs: Seq[String] = Nil var propertiesFile: String = null + var distCacheConf: String = null parseArgs(args.toList) @@ -62,6 +63,10 @@ class ApplicationMasterArguments(val args: Array[String]) { propertiesFile = value args = tail + case ("--dist-cache-conf") :: value :: tail => + distCacheConf = value + args = tail + case _ => printUsageAndExit(1, args) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 44a60b8..9f09dc0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -100,21 +100,19 @@ private[spark] class Client( } private val distCacheMgr = new ClientDistributedCacheManager() + private val cachedResourcesConf = new SparkConf(false) - 
private val principal = sparkConf.get(PRINCIPAL).orNull private val keytab = sparkConf.get(KEYTAB).orNull - private val loginFromKeytab = principal != null - private val amKeytabFileName: String = { + private val amKeytabFileName: Option[String] = if (keytab != null && isClusterMode) { + val principal = sparkConf.get(PRINCIPAL).orNull require((principal == null) == (keytab == null), "Both principal and keytab must be defined, or neither.") - if (loginFromKeytab) { - logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab") - // Generate a file name that can be used for the keytab file, that does not conflict - // with any user file. - new File(keytab).getName() + "-" + UUID.randomUUID().toString - } else { - null - } + logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab") + // Generate a file name that can be used for the keytab file, that does not conflict + // with any user file. + Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString) + } else { + None } require(keytab == null || !Utils.isLocalUri(keytab), "Keytab should reference a local file.") @@ -220,16 +218,7 @@ private[spark] class Client( } } - if (isClusterMode && principal != null && keytab != null) { - val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) - newUgi.doAs(new PrivilegedExceptionAction[Unit] { - override def run(): Unit = { - cleanupStagingDirInternal() - } - }) - } else { - cleanupStagingDirInternal() - } + cleanupStagingDirInternal() } /** @@ -312,7 +301,7 @@ private[spark] class Client( */ private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { val credentials = UserGroupInformation.getCurrentUser().getCredentials() - val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf) + val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null) credentialManager.obtainDelegationTokens(credentials) // When using a proxy user, copy the 
delegation tokens to the user's credentials. Avoid @@ -496,11 +485,11 @@ private[spark] class Client( // If we passed in a keytab, make sure we copy the keytab to the staging directory on // HDFS, and setup the relevant environment vars, so the AM can login again. - if (loginFromKeytab) { + amKeytabFileName.foreach { kt => logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + " via the YARN Secure Distributed Cache.") val (_, localizedPath) = distribute(keytab, - destName = Some(amKeytabFileName), + destName = Some(kt), appMasterOnly = true) require(localizedPath != null, "Keytab file already distributed.") } @@ -636,7 +625,7 @@ private[spark] class Client( // Update the configuration with all the distributed files, minus the conf archive. The // conf archive will be handled by the AM differently so that we avoid having to send // this configuration by other means. See SPARK-14602 for one reason of why this is needed. - distCacheMgr.updateConfiguration(sparkConf) + distCacheMgr.updateConfiguration(cachedResourcesConf) // Upload the conf archive to HDFS manually, and record its location in the configuration. // This will allow the AM to know where the conf archive is in HDFS, so that it can be @@ -648,7 +637,7 @@ private[spark] class Client( // system. 
val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE) val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf) - sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString()) + cachedResourcesConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString()) val localConfArchive = new Path(createConfArchive().toURI()) copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true, @@ -660,11 +649,6 @@ private[spark] class Client( remoteFs, hadoopConf, remoteConfArchivePath, localResources, LocalResourceType.ARCHIVE, LOCALIZED_CONF_DIR, statCache, appMasterOnly = false) - // Clear the cache-related entries from the configuration to avoid them polluting the - // UI's environment page. This works for client mode; for cluster mode, this is handled - // by the AM. - CACHE_CONFIGS.foreach(sparkConf.remove) - localResources } @@ -768,19 +752,25 @@ private[spark] class Client( hadoopConf.writeXml(confStream) confStream.closeEntry() - // Save Spark configuration to a file in the archive, but filter out the app's secret. - val props = new Properties() - sparkConf.getAll.foreach { case (k, v) => - props.setProperty(k, v) + // Save Spark configuration to a file in the archive. + val props = confToProperties(sparkConf) + + // If propagating the keytab to the AM, override the keytab name with the name of the + // distributed file. Otherwise remove principal/keytab from the conf, so they're not seen + // by the AM at all. + amKeytabFileName match { + case Some(kt) => + props.setProperty(KEYTAB.key, kt) + case None => + props.remove(PRINCIPAL.key) + props.remove(KEYTAB.key) } - // Override spark.yarn.key to point to the location in distributed cache which will be used - // by AM. 
- Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) } - confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE)) - val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8) - props.store(writer, "Spark configuration.") - writer.flush() - confStream.closeEntry() + + writePropertiesToArchive(props, SPARK_CONF_FILE, confStream) + + // Write the distributed cache config to the archive. + writePropertiesToArchive(confToProperties(cachedResourcesConf), DIST_CACHE_CONF_FILE, + confStream) } finally { confStream.close() } @@ -984,7 +974,10 @@ private[spark] class Client( } val amArgs = Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++ - Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) + Seq("--properties-file", + buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++ + Seq("--dist-cache-conf", + buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE)) // Command for the ApplicationMaster val commands = prefixEnv ++ @@ -1213,6 +1206,9 @@ private object Client extends Logging { // Name of the file in the conf archive containing Spark configuration. val SPARK_CONF_FILE = "__spark_conf__.properties" + // Name of the file in the conf archive containing the distributed cache info. + val DIST_CACHE_CONF_FILE = "__spark_dist_cache__.properties" + // Subdirectory where the user's python files (not archives) will be placed. 
val LOCALIZED_PYTHON_DIR = "__pyfiles__" @@ -1512,6 +1508,22 @@ private object Client extends Logging { } getClusterPath(conf, cmdPrefix) } + + def confToProperties(conf: SparkConf): Properties = { + val props = new Properties() + conf.getAll.foreach { case (k, v) => + props.setProperty(k, v) + } + props + } + + def writePropertiesToArchive(props: Properties, name: String, out: ZipOutputStream): Unit = { + out.putNextEntry(new ZipEntry(name)) + val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8) + props.store(writer, "Spark configuration.") + writer.flush() + out.closeEntry() + } } private[spark] class YarnClusterApplication extends SparkApplication { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 05a7b1e..cf16edf 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -76,12 +76,13 @@ private[spark] class YarnRMClient extends Logging { def createAllocator( conf: YarnConfiguration, sparkConf: SparkConf, + appAttemptId: ApplicationAttemptId, driverUrl: String, driverRef: RpcEndpointRef, securityMgr: SecurityManager, localResources: Map[String, LocalResource]): YarnAllocator = { require(registered, "Must register AM before creating allocator.") - new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr, + new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId, securityMgr, localResources, new SparkRackResolver()) } @@ -100,11 +101,6 @@ private[spark] class YarnRMClient extends Logging { } } - /** Returns the attempt ID. */ - def getAttemptId(): ApplicationAttemptId = { - YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId() - } - /** Returns the configuration for the AmIpFilter to add to the Spark UI. 
*/ def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = { // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 7e9cd40..6091cd4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -321,16 +321,6 @@ package object config { .stringConf .createOptional - // The list of cache-related config entries. This is used by Client and the AM to clean - // up the environment so that these settings do not appear on the web UI. - private[yarn] val CACHE_CONFIGS = Seq( - CACHED_FILES, - CACHED_FILES_SIZES, - CACHED_FILES_TIMESTAMPS, - CACHED_FILES_VISIBILITIES, - CACHED_FILES_TYPES, - CACHED_CONF_ARCHIVE) - /* YARN allocator-level blacklisting related config entries. */ private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED = ConfigBuilder("spark.yarn.blacklist.executor.launch.blacklisting.enabled") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala index 2d9a3f0..bb40ea8 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala @@ -36,10 +36,11 @@ import org.apache.spark.util.Utils * [[ServiceCredentialProvider]] interface, as well as the builtin providers defined * in [[HadoopDelegationTokenManager]]. 
*/ -private[yarn] class YARNHadoopDelegationTokenManager( +private[spark] class YARNHadoopDelegationTokenManager( _sparkConf: SparkConf, - _hadoopConf: Configuration) - extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) { + _hadoopConf: Configuration, + _schedulerRef: RpcEndpointRef) + extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf, _schedulerRef) { private val credentialProviders = { ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 167eef1..934fba3 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -27,6 +27,7 @@ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.{config, Logging} import org.apache.spark.launcher.SparkAppHandle import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ private[spark] class YarnClientSchedulerBackend( scheduler: TaskSchedulerImpl, @@ -166,4 +167,9 @@ private[spark] class YarnClientSchedulerBackend( logInfo("Stopped") } + override protected def updateDelegationTokens(tokens: Array[Byte]): Unit = { + super.updateDelegationTokens(tokens) + amEndpoint.foreach(_.send(UpdateDelegationTokens(tokens))) + } + } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 1289d4b..6357d4a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -27,6 +27,8 @@ import scala.util.control.NonFatal import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ @@ -55,6 +57,7 @@ private[spark] abstract class YarnSchedulerBackend( protected var totalExpectedExecutors = 0 private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv) + protected var amEndpoint: Option[RpcEndpointRef] = None private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint( YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint) @@ -191,6 +194,11 @@ private[spark] abstract class YarnSchedulerBackend( sc.executorAllocationManager.foreach(_.reset()) } + override protected def createTokenManager( + schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = { + Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, schedulerRef)) + } + /** * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected. 
* This endpoint communicates with the executors and queries the AM for an executor's exit @@ -226,7 +234,6 @@ private[spark] abstract class YarnSchedulerBackend( */ private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { - private var amEndpoint: Option[RpcEndpointRef] = None private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver( executorId: String, @@ -266,11 +273,6 @@ private[spark] abstract class YarnSchedulerBackend( logWarning(s"Requesting driver to remove executor $executorId for reason $reason") driverEndpoint.send(r) } - - case u @ UpdateDelegationTokens(tokens) => - // Add the tokens to the current user and send a message to the scheduler so that it - // notifies all registered executors of the new tokens. - driverEndpoint.send(u) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -304,6 +306,9 @@ private[spark] abstract class YarnSchedulerBackend( case RetrieveLastAllocatedExecutorId => context.reply(currentExecutorIdCounter) + + case RetrieveDelegationTokens => + context.reply(currentDelegationTokens) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala index 98315e4..f00453c 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala @@ -34,7 +34,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite { } test("Correctly loads credential providers") { - credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf) + credentialManager = new 
YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null) assert(credentialManager.isProviderLoaded("yarn-test")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org