Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228398765
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -110,32 +209,105 @@ private[spark] class HadoopDelegationTokenManager(
       }
     
       /**
    -   * Get delegation token provider for the specified service.
    +   * List of file systems for which to obtain delegation tokens. The base implementation
    +   * returns just the default file system in the given Hadoop configuration.
        */
    -  def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
    -    delegationTokenProviders.get(service)
    +  protected def fileSystemsToAccess(): Set[FileSystem] = {
    +    Set(FileSystem.get(hadoopConf))
    +  }
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in 
${UIUtils.formatDuration(delay)}.")
    +
    +    val renewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        updateTokensTask()
    +      }
    +    }
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
       }
     
       /**
    -   * Writes delegation tokens to creds.  Delegation tokens are fetched from all registered
    -   * providers.
    -   *
    -   * @param hadoopConf hadoop Configuration
    -   * @param creds Credentials that will be updated in place (overwritten)
    -   * @return Time after which the fetched delegation tokens should be renewed.
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
        */
    -  def obtainDelegationTokens(
    -      hadoopConf: Configuration,
    -      creds: Credentials): Long = {
    -    delegationTokenProviders.values.flatMap { provider =>
    -      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
    -        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
    +  private def updateTokensTask(): Unit = {
    +    try {
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
           } else {
    -        logDebug(s"Service ${provider.serviceName} does not require a 
token." +
    -          s" Check your configuration to see if security is disabled or 
not.")
    -        None
    +        // This shouldn't really happen, since the driver should register 
way before tokens expire.
    +        logWarning("Delegation tokens close to expiration but no driver 
has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
           }
    -    }.foldLeft(Long.MaxValue)(math.min)
    +    } catch {
    +      case e: Exception =>
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
    +    }
    +  }
    +
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = obtainDelegationTokens(creds)
    +
    +        // Calculate the time when new credentials should be created, based on the configured
    +        // ratio.
    +        val now = System.currentTimeMillis
    +        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
    +        val adjustedNextRenewal = (now + (ratio * (nextRenewal - now))).toLong
    +
    +        scheduleRenewal(adjustedNextRenewal - now)
    +        creds
    +      }
    +    })
    +  }
    +
    +  private def doLogin(): UserGroupInformation = {
    +    logInfo(s"Attempting to login to KDC using principal: $principal")
    +    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    +    logInfo("Successfully logged into KDC.")
    +    ugi
    +  }
    +
    +  private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
    +    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
    +      safeCreateProvider(new HiveDelegationTokenProvider) ++
    +      safeCreateProvider(new HBaseDelegationTokenProvider)
    +
    +    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    +    providers
    +      .filter { p => isServiceEnabled(p.serviceName) }
    +      .map { p => (p.serviceName, p) }
    +      .toMap
       }
    -}
     
    +  private def safeCreateProvider(
    +      createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
    +    try {
    +      Some(createFn)
    +    } catch {
    +      case t: Throwable =>
    +        logDebug(s"Failed to load built in provider.", t)
    --- End diff --
    
    I know this is old code, but do you know why this is catching `Throwable` and not `Exception`? Just for `NoClassDefFoundError`? I wonder if we should special-case that ... I worry about trying to recover from a `StackOverflowError` or an OOM.
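    
    To make the question concrete, this is roughly the kind of special-casing I have in mind. It's only a sketch with placeholder names (`TokenProvider`, `SafeCreateSketch`, the `logDebug` stub), not the actual Spark code: `NoClassDefFoundError` gets its own case because `NonFatal` does not match it (it is a `LinkageError`), other non-fatal exceptions are still swallowed, and fatal errors like OOM or `StackOverflowError` just propagate:
    
    ```scala
    import scala.util.control.NonFatal
    
    // Placeholder stand-ins so the sketch compiles on its own; not the real Spark types.
    trait TokenProvider { def serviceName: String }
    
    object SafeCreateSketch {
      private def logDebug(msg: String, t: Throwable): Unit =
        println(s"DEBUG: $msg (${t.getClass.getSimpleName})")
    
      // Swallow only the failures we expect when an optional dependency (e.g. the Hive or
      // HBase classes) is missing from the classpath, and let fatal errors such as
      // OutOfMemoryError or StackOverflowError propagate. NonFatal does not match
      // NoClassDefFoundError (it's a LinkageError), hence the explicit case for it.
      def safeCreateProvider(createFn: => TokenProvider): Option[TokenProvider] = {
        try {
          Some(createFn)
        } catch {
          case e: NoClassDefFoundError =>
            logDebug("Failed to load built-in provider (missing optional classes).", e)
            None
          case NonFatal(e) =>
            logDebug("Failed to load built-in provider.", e)
            None
          // No catch-all: OOM, StackOverflowError, etc. are rethrown.
        }
      }
    
      def main(args: Array[String]): Unit = {
        val p = safeCreateProvider(throw new NoClassDefFoundError("org/example/MissingDep"))
        println(s"provider: $p")  // provider: None
      }
    }
    ```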


---
