Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4688#discussion_r28285820
  
    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.{DataOutputStream, ByteArrayOutputStream}
    +import java.nio.ByteBuffer
    +import java.util.concurrent.{Executors, TimeUnit}
    +
    +import akka.actor.ActorSelection
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
    +import org.apache.hadoop.security.UserGroupInformation
    +
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.NewTokens
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.util.{SerializableBuffer, Utils}
    +
    +/*
    + * The following methods are primarily meant to make sure long-running apps like Spark
    + * Streaming apps can run without interruption while writing to secure HDFS. The
    + * scheduleLoginFromKeytab method is called on the driver when the
    + * CoarseGrainedScheduledBackend starts up. This method wakes up a thread that logs into the KDC
    + * once 75% of the expiry time of the original delegation tokens used for the container
    + * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
    + * pre-specified location - the prefix of which is specified in the sparkConf by
    + * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
    + * to a new file, with a monotonically increasing suffix). After this, the credentials are
    + * updated once 75% of the new tokens validity has elapsed.
    + *
    + * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
    + * validity of the original tokens has elapsed. At that time the executor finds the
    + * credentials file with the latest timestamp and checks if it has read those credentials
    + * before (by keeping track of the suffix of the last file it read). If a new file has
    + * appeared, it will read the credentials and update the currently running UGI with it. This
    + * process happens again once 80% of the validity of this has expired.
    + */
    +class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
    +
    +  private var lastCredentialsFileSuffix = 0
    +
    +  private lazy val delegationTokenRenewer =
    +    Executors.newSingleThreadScheduledExecutor(
    +      Utils.namedThreadFactory("Delegation Token Refresh Thread"))
    +
    +  private var loggedInViaKeytab = false
    +  var driverEndPoint: RpcEndpointRef = null
    +
    +  private lazy val hadoopUtil = YarnSparkHadoopUtil.get
    +
    +  /**
    +   * Schedule a login from the keytab and principal set using the --principal and --keytab
    +   * arguments to spark-submit. This login happens only when the credentials of the current user
    +   * are about to expire. This method reads SPARK_PRINCIPAL and SPARK_KEYTAB from the environment
    +   * to do the login. This method is a no-op in non-YARN mode.
    +   */
    +  private[spark] def scheduleLoginFromKeytab(): Unit = {
    +    sparkConf.getOption("spark.yarn.principal").foreach { principal =>
    +      val keytab = sparkConf.get("spark.yarn.keytab")
    +
    +      def getRenewalInterval: Long = {
    +        import scala.concurrent.duration._
    +        val credentials = UserGroupInformation.getCurrentUser.getCredentials
    +        val interval = (0.75 * (hadoopUtil.getLatestTokenValidity(credentials) -
    +          System.currentTimeMillis())).toLong
    +        // If only 6 hours left, then force a renewal immediately. This is to avoid tokens with
    +        // very less validity being used on AM restart.
    +        if ((interval millis).toHours <= 6) {
    +          0L
    +        } else {
    +          interval
    +        }
    +      }
    +
    +      def scheduleRenewal(runnable: Runnable): Unit = {
    +        val renewalInterval = getRenewalInterval
    +        logInfo(s"Scheduling login from keytab in $renewalInterval 
millis.")
    +        delegationTokenRenewer.schedule(runnable, renewalInterval, 
TimeUnit.MILLISECONDS)
    +      }
    +
    +      // This thread periodically runs on the driver to update the delegation tokens on HDFS.
    +      val driverTokenRenewerRunnable =
    +        new Runnable {
    +          override def run(): Unit = {
    +            var wroteNewFiles = false
    +            try {
    +              writeNewTokensToHDFS(principal, keytab)
    +              wroteNewFiles = true
    +              cleanupOldFiles()
    +            } catch {
    +              case e: Exception =>
    +                // If the exception was due to some issue deleting files, don't worry about it -
    +                // just try to clean up next time. Else, reschedule for an hour later so new
    +                // tokens get written out.
    +                if (!wroteNewFiles) {
    +                  logWarning("Failed to write out new credentials to HDFS, 
will try again in an " +
    +                    "hour! If this happens too often tasks will fail.", e)
    +                  delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
    +                  return
    +                } else {
    +                  logWarning("Error while attempting to clean up old 
delegation token files. " +
    +                    "Cleanup will be reattempted the next time new tokens 
are being written.")
    --- End diff --
    
    nit: nuke `being`
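    
    As context for reviewers: the executor-side pickup described in the class comment (finding the credentials file with the highest suffix and merging it into the current UGI) is not part of this excerpt. Below is a minimal sketch of that step, assuming token files are written as `<prefix>-<n>` under the `spark.yarn.credentials.file` prefix and that old files are cleaned up separately (as the `cleanupOldFiles()` call in the diff suggests). The object and method names are illustrative only, not code from this patch.
    
    ```scala
    import scala.util.Try
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    
    object CredentialsFileReaderSketch {
    
      /**
       * Read the newest credentials file whose suffix is greater than lastSuffixRead,
       * merge its tokens into the current UGI, and return the suffix that was read
       * (or lastSuffixRead if no newer file exists). Hypothetical helper.
       */
      def updateFromLatestCredentialsFile(
          credentialsFilePrefix: String, // e.g. the value of spark.yarn.credentials.file
          lastSuffixRead: Int,
          hadoopConf: Configuration): Int = {
        val prefixPath = new Path(credentialsFilePrefix)
        val fs = prefixPath.getFileSystem(hadoopConf)
        val baseName = prefixPath.getName
    
        // Collect (suffix, path) pairs for files named "<baseName>-<n>".
        val candidates = fs.listStatus(prefixPath.getParent).toSeq
          .map(_.getPath)
          .filter(_.getName.startsWith(baseName + "-"))
          .flatMap { p =>
            Try(p.getName.stripPrefix(baseName + "-").toInt).toOption.map(_ -> p)
          }
    
        candidates.sortBy(_._1).lastOption match {
          case Some((suffix, path)) if suffix > lastSuffixRead =>
            // Deserialize the tokens and add them to the currently running UGI.
            val in = fs.open(path)
            try {
              val newCredentials = new Credentials()
              newCredentials.readTokenStorageStream(in)
              UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
            } finally {
              in.close()
            }
            suffix
          case _ => lastSuffixRead
        }
      }
    }
    ```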

