Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14065#discussion_r73931365 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala --- @@ -53,50 +53,58 @@ private[spark] class CredentialUpdater( override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired()) } - @volatile private var timeOfNextUpdate = sparkConf.get(CREDENTIALS_UPDATE_TIME) + /** Start the credential updater thread periodically to get the credentials */ + def bootstrap(): Unit = { + val bootstrapTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME) + val remainingTime = bootstrapTime - System.currentTimeMillis() + if (remainingTime <= 0) { + // We just checked for new credentials but none were there, wait a minute and retry. + // This handles the shutdown case where the staging directory may have been removed (see + // SPARK-12316 for more details). + credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES) + } else { + logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.") + credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS) + } + } - def updateCredentialsIfRequired(): Unit = { - try { + private def updateCredentialsIfRequired(): Unit = { + val timeToNextUpdate = try { val credentialsFilePath = new Path(credentialsFile) val remoteFs = FileSystem.get(freshHadoopConf) SparkHadoopUtil.get.listFilesSorted( remoteFs, credentialsFilePath.getParent, credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) - .lastOption.foreach { credentialsStatus => + .lastOption.map { credentialsStatus => val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath) if (suffix > lastCredentialsFileSuffix) { logInfo("Reading new credentials from " + credentialsStatus.getPath) val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath) lastCredentialsFileSuffix = suffix 
UserGroupInformation.getCurrentUser.addCredentials(newCredentials) logInfo("Credentials updated from credentials file.") - timeOfNextUpdate = getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) + + val remainingTime = + getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) - System.currentTimeMillis() + if (remainingTime <= 0) 60L * 1000 else remainingTime } else { - // Check every hour to see if new credentials arrived. - logInfo("Updated credentials were expected, but the AM has not updated the " + - "credentials yet, will check again in an hour.") - credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.HOURS) - return + // If current credential file is older than expected, sleep 1 hour and check again. + 3600L * 1000 } - } - val remainingTime = timeOfNextUpdate - System.currentTimeMillis() - if (remainingTime <= 0) { - // We just checked for new credentials but none were there, wait a minute and retry. - // This handles the shutdown case where the staging directory may have been removed(see - // SPARK-12316 for more details). - credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES) - } else { - logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.") - credentialUpdater.schedule( - credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS) + }.getOrElse { + // Wait for 1 minute to check again if there's no credential file currently + 60L * 1000 } } catch { // Since the file may get deleted while we are reading it, catch the Exception and come // back in an hour to try again case NonFatal(e) => logWarning("Error while trying to update credentials, will try again in 1 hour", e) + 3600L * 1000 --- End diff -- `TimeUnit...` you get the gist.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org