Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150688646 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( + conf: SparkConf, + hadoopConfig: Configuration, + driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = + new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: Option[String], mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { + try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) + } catch { + case e: Exception => + logError("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") + throw e + } + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (Option[String], String) = { + val keytab = conf.get(config.KEYTAB) + val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR)) + val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) { 
+ // if a keytab and a specific ticket cache is specified use the keytab and log the behavior + logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " + + s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})") + (keytab, "keytab") + } else { + val m = if (keytab.isDefined) "keytab" else "tgt" + val sf = if (keytab.isDefined) keytab else tgt + (sf, m) + } + + if (principal == null) { + require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal") + logInfo(s"Using ticket cache to fetch Hadoop delegation tokens") + } else { + logInfo(s"Using principal: $principal with mode and keytab $keytab " + + s"to fetch Hadoop delegation tokens") + } + + logDebug(s"secretFile is $secretFile") + (secretFile, mode) + } + + private def scheduleTokenRenewal(): Unit = { + def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { + logInfo("Credentials have expired, creating new ones now.") + runnable.run() + } else { + logInfo(s"Scheduling login from keytab in $remainingTime millis.") + credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } + } + + val credentialRenewerRunnable = + new Runnable { + override def run(): Unit = { + try { + val tokensBytes = getNewDelegationTokens + broadcastDelegationTokens(tokensBytes) + } catch { + case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying again in an hour", e) + credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS) + return + } + scheduleRenewal(this) + } + } + scheduleRenewal(credentialRenewerRunnable) + } + + private def getNewDelegationTokens(): Array[Byte] = { + logInfo(s"Attempting to login to KDC with ${conf.get(config.PRINCIPAL).orNull}") + // Get new delegation tokens by logging in with a new UGI + // inspired by AMCredentialRenewer.scala:L174 + val ugi = if (mode == "keytab") { + 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile.get) + } else { + // if the ticket cache is not explicitly defined, use the default + if (secretFile.isEmpty) { --- End diff -- The point I was trying to make is that you do not need *any* special handling for TGT. The UGI class already does everything you need, you just need to get the current user. It will keep the TGT updated with any changes that happen on disk. You don't need to handle `KRB5CCNAME` anywhere, because UGI should be doing that for you. If it's not, you need to explain why you need this special handling, because the expected behavior is for this to work without you needing to do anything. So you can simplify this class by *only* handling the principal / keytab case, and just using `UserGroupInformation.getCurrentUser` in the other case. You don't need to keep track of the "mode" or anything else, just whether you're using a principal / keytab pair.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org