Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229031795

--- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
@@ -17,76 +17,175 @@ package org.apache.spark.deploy.security
 
+import java.io.File
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.ThreadUtils
 
 /**
- * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
- * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
- * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
- * explicitly disabled.
+ * Manager for delegation tokens in a Spark application.
+ *
+ * This manager has two modes of operation:
+ *
+ * 1. When configured with a principal and a keytab, it will make sure long-running apps can run
+ * without interruption while accessing secured services. It periodically logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure services to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
+ * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
+ * when the relogin is not yet needed. The check period can be overridden in the configuration.
  *
- * Also, each HadoopDelegationTokenProvider is controlled by
- * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
- * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
- * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
+ * New delegation tokens are created once 75% of the renewal interval of the original tokens has
+ * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
+ * The driver is tasked with distributing the tokens to other processes that might need them.
  *
- * @param sparkConf Spark configuration
- * @param hadoopConf Hadoop configuration
- * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
+ * 2. When operating without an explicit principal and keytab, token renewal will not be available.
+ * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
+ * driver, but the app will not get new tokens when those expire.
+ *
+ * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
+ * method. This option does not require calling the `start` method, but leaves it up to the
+ * caller to distribute the tokens that were generated.
  */
 private[spark] class HadoopDelegationTokenManager(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    fileSystems: Configuration => Set[FileSystem])
-  extends Logging {
+    protected val sparkConf: SparkConf,
+    protected val hadoopConf: Configuration) extends Logging {
 
   private val deprecatedProviderEnabledConfigs = List(
     "spark.yarn.security.tokens.%s.enabled",
     "spark.yarn.security.credentials.%s.enabled")
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
 
-  // Maintain all the registered delegation token providers
-  private val delegationTokenProviders = getDelegationTokenProviders
+  private val principal = sparkConf.get(PRINCIPAL).orNull
+  private val keytab = sparkConf.get(KEYTAB).orNull
+
+  require((principal == null) == (keytab == null),
+    "Both principal and keytab must be defined, or neither.")
+  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
+
+  private val delegationTokenProviders = loadProviders()
   logDebug("Using the following builtin delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
-  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
-  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
-    this(
-      sparkConf,
-      hadoopConf,
-      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
+  private var renewalExecutor: ScheduledExecutorService = _
+  private val driverRef = new AtomicReference[RpcEndpointRef]()
+
+  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
+    driverRef.set(ref)
   }
 
-  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
-    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
-      safeCreateProvider(new HiveDelegationTokenProvider) ++
-      safeCreateProvider(new HBaseDelegationTokenProvider)
+  /**
+   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
+   *
+   * - log in the configured principal, and set up a task to keep that user's ticket renewed
+   * - obtain delegation tokens from all available providers
+   * - schedule a periodic task to update the tokens when needed.
+   *
+   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
+   * will generate tokens if the driver ref has been provided, update the current user, and send
+   * those tokens to the driver. No future tokens will be generated, so when that initial set
+   * expires, the app will stop working.
+   *
+   * @param driverEndpoint If provided, the driver where to send the newly generated tokens.
+   *                       The same ref will also receive future token updates unless overridden
+   *                       later.
+   * @return The newly logged in user, or null if a principal is not configured.
+   */
+  def start(driverEndpoint: Option[RpcEndpointRef] = None): UserGroupInformation = {
+    driverEndpoint.foreach(setDriverRef)
 
-    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
-    providers
-      .filter { p => isServiceEnabled(p.serviceName) }
-      .map { p => (p.serviceName, p) }
-      .toMap
+    if (principal != null) {
+      renewalExecutor =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
+      val ugi = doLogin()
+
+      val tgtRenewalTask = new Runnable() {
+        override def run(): Unit = {
+          ugi.checkTGTAndReloginFromKeytab()
+        }
+      }
+      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
+      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
+        TimeUnit.SECONDS)
+
+      val creds = obtainTokensAndScheduleRenewal(ugi)
+      ugi.addCredentials(creds)
+
+      val driver = driverRef.get()
+      if (driver != null) {
+        val tokens = SparkHadoopUtil.get.serialize(creds)
+        driver.send(UpdateDelegationTokens(tokens))
+      }
+
+      // Transfer the original user's tokens to the new user, since it may contain needed tokens
+      // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
+      // exist in the current user's credentials, since those were freshly obtained above
+      // (see SPARK-23361).
+      val existing = ugi.getCredentials()
+      existing.mergeAll(originalCreds)
+      ugi.addCredentials(existing)
+      ugi
+    } else {
+      driverEndpoint.foreach { driver =>
--- End diff --

This is the part of the current API that I dislike the most, but I really don't know how to make it better without fixing SPARK-25689 first. This at least keeps a single method to initialize delegation tokens, instead of having logic in the call sites about what method to call.
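For readers following the thread, here is a minimal sketch of how a caller might drive the single entry point discussed above. The constructor and the `start()` signature are taken from the diff; the package, object, and method names below, and the way the driver endpoint is obtained, are invented for illustration only.

```scala
// Hypothetical file inside Spark's own source tree; the package name is made up so
// that the private[spark] manager is visible to this sketch.
package org.apache.spark.deploy.security.sketch

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.RpcEndpointRef

object TokenSetupSketch {

  def setUpTokens(
      sparkConf: SparkConf,
      hadoopConf: Configuration,
      driverRef: Option[RpcEndpointRef]): UserGroupInformation = {
    // Both modes described in the new class docs go through the same entry point.
    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)

    // With a principal and keytab configured, start() logs in, schedules TGT relogin
    // and periodic token renewal, and returns the newly logged-in UGI. Without them,
    // it only sends an initial token set to the driver (if one was provided) and
    // returns null.
    manager.start(driverRef)
  }
}
```

That single call is the point made above: call sites do not have to choose between different initialization methods depending on whether renewal is enabled.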
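The "75% of the renewal interval" rule mentioned in the new class docs is not visible in this hunk; purely to illustrate the arithmetic, the scheduling delay could be derived along these lines (the helper below is invented for the example, not code from the PR):

```scala
object RenewalDelaySketch {
  // Invented helper: given when a token was issued and when it must be renewed,
  // compute how long to wait before creating replacement tokens, i.e. until 75%
  // of the renewal interval has elapsed.
  def nextRenewalDelayMs(issueTimeMs: Long, renewalTimeMs: Long, nowMs: Long): Long = {
    val delay = (0.75 * (renewalTimeMs - issueTimeMs)).toLong
    math.max(0L, issueTimeMs + delay - nowMs)
  }
}
```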