Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r173236591
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      */
     private[yarn] class AMCredentialRenewer(
         sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
    +    hadoopConf: Configuration) extends Logging {
     
    -  private var lastCredentialsFileSuffix = 0
    +  private val principal = sparkConf.get(PRINCIPAL).get
    +  private val keytab = sparkConf.get(KEYTAB).get
    +  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  private val credentialRenewerThread: ScheduledExecutorService =
    +  private val renewalExecutor: ScheduledExecutorService =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
     
    -  private val hadoopUtil = SparkHadoopUtil.get
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
     
    -  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
    -  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
    -  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
    -  private val freshHadoopConf =
    -    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
    +  private val renewalTask = new Runnable() {
    +    override def run(): Unit = {
    +      updateTokensTask()
    +    }
    +  }
     
    -  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
    +  def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
     
       /**
    -   * Schedule a login from the keytab and principal set using the --principal and --keytab
    -   * arguments to spark-submit. This login happens only when the credentials of the current user
    -   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
    -   * SparkConf to do the login. This method is a no-op in non-YARN mode.
    +   * Start the token renewer. Upon start, the renewer will:
        *
    +   * - log in the configured user, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
        */
    -  private[spark] def scheduleLoginFromKeytab(): Unit = {
    -    val principal = sparkConf.get(PRINCIPAL).get
    -    val keytab = sparkConf.get(KEYTAB).get
    -
    -    /**
    -     * Schedule re-login and creation of new credentials. If credentials have already expired, this
    -     * method will synchronously create new ones.
    -     */
    -    def scheduleRenewal(runnable: Runnable): Unit = {
    -      // Run now!
    -      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
    -      if (remainingTime <= 0) {
    -        logInfo("Credentials have expired, creating new ones now.")
    -        runnable.run()
    -      } else {
    -        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
    -        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
    +  def start(): UserGroupInformation = {
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
           }
         }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
     
    -    // This thread periodically runs on the AM to update the credentials on HDFS.
    -    val credentialRenewerRunnable =
    -      new Runnable {
    -        override def run(): Unit = {
    -          try {
    -            writeNewCredentialsToHDFS(principal, keytab)
    -            cleanupOldFiles()
    -          } catch {
    -            case e: Exception =>
    -              // Log the error and try to write new tokens back in an hour
    -              logWarning("Failed to write out new credentials to HDFS, 
will try again in an " +
    -                "hour! If this happens too often tasks will fail.", e)
    -              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
    -              return
    -          }
    -          scheduleRenewal(this)
    -        }
    -      }
    -    // Schedule update of credentials. This handles the case of updating the credentials right now
    -    // as well, since the renewal interval will be 0, and the thread will get scheduled
    -    // immediately.
    -    scheduleRenewal(credentialRenewerRunnable)
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    // Transfer the original user's tokens to the new user, since that's needed to connect to
    +    // YARN. Explicitly avoid overwriting tokens that already exist in the current user's
    +    // credentials, since those were freshly obtained above (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    renewalExecutor.shutdown()
    +  }
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in 
${UIUtils.formatDuration(delay)}.")
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
       }
     
    -  // Keeps only files that are newer than daysToKeepFiles days, and 
deletes everything else. At
    -  // least numFilesToKeep files are kept for safety
    -  private def cleanupOldFiles(): Unit = {
    -    import scala.concurrent.duration._
    +  /**
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
    +   */
    +  private def updateTokensTask(): Unit = {
         try {
    -      val remoteFs = FileSystem.get(freshHadoopConf)
    -      val credentialsPath = new Path(credentialsFile)
    -      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
    -      hadoopUtil.listFilesSorted(
    -        remoteFs, credentialsPath.getParent,
    -        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    -        .dropRight(numFilesToKeep)
    -        .takeWhile(_.getModificationTime < thresholdTime)
    -        .foreach(x => remoteFs.delete(x.getPath, true))
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
    +      } else {
    +        // This shouldn't really happen, since the driver should register way before tokens expire
    +        // (or the AM should time out the application).
    +        logWarning("Delegation tokens close to expiration but no driver 
has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
    +      }
         } catch {
    -      // Such errors are not fatal, so don't throw. Make sure they are logged though
           case e: Exception =>
    -        logWarning("Error while attempting to cleanup old credentials. If 
you are seeing many " +
    -          "such warnings there may be an issue with your HDFS cluster.", e)
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in 
${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
         }
       }
     
    -  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
    -    // Keytab is copied by YARN to the working directory of the AM, so full path is
    -    // not needed.
    -
    -    // HACK:
    --- End diff --
    
    Not sure I understand the question. This comment talks about a lot of things. The only thing that still applies is using a new UGI to get new delegation tokens. That's not really a hack, that's just how the API works...
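    
    For reference, the pattern boils down to something like this (rough sketch only, not the exact code in this PR; `credentialManager` and `hadoopConf` stand in for whatever token providers and config the app uses):
    
    ```scala
    import java.security.PrivilegedExceptionAction
    
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    
    object FreshTokensSketch {
      // Log in a brand-new UGI from the keytab and fetch tokens as that user,
      // so the new tokens are obtained under a fresh TGT rather than the
      // current user's (possibly expiring) one.
      def obtainFreshTokens(principal: String, keytab: String): Credentials = {
        val freshUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
        freshUGI.doAs(new PrivilegedExceptionAction[Credentials] {
          override def run(): Credentials = {
            val creds = new Credentials()
            // App-specific part: ask the configured providers to fill in `creds`,
            // e.g. credentialManager.obtainDelegationTokens(hadoopConf, creds)
            creds
          }
        })
      }
    }
    ```
    
    The `doAs` is the important part: the token providers act on whichever UGI is current, so running them under the fresh login is how new tokens get tied to it.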


---
