Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19272#discussion_r150688646
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster.mesos
    +
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.UserGroupInformation
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    +import org.apache.spark.internal.{config, Logging}
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.util.ThreadUtils
    +
    +
    +/**
    + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
    + * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
    + * and similarly will renew the Credentials when 75% of the renewal interval has passed.
    + * The principal difference is that instead of writing the new credentials to HDFS and
    + * incrementing the timestamp of the file, the new credentials (called Tokens when they are
    + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
    + * received they overwrite the current credentials.
    + */
    +private[spark] class MesosHadoopDelegationTokenManager(
    +    conf: SparkConf,
    +    hadoopConfig: Configuration,
    +    driverEndpoint: RpcEndpointRef)
    +  extends Logging {
    +
    +  require(driverEndpoint != null, "DriverEndpoint is not initialized")
    +
    +  private val credentialRenewerThread: ScheduledExecutorService =
    +    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +  private val tokenManager: HadoopDelegationTokenManager =
    +    new HadoopDelegationTokenManager(conf, hadoopConfig)
    +
    +  private val principal: String = conf.get(config.PRINCIPAL).orNull
    +
    +  private val (secretFile: Option[String], mode: String) = getSecretFile(conf)
    +
    +  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
    +    try {
    +      val creds = UserGroupInformation.getCurrentUser.getCredentials
    +      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    +      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
    +      logInfo(s"Initialized tokens: 
${SparkHadoopUtil.get.dumpTokens(creds)}")
    +      (SparkHadoopUtil.get.serialize(creds), rt)
    +    } catch {
    +      case e: Exception =>
    +        logError("Failed to initialize Hadoop delegation tokens\n" +
    +          s"\tPrincipal: $principal\n\tmode: $mode\n\tsecret file: $secretFile\n\tException: $e")
    +        throw e
    +    }
    +  }
    +
    +  scheduleTokenRenewal()
    +
    +  private def getSecretFile(conf: SparkConf): (Option[String], String) = {
    +    val keytab = conf.get(config.KEYTAB)
    +    val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR))
    +    val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) {
    +      // if a keytab and a specific ticket cache are specified, use the keytab and log the behavior
    +      logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " +
    +        s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})")
    +      (keytab, "keytab")
    +    } else {
    +      val m = if (keytab.isDefined) "keytab" else "tgt"
    +      val sf = if (keytab.isDefined) keytab else tgt
    +      (sf, m)
    +    }
    +
    +    if (principal == null) {
    +      require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal")
    +      logInfo(s"Using ticket cache to fetch Hadoop delegation tokens")
    +    } else {
    +      logInfo(s"Using principal: $principal with mode and keytab $keytab " 
+
    +        s"to fetch Hadoop delegation tokens")
    +    }
    +
    +    logDebug(s"secretFile is $secretFile")
    +    (secretFile, mode)
    +  }
    +
    +  private def scheduleTokenRenewal(): Unit = {
    +    def scheduleRenewal(runnable: Runnable): Unit = {
    +      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
    +      if (remainingTime <= 0) {
    +        logInfo("Credentials have expired, creating new ones now.")
    +        runnable.run()
    +      } else {
    +        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
    +        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
    +      }
    +    }
    +
    +    val credentialRenewerRunnable =
    +      new Runnable {
    +        override def run(): Unit = {
    +          try {
    +            val tokensBytes = getNewDelegationTokens
    +            broadcastDelegationTokens(tokensBytes)
    +          } catch {
    +            case e: Exception =>
    +              // Log the error and try to write new tokens back in an hour
    +              logWarning("Couldn't broadcast tokens, trying again in an hour", e)
    +              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
    +              return
    +          }
    +          scheduleRenewal(this)
    +        }
    +      }
    +    scheduleRenewal(credentialRenewerRunnable)
    +  }
    +
    +  private def getNewDelegationTokens(): Array[Byte] = {
    +    logInfo(s"Attempting to login to KDC with 
${conf.get(config.PRINCIPAL).orNull}")
    +    // Get new delegation tokens by logging in with a new UGI
    +    // inspired by AMCredentialRenewer.scala:L174
    +    val ugi = if (mode == "keytab") {
    +      UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile.get)
    +    } else {
    +      // if the ticket cache is not explicitly defined, use the default
    +      if (secretFile.isEmpty) {
    --- End diff --
    
    The point I was trying to make is that you do not need *any* special handling for TGT. The UGI class already does everything you need; you just need to get the current user. It will keep the TGT updated with any changes that happen on disk. You don't need to handle `KRB5CCNAME` anywhere, because UGI should be doing that for you. If it's not, you need to explain why you need this special handling, because the expected behavior is for this to work without you needing to do anything.
    
    So you can simplify this class by *only* handling the principal / keytab case, and just using `UserGroupInformation.getCurrentUser` in the other case. You don't need to keep track of the "mode" or anything else, just whether you're using a principal / keytab pair.
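    
    For concreteness, here's a minimal sketch of what that simplification could look like (hypothetical `getUGI` helper; it assumes a `principal: String` and a `keytab: Option[String]` read from `config.KEYTAB`, as in the diff):
    
    ```scala
    import org.apache.hadoop.security.UserGroupInformation
    
    // Sketch only: track nothing but the principal / keytab pair. No "mode"
    // string and no ticket-cache handling, since UGI already keeps the TGT
    // refreshed from the on-disk cache (including KRB5CCNAME).
    private def getUGI(): UserGroupInformation = {
      if (principal != null && keytab.isDefined) {
        // Explicit login with the configured keytab.
        UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab.get)
      } else {
        // The current user's UGI is all that's needed in the TGT case.
        UserGroupInformation.getCurrentUser
      }
    }
    ```
    
    The renewal thread could then call this helper before obtaining new tokens, and `getSecretFile` along with the "mode" tracking could be dropped entirely.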

