Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4688#discussion_r27237949
  
    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
    @@ -82,6 +103,244 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
         if (credentials != null) credentials.getSecretKey(new Text(key)) else null
       }
     
    +  /*
    +   * The following methods are primarily meant to make sure long-running apps like Spark
    +   * Streaming apps can run without interruption while writing to secure HDFS. The
    +   * scheduleLoginFromKeytab method is called on the driver when the
    +   * CoarseGrainedSchedulerBackend starts up. This method wakes up a thread that logs into the KDC
    +   * once 75% of the expiry time of the original delegation tokens used for the container
    +   * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
    +   * pre-specified location - the prefix of which is specified in the sparkConf by
    +   * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
    +   * to a new file, with a monotonically increasing suffix). After this, the credentials are
    +   * updated once 75% of the new tokens' validity has elapsed.
    +   *
    +   * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
    +   * validity of the original tokens has elapsed. At that time the executor finds the
    +   * credentials file with the latest timestamp and checks if it has read those credentials
    +   * before (by keeping track of the suffix of the last file it read). If a new file has
    +   * appeared, it will read the credentials and update the currently running UGI with it. This
    +   * process happens again once 80% of the validity of these tokens has expired.
    +   */
    +  private[spark] override def scheduleLoginFromKeytab(): Unit = {
    +    sparkConf.getOption("spark.yarn.principal").foreach { principal =>
    +      val keytab = sparkConf.get("spark.yarn.keytab")
    +
    +      def getRenewalInterval =
    +        math.max((0.75 * (getLatestValidity - System.currentTimeMillis())).toLong, 0L)
    +
    +      def scheduleRenewal(runnable: Runnable) = {
    +        val renewalInterval = getRenewalInterval
    +        logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
    +        delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
    +      }
    +
    +      // This thread periodically runs on the driver to update the delegation tokens on HDFS.
    +      val driverTokenRenewerRunnable =
    +        new Runnable {
    +          override def run(): Unit = {
    +            try {
    +              writeNewTokensToHDFS(principal, keytab)
    +            } catch {
    +              case e: Exception =>
    +                logWarning("Failed to write out new credentials to HDFS, will try again in an " +
    +                  "hour! If this happens too often tasks will fail.", e)
    +                delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
    +                return
    +            }
    +            scheduleRenewal(this)
    +          }
    +        }
    +      // If this is an AM restart, it is possible that the original tokens have expired, which
    +      // means we need to login immediately to get new tokens.
    +      if (getRenewalInterval == 0) writeNewTokensToHDFS(principal, keytab)
    +      // Schedule update of credentials
    +      scheduleRenewal(driverTokenRenewerRunnable)
    +    }
    +  }
    +
    +  private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
    +    if (!loggedInViaKeytab) {
    +      // Keytab is copied by YARN to the working directory of the AM, so full path is
    +      // not needed.
    +      logInfo(s"Attempting to login to KDC using principal: $principal")
    +      loggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
    +        principal, keytab)
    +      logInfo("Successfully logged into KDC.")
    +      loggedInViaKeytab = true
    +      // Not exactly sure when HDFS re-logs in, be safe and do it ourselves.
    +      // Periodically check and relogin this keytab. The UGI will take care of not relogging in
    +      // if it is not necessary to relogin.
    +      val reloginRunnable = new Runnable {
    +        override def run(): Unit = {
    +          try {
    +            loggedInUGI.checkTGTAndReloginFromKeytab()
    +          } catch {
    +            case e: Exception =>
    +              logError("Error while attempting to relogin to KDC", e)
    +          }
    +        }
    +      }
    +      delegationTokenRenewer.schedule(reloginRunnable, 6, TimeUnit.HOURS)
    +    }
    +    val nns = getNameNodesToAccess(sparkConf)
    +    obtainTokensForNamenodes(nns, conf, loggedInUGI.getCredentials)
    +    val remoteFs = FileSystem.get(conf)
    +    // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
    +    // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
    +    // and update the lastCredentialsFileSuffix.
    +    if (lastCredentialsFileSuffix == 0) {
    +      val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
    +      listCredentialsFilesSorted(remoteFs, credentialsPath)
    +        .lastOption.foreach { status =>
    +        lastCredentialsFileSuffix = getSuffixForCredentialsPath(status)
    +      }
    +    }
    +    val nextSuffix = lastCredentialsFileSuffix + 1
    +    val tokenPathStr =
    +      sparkConf.get("spark.yarn.credentials.file") + "-" + nextSuffix
    +    val tokenPath = new Path(tokenPathStr)
    +    val tempTokenPath = new Path(tokenPathStr + ".tmp")
    +    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
    +    val stream = Option(remoteFs.create(tempTokenPath, true))
    +    try {
    +      stream.foreach { s =>
    +        loggedInUGI.getCredentials.writeTokenStorageToStream(s)
    +        s.hflush()
    +        s.close()
    +        logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
    +        remoteFs.rename(tempTokenPath, tokenPath)
    --- End diff --
    
    So, if I remember properly, the FileSystem rename isn't atomic, but there is an atomic version in FileContext. I'm not sure if that is why we are using multiple files here.
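    
    In case it helps, a rough, untested sketch (not from this PR) of what the FileContext-based atomic rename could look like; the tokenPath/tempTokenPath names just mirror the diff above and the credentials path is made up:
    
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.{FileContext, Options, Path}
        
        // Hypothetical paths standing in for the suffixed credentials files described
        // in the diff (e.g. <spark.yarn.credentials.file>-1, -2, ...): tokens are
        // written to a ".tmp" file first and then moved into their final name.
        val conf = new Configuration()
        val tokenPath = new Path("/user/spark/credentials-1")
        val tempTokenPath = new Path(tokenPath.toString + ".tmp")
        
        // FileContext.rename with the OVERWRITE option is the atomic variant, unlike
        // FileSystem.rename, so an executor polling for the newest credentials file
        // should never observe a half-renamed file.
        val fc = FileContext.getFileContext(conf)
        fc.rename(tempTokenPath, tokenPath, Options.Rename.OVERWRITE)
    
    Presumably writing each update to a new suffixed file (rather than overwriting one file in place) also means a reader that already opened the previous file is never affected by the rename, but it would be good to confirm that's the intent.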


