Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14065#discussion_r73931365
  
    --- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
 ---
    @@ -53,50 +53,58 @@ private[spark] class CredentialUpdater(
           override def run(): Unit = 
Utils.logUncaughtExceptions(updateCredentialsIfRequired())
         }
     
    -  @volatile private var timeOfNextUpdate = 
sparkConf.get(CREDENTIALS_UPDATE_TIME)
    +  /** Start the credential updater thread periodically to get the 
credentials */
    +  def bootstrap(): Unit = {
    +    val bootstrapTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
    +    val remainingTime = bootstrapTime - System.currentTimeMillis()
    +    if (remainingTime <= 0) {
    +      // We just checked for new credentials but none were there, wait a 
minute and retry.
    +      // This handles the shutdown case where the staging directory may 
have been removed(see
    +      // SPARK-12316 for more details).
    +      credentialUpdater.schedule(credentialUpdaterRunnable, 1, 
TimeUnit.MINUTES)
    +    } else {
    +      logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime 
millis.")
    +      credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, 
TimeUnit.MILLISECONDS)
    +    }
    +  }
     
    -  def updateCredentialsIfRequired(): Unit = {
    -    try {
    +  private def updateCredentialsIfRequired(): Unit = {
    +    val timeToNextUpdate = try {
           val credentialsFilePath = new Path(credentialsFile)
           val remoteFs = FileSystem.get(freshHadoopConf)
           SparkHadoopUtil.get.listFilesSorted(
             remoteFs, credentialsFilePath.getParent,
             credentialsFilePath.getName, 
SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    -        .lastOption.foreach { credentialsStatus =>
    +        .lastOption.map { credentialsStatus =>
             val suffix = 
SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
             if (suffix > lastCredentialsFileSuffix) {
               logInfo("Reading new credentials from " + 
credentialsStatus.getPath)
               val newCredentials = getCredentialsFromHDFSFile(remoteFs, 
credentialsStatus.getPath)
               lastCredentialsFileSuffix = suffix
               
UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
               logInfo("Credentials updated from credentials file.")
    -          timeOfNextUpdate = 
getTimeOfNextUpdateFromFileName(credentialsStatus.getPath)
    +
    +          val remainingTime =
    +            getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) - 
System.currentTimeMillis()
    +          if (remainingTime <= 0) 60L * 1000 else remainingTime
             } else {
    -          // Check every hour to see if new credentials arrived.
    -          logInfo("Updated credentials were expected, but the AM has not 
updated the " +
    -            "credentials yet, will check again in an hour.")
    -          credentialUpdater.schedule(credentialUpdaterRunnable, 1, 
TimeUnit.HOURS)
    -          return
    +          // If current credential file is older than expected, sleep 1 
hour and check again.
    +          3600L * 1000
             }
    -      }
    -      val remainingTime = timeOfNextUpdate - System.currentTimeMillis()
    -      if (remainingTime <= 0) {
    -        // We just checked for new credentials but none were there, wait a 
minute and retry.
    -        // This handles the shutdown case where the staging directory may 
have been removed(see
    -        // SPARK-12316 for more details).
    -        credentialUpdater.schedule(credentialUpdaterRunnable, 1, 
TimeUnit.MINUTES)
    -      } else {
    -        logInfo(s"Scheduling credentials refresh from HDFS in 
$remainingTime millis.")
    -        credentialUpdater.schedule(
    -          credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS)
    +      }.getOrElse {
    +        // Wait for 1 minute to check again if there's no credential file 
currently
    +        60L * 1000
           }
         } catch {
           // Since the file may get deleted while we are reading it, catch the 
Exception and come
           // back in an hour to try again
           case NonFatal(e) =>
             logWarning("Error while trying to update credentials, will try 
again in 1 hour", e)
    -        credentialUpdater.schedule(credentialUpdaterRunnable, 1, 
TimeUnit.HOURS)
    +        3600L * 1000
    --- End diff --
    
    Use `TimeUnit` here as well (e.g. `TimeUnit.HOURS.toMillis(1)` instead of `3600L * 1000`) -- you get the gist.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastruct...@apache.org or file a
JIRA ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to