Github user ifilonenko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227064934
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1. When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
      private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver where to send the newly generated tokens.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driver.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    --- End diff --
    
    Can these tokens be packaged in a tuple or case class together with the UGI, i.e. `(tokens, ugi)` as the return value of this method?
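    
    A minimal sketch of the shape I have in mind (the `StartResult` name and its fields are my invention, not part of this PR):
    
    ```scala
    import org.apache.hadoop.security.UserGroupInformation
    
    // Hypothetical return type for start(): pairs the serialized delegation
    // tokens with the logged-in UGI, so the caller decides how to distribute
    // them instead of relying solely on driver.send(...).
    case class StartResult(tokens: Array[Byte], ugi: UserGroupInformation)
    ```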
    The reason is that, while I know `driver.send(tokens)` is the preferred pattern, on Kubernetes it is simpler to just update a Secret: changes to Secrets are automatically picked up by the driver and executor pods, and that propagation is handled natively by the Kubernetes API (see the sketch below). Unless we want to be strictly idiomatic Spark here, I think leveraging that Kubernetes behavior would be beneficial. I'll leave it up to you to decide how executors should detect a newly updated token.
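    
    For illustration, roughly what the Secret update could look like using the fabric8 client; the Secret and key names are made up, and this is only a sketch of the idea, not code from the k8s backend:
    
    ```scala
    import java.util.Base64
    
    import io.fabric8.kubernetes.api.model.SecretBuilder
    import io.fabric8.kubernetes.client.DefaultKubernetesClient
    
    // Hypothetical helper: writes the serialized delegation tokens into a
    // Secret. The kubelet syncs updated Secrets into the volumes mounted by
    // the driver and executor pods, so no explicit RPC to the driver is needed.
    def updateTokenSecret(tokens: Array[Byte], namespace: String): Unit = {
      val client = new DefaultKubernetesClient()
      try {
        val secret = new SecretBuilder()
          .withNewMetadata()
            .withName("spark-delegation-tokens") // illustrative name
          .endMetadata()
          // Secret data values must be base64-encoded strings.
          .addToData("hadoop-tokens", Base64.getEncoder.encodeToString(tokens))
          .build()
        client.secrets().inNamespace(namespace).createOrReplace(secret)
      } finally {
        client.close()
      }
    }
    ```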

