Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229031795
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1. When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  require((principal == null) == (keytab == null),
    +    "Both principal and keytab must be defined, or neither.")
    +  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driverEndpoint If provided, the driver where to send the newly generated tokens.
    +   *                       The same ref will also receive future token updates unless overridden
    +   *                       later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driverEndpoint: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driverEndpoint.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    +        driver.send(UpdateDelegationTokens(tokens))
    +      }
    +
    +      // Transfer the original user's tokens to the new user, since it may contain needed tokens
    +      // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +      // exist in the current user's credentials, since those were freshly obtained above
    +      // (see SPARK-23361).
    +      val existing = ugi.getCredentials()
    +      existing.mergeAll(originalCreds)
    +      ugi.addCredentials(existing)
    +      ugi
    +    } else {
    +      driverEndpoint.foreach { driver =>
    --- End diff --
    
    This is the part of the current API that I dislike the most, but I really don't know how to make it better without fixing SPARK-25689 first.
    
    This at least keeps a single method to initialize delegation tokens, instead of having logic in the call sites about what method to call.
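    
    To illustrate, here's a rough sketch of what a call site looks like with this single entry point. It's only for discussion; `driverEndpoint` and `runApplication()` are placeholders, not names from this diff:
    
    ```scala
    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
    
    // With a principal/keytab, start() logs in and schedules periodic renewal;
    // without one, it just pushes an initial set of tokens to the driver ref
    // (if provided) and returns null.
    val ugi = manager.start(Some(driverEndpoint))
    if (ugi != null) {
      // Keytab mode: run as the freshly logged-in user so the renewed
      // credentials are picked up.
      ugi.doAs(new java.security.PrivilegedExceptionAction[Unit] {
        override def run(): Unit = runApplication()
      })
    } else {
      // No keytab: the initial tokens are all the app will ever get.
      runApplication()
    }
    ```
    
    Either way the call site is the same; the mode is decided by the conf, not by which method gets called.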

