Repository: spark
Updated Branches:
  refs/heads/master 6aad02d03 -> bfdc361ed


[SPARK-16742] Mesos Kerberos Support

## What changes were proposed in this pull request?

Add Kerberos support to Mesos. This includes kinit and --keytab support, but
does not include delegation token renewal.

## How was this patch tested?

Manually against a secure DC/OS Apache HDFS cluster.

Author: ArtRand <ar...@soe.ucsc.edu>
Author: Michael Gummelt <mgumm...@mesosphere.io>

Closes #18519 from mgummelt/SPARK-16742-kerberos.
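For readers who want to try this out: with the patch applied, a keytab-based
submission against a Kerberized HDFS from a Mesos cluster might look roughly
like the following. The master URL, principal, keytab path, jar, and input
path are illustrative placeholders, not values from this patch.

    ./bin/spark-submit \
      --master mesos://leader.mesos:5050 \
      --principal alice@EXAMPLE.COM \
      --keytab /etc/security/alice.keytab \
      --class org.apache.spark.examples.HdfsTest \
      examples/jars/spark-examples_2.11-2.3.0-SNAPSHOT.jar \
      hdfs:///user/alice/input.txt

Note that, per the description above, tokens obtained at submission are not
renewed, so a long-running job will lose HDFS access once they expire.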
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfdc361e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfdc361e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfdc361e

Branch: refs/heads/master
Commit: bfdc361ededb2ed4e323f075fdc40ed004b7f41d
Parents: 6aad02d
Author: ArtRand <ar...@soe.ucsc.edu>
Authored: Thu Aug 17 15:47:07 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Aug 17 15:47:07 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 29 ++++++++++++---
 .../org/apache/spark/deploy/SparkSubmit.scala   | 38 +++++++++++++++-----
 .../security/HadoopDelegationTokenManager.scala |  8 +++++
 .../executor/CoarseGrainedExecutorBackend.scala |  7 ++++
 .../cluster/CoarseGrainedClusterMessage.scala   |  3 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 33 ++++++++++++++---
 resource-managers/mesos/pom.xml                 | 11 ++++++
 .../MesosCoarseGrainedSchedulerBackend.scala    | 12 ++++---
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  8 -----
 9 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 2a92ef9..6d507d8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.deploy
 
-import java.io.{File, IOException}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -147,14 +147,18 @@ class SparkHadoopUtil extends Logging {
 
   def isYarnMode(): Boolean = { false }
 
-  def getCurrentUserCredentials(): Credentials = { null }
-
-  def addCurrentUserCredentials(creds: Credentials) {}
-
   def addSecretKeyToUserCredentials(key: String, secret: String) {}
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
+  def getCurrentUserCredentials(): Credentials = {
+    UserGroupInformation.getCurrentUser().getCredentials()
+  }
+
+  def addCurrentUserCredentials(creds: Credentials): Unit = {
+    UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
   def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
     if (!new File(keytabFilename).exists()) {
       throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
@@ -425,6 +429,21 @@ class SparkHadoopUtil extends Logging {
         s"${if (status.isDirectory) "d" else "-"}$perm")
     false
   }
+
+  def serialize(creds: Credentials): Array[Byte] = {
+    val byteStream = new ByteArrayOutputStream
+    val dataStream = new DataOutputStream(byteStream)
+    creds.writeTokenStorageToStream(dataStream)
+    byteStream.toByteArray
+  }
+
+  def deserialize(tokenBytes: Array[Byte]): Credentials = {
+    val tokensBuf = new ByteArrayInputStream(tokenBytes)
+
+    val creds = new Credentials()
+    creds.readTokenStorageStream(new DataInputStream(tokensBuf))
+    creds
+  }
 }
 
 object SparkHadoopUtil {
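As a quick illustration of the new serialize/deserialize helpers, a minimal
round-trip sketch (assumes a logged-in UGI; the helper methods are the ones
added above):

    import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    import org.apache.spark.deploy.SparkHadoopUtil

    // Serialize the current user's credentials (delegation tokens and secret
    // keys) into Hadoop's token storage format, then rebuild an equivalent
    // Credentials object from the bytes.
    val creds: Credentials = UserGroupInformation.getCurrentUser.getCredentials
    val bytes: Array[Byte] = SparkHadoopUtil.get.serialize(creds)
    val restored: Credentials = SparkHadoopUtil.get.deserialize(bytes)
    assert(restored.numberOfTokens == creds.numberOfTokens)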
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6d744a0..e7e8fbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor._
@@ -49,6 +50,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
 import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
+import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util._
@@ -556,19 +558,25 @@
     }
 
     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
       if (args.principal != null) {
-        require(args.keytab != null, "Keytab must be specified when principal is specified")
-        SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
-        // Add keytab and principal configurations in sysProps to make them available
-        // for later use; e.g. in spark sql, the isolated class loader used to talk
-        // to HiveMetastore will use these settings. They will be set as Java system
-        // properties and then loaded by SparkConf
-        sysProps.put("spark.yarn.keytab", args.keytab)
-        sysProps.put("spark.yarn.principal", args.principal)
+        if (args.keytab != null) {
+          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in spark sql, the isolated class loader used to talk
+          // to HiveMetastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
       }
     }
 
+    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+      setRMPrincipal(sysProps)
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -653,6 +661,18 @@
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
+  // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
+  // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
+  // must trick it into thinking we're YARN.
+  private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = {
+    val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
+    val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
+    // scalastyle:off println
+    printStream.println(s"Setting ${key} to ${shortUserName}")
+    // scalastyle:on println
+    sysProps.put(key, shortUserName)
+  }
+
   /**
    * Run the main method of the child class using the provided launch environment.
    *
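The setRMPrincipal trick works because Spark copies every "spark.hadoop."-
prefixed property into the Hadoop Configuration with the prefix stripped, so
the Hadoop token-fetching code finds a ResourceManager principal even though
no YARN cluster is involved. A minimal sketch of that effect (the "alice"
value is illustrative):

    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val sparkConf = new SparkConf()
      .set(s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}", "alice")

    // newConfiguration() strips the "spark.hadoop." prefix when building the
    // Hadoop conf, so the token-fetch path sees
    // yarn.resourcemanager.principal = "alice".
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
    assert(hadoopConf.get(YarnConfiguration.RM_PRINCIPAL) == "alice")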
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 01cbfe1..c317c4f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -55,6 +55,14 @@
   logDebug(s"Using the following delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
+  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
+  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
+    this(
+      sparkConf,
+      hadoopConf,
+      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
+  }
+
   private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
     val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
       new HiveDelegationTokenProvider,


http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a2f1aa2..a5d60e9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,6 +26,8 @@ import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -219,6 +221,11 @@
         SparkHadoopUtil.get.startCredentialUpdater(driverConf)
       }
 
+      cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
+        val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
+        SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+      }
+
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
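The new two-argument constructor above lets callers outside YARN build a token
manager without enumerating filesystems; the Mesos backend further down uses
exactly this form. A driver-side sketch of fetching tokens with it (assumes a
Kerberos login has already happened and security is enabled):

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager

    val sparkConf = new SparkConf()
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)

    // Tokens for the default filesystem (plus Hive/HBase when configured)
    // are added to creds by the registered providers.
    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
    val creds = UserGroupInformation.getCurrentUser.getCredentials
    manager.obtainDelegationTokens(hadoopConf, creds)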
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 89a9ad6..5d65731 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -32,7 +32,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class SparkAppConfig(
       sparkProperties: Seq[(String, String)],
-      ioEncryptionKey: Option[Array[Byte]])
+      ioEncryptionKey: Option[Array[Byte]],
+      hadoopDelegationCreds: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage
 
   case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage


http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a46824a..a0ef209 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -24,7 +24,11 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
 
+import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -42,8 +46,8 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
  */
 private[spark]
 class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
-  extends ExecutorAllocationClient with SchedulerBackend with Logging
-{
+  extends ExecutorAllocationClient with SchedulerBackend with Logging {
+
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   protected val totalCoreCount = new AtomicInteger(0)
   // Total number of executors that are currently registered
@@ -95,6 +99,12 @@
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  // Hadoop token manager used by some sub-classes (e.g. Mesos)
+  def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
+
+  // Hadoop delegation tokens to be sent to the executors.
+  val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
@@ -223,8 +233,10 @@
         context.reply(true)
 
       case RetrieveSparkAppConfig =>
-        val reply = SparkAppConfig(sparkProperties,
-          SparkEnv.get.securityManager.getIOEncryptionKey())
+        val reply = SparkAppConfig(
+          sparkProperties,
+          SparkEnv.get.securityManager.getIOEncryptionKey(),
+          hadoopDelegationCreds)
         context.reply(reply)
     }
@@ -675,6 +687,19 @@
     driverEndpoint.send(KillExecutorsOnHost(host))
     true
   }
+
+  protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
+    if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
+      hadoopDelegationTokenManager.map { manager =>
+        val creds = UserGroupInformation.getCurrentUser.getCredentials
+        val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+        manager.obtainDelegationTokens(hadoopConf, creds)
+        SparkHadoopUtil.get.serialize(creds)
+      }
+    } else {
+      None
+    }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
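Worth noting here: Credentials is a Hadoop Writable, not java.io.Serializable,
which is why SparkAppConfig carries an Option[Array[Byte]] rather than the
object itself. The end-to-end flow, condensed into one sketch (method names as
in the diffs above):

    // Driver, at backend construction: fetch tokens once and serialize them.
    //   val creds = UserGroupInformation.getCurrentUser.getCredentials
    //   manager.obtainDelegationTokens(hadoopConf, creds)
    //   hadoopDelegationCreds = Some(SparkHadoopUtil.get.serialize(creds))
    //
    // Driver, on RetrieveSparkAppConfig: the reply carries hadoopDelegationCreds.
    //
    // Executor, before creating SparkEnv: install the shipped tokens.
    cfg.hadoopDelegationCreds.foreach { bytes =>
      SparkHadoopUtil.get.addCurrentUserCredentials(SparkHadoopUtil.get.deserialize(bytes))
    }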
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index 20b53f2..2aa3228 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -74,6 +74,17 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
     <dependency>
       <groupId>com.google.guava</groupId>


http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e6b0957..5ecd466 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -22,15 +22,15 @@ import java.util.{Collections, List => JList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
-
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -55,8 +55,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     master: String,
     securityManager: SecurityManager)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
-    with org.apache.mesos.Scheduler
-    with MesosSchedulerUtils {
+    with org.apache.mesos.Scheduler with MesosSchedulerUtils {
+
+  override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
+    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
 
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2
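That two-line override is the whole opt-in surface: any other
CoarseGrainedSchedulerBackend subclass could ship tokens the same way. A
hypothetical sketch (MyClusterBackend is illustrative, not part of this
patch):

    import org.apache.spark.SparkContext
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    import org.apache.spark.scheduler.TaskSchedulerImpl
    import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

    private[spark] class MyClusterBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
      extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

      // Returning Some(...) is enough: the base class fetches, serializes, and
      // ships the tokens to executors via SparkAppConfig.
      override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
        Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
    }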
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4fef439..3d9f99f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -74,14 +74,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = {
-    UserGroupInformation.getCurrentUser().getCredentials()
-  }
-
-  override def addCurrentUserCredentials(creds: Credentials) {
-    UserGroupInformation.getCurrentUser().addCredentials(creds)
-  }
-
   override def addSecretKeyToUserCredentials(key: String, secret: String) {
     val creds = new Credentials()
     creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org