[SPARK-25876][K8S] Simplify kubernetes configuration types. There are a few issues with the current configuration types used in the kubernetes backend:
- they use type parameters for role-specific specialization, which makes type signatures really noisy throughout the code base. - they break encapsulation by forcing the code that creates the config object to remove the configuration from SparkConf before creating the k8s-specific wrapper. - they don't provide an easy way for tests to have default values for fields they do not use. This change fixes those problems by: - creating a base config type with role-specific specialization using inheritance - encapsulating the logic of parsing SparkConf into k8s-specific views inside the k8s config classes - providing some helper code for tests to easily override just the part of the configs they want. Most of the change relates to the above, especially cleaning up the tests. While doing that, I also made some smaller changes elsewhere: - removed unnecessary type parameters in KubernetesVolumeSpec - simplified the error detection logic in KubernetesVolumeUtils; all the call sites would just throw the first exception collected by that class, since they all called "get" on the "Try" object. Now the unnecessary wrapping is gone and the exception is just thrown where it occurs. - removed a lot of unnecessary mocking from tests. - changed the kerberos-related code so that less logic needs to live in the driver builder. In spirit it should be part of the upcoming work in this series of cleanups, but it made parts of this change simpler. Tested with existing unit tests and integration tests. Author: Marcelo Vanzin <van...@cloudera.com> Closes #22959 from vanzin/SPARK-25876. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6be272b7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6be272b7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6be272b7 Branch: refs/heads/master Commit: 6be272b75b4ae3149869e19df193675cc4117763 Parents: 8856e9f Author: Marcelo Vanzin <van...@cloudera.com> Authored: Fri Nov 30 16:23:37 2018 -0800 Committer: mcheah <mch...@palantir.com> Committed: Fri Nov 30 16:23:37 2018 -0800 ---------------------------------------------------------------------- .../org/apache/spark/deploy/k8s/Config.scala | 17 +- .../spark/deploy/k8s/KubernetesConf.scala | 302 +++++++++---------- .../spark/deploy/k8s/KubernetesVolumeSpec.scala | 10 +- .../deploy/k8s/KubernetesVolumeUtils.scala | 53 +--- .../k8s/features/BasicDriverFeatureStep.scala | 24 +- .../k8s/features/BasicExecutorFeatureStep.scala | 29 +- .../k8s/features/DriverCommandFeatureStep.scala | 22 +- ...DriverKubernetesCredentialsFeatureStep.scala | 6 +- .../k8s/features/DriverServiceFeatureStep.scala | 10 +- .../k8s/features/EnvSecretsFeatureStep.scala | 11 +- .../HadoopConfExecutorFeatureStep.scala | 14 +- .../HadoopSparkUserExecutorFeatureStep.scala | 17 +- .../KerberosConfDriverFeatureStep.scala | 113 ++++--- .../KerberosConfExecutorFeatureStep.scala | 21 +- .../k8s/features/LocalDirsFeatureStep.scala | 9 +- .../k8s/features/MountSecretsFeatureStep.scala | 13 +- .../k8s/features/MountVolumesFeatureStep.scala | 11 +- .../k8s/features/PodTemplateConfigMapStep.scala | 5 +- .../hadooputils/HadoopKerberosLogin.scala | 64 ---- ...KubernetesHadoopDelegationTokenManager.scala | 37 --- .../submit/KubernetesClientApplication.scala | 61 +--- .../k8s/submit/KubernetesDriverBuilder.scala | 53 ++-- .../cluster/k8s/KubernetesExecutorBuilder.scala | 36 +-- .../spark/deploy/k8s/KubernetesConfSuite.scala | 71 ++--- .../spark/deploy/k8s/KubernetesTestConf.scala | 138 +++++++++ 
.../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 30 +- .../features/BasicDriverFeatureStepSuite.scala | 127 ++------ .../BasicExecutorFeatureStepSuite.scala | 103 ++----- .../DriverCommandFeatureStepSuite.scala | 29 +- ...rKubernetesCredentialsFeatureStepSuite.scala | 69 +---- .../DriverServiceFeatureStepSuite.scala | 193 ++++-------- .../features/EnvSecretsFeatureStepSuite.scala | 32 +- .../features/LocalDirsFeatureStepSuite.scala | 46 +-- .../features/MountSecretsFeatureStepSuite.scala | 21 +- .../features/MountVolumesFeatureStepSuite.scala | 56 ++-- .../PodTemplateConfigMapStepSuite.scala | 28 +- .../spark/deploy/k8s/submit/ClientSuite.scala | 47 +-- .../submit/KubernetesDriverBuilderSuite.scala | 204 ++----------- .../k8s/ExecutorPodsAllocatorSuite.scala | 43 +-- .../k8s/KubernetesExecutorBuilderSuite.scala | 114 ++----- 40 files changed, 777 insertions(+), 1512 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 724acd2..1abf290 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -60,7 +60,8 @@ private[spark] object Config extends Logging { .doc("Comma separated list of the Kubernetes secrets used " + "to access private image registries.") .stringConf - .createOptional + .toSequence + .createWithDefault(Nil) val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" @@ -112,16 +113,16 @@ private[spark] object Config extends Logging { .stringConf 
.createOptional - val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = - ConfigBuilder("spark.kubernetes.executor.podNamePrefix") - .doc("Prefix to use in front of the executor pod names.") + // For testing only. + val KUBERNETES_DRIVER_POD_NAME_PREFIX = + ConfigBuilder("spark.kubernetes.driver.resourceNamePrefix") .internal() .stringConf - .createWithDefault("spark") + .createOptional - val KUBERNETES_PYSPARK_PY_FILES = - ConfigBuilder("spark.kubernetes.python.pyFiles") - .doc("The PyFiles that are distributed via client arguments") + val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = + ConfigBuilder("spark.kubernetes.executor.podNamePrefix") + .doc("Prefix to use in front of the executor pod names.") .internal() .stringConf .createOptional http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index ebb8154..a06c21b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -16,93 +16,53 @@ */ package org.apache.spark.deploy.k8s -import scala.collection.mutable +import java.util.Locale import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod} -import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager import org.apache.spark.deploy.k8s.submit._ -import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ import 
org.apache.spark.internal.config.ConfigEntry import org.apache.spark.util.Utils - -private[spark] sealed trait KubernetesRoleSpecificConf - -/* - * Structure containing metadata for Kubernetes logic that builds a Spark driver. - */ -private[spark] case class KubernetesDriverSpecificConf( - mainAppResource: MainAppResource, - mainClass: String, - appName: String, - appArgs: Seq[String], - pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf { - - require(mainAppResource != null, "Main resource must be provided.") - -} - -/* - * Structure containing metadata for Kubernetes logic that builds a Spark executor. - */ -private[spark] case class KubernetesExecutorSpecificConf( - executorId: String, - driverPod: Option[Pod]) - extends KubernetesRoleSpecificConf - -/* - * Structure containing metadata for HADOOP_CONF_DIR customization - */ -private[spark] case class HadoopConfSpec( - hadoopConfDir: Option[String], - hadoopConfigMapName: Option[String]) - /** * Structure containing metadata for Kubernetes logic to build Spark pods. 
*/ -private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( - sparkConf: SparkConf, - roleSpecificConf: T, - appResourceNamePrefix: String, - appId: String, - roleLabels: Map[String, String], - roleAnnotations: Map[String, String], - roleSecretNamesToMountPaths: Map[String, String], - roleSecretEnvNamesToKeyRefs: Map[String, String], - roleEnvs: Map[String, String], - roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]], - hadoopConfSpec: Option[HadoopConfSpec]) { +private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) { - def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config" + val resourceNamePrefix: String + def labels: Map[String, String] + def environment: Map[String, String] + def annotations: Map[String, String] + def secretEnvNamesToKeyRefs: Map[String, String] + def secretNamesToMountPaths: Map[String, String] + def volumes: Seq[KubernetesVolumeSpec] - def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file" + def appName: String = get("spark.app.name", "spark") - def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager = - new KubernetesHadoopDelegationTokenManager(conf, hConf) + def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config" - def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) + def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file" - def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) + def namespace: String = get(KUBERNETES_NAMESPACE) - def imagePullSecrets(): Seq[LocalObjectReference] = { + def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY) + + def imagePullSecrets: Seq[LocalObjectReference] = { sparkConf .get(IMAGE_PULL_SECRETS) - .map(_.split(",")) - .getOrElse(Array.empty[String]) - .map(_.trim) .map { secret => new LocalObjectReferenceBuilder().withName(secret).build() } } - def nodeSelector(): Map[String, String] = + def nodeSelector: 
Map[String, String] = KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) + def contains(config: ConfigEntry[_]): Boolean = sparkConf.contains(config) + def get[T](config: ConfigEntry[T]): T = sparkConf.get(config) def get(conf: String): String = sparkConf.get(conf) @@ -112,125 +72,139 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( def getOption(key: String): Option[String] = sparkConf.getOption(key) } +private[spark] class KubernetesDriverConf( + sparkConf: SparkConf, + val appId: String, + val mainAppResource: MainAppResource, + val mainClass: String, + val appArgs: Array[String], + val pyFiles: Seq[String]) + extends KubernetesConf(sparkConf) { + + override val resourceNamePrefix: String = { + val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None + custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName)) + } + + override def labels: Map[String, String] = { + val presetLabels = Map( + SPARK_APP_ID_LABEL -> appId, + SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) + val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( + sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) + + presetLabels.keys.foreach { key => + require( + !driverCustomLabels.contains(key), + s"Label with key $key is not allowed as it is reserved for Spark bookkeeping operations.") + } + + driverCustomLabels ++ presetLabels + } + + override def environment: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ENV_PREFIX) + } + + override def annotations: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) + } + + override def secretNamesToMountPaths: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX) + } + + override def secretEnvNamesToKeyRefs: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX) + } + + override def volumes: Seq[KubernetesVolumeSpec] = { + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX) + } +} + +private[spark] class KubernetesExecutorConf( + sparkConf: SparkConf, + val appId: String, + val executorId: String, + val driverPod: Option[Pod]) + extends KubernetesConf(sparkConf) { + + override val resourceNamePrefix: String = { + get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX).getOrElse( + KubernetesConf.getResourceNamePrefix(appName)) + } + + override def labels: Map[String, String] = { + val presetLabels = Map( + SPARK_EXECUTOR_ID_LABEL -> executorId, + SPARK_APP_ID_LABEL -> appId, + SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) + + val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( + sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) + + presetLabels.keys.foreach { key => + require( + !executorCustomLabels.contains(key), + s"Custom executor labels cannot contain $key as it is reserved for Spark.") + } + + executorCustomLabels ++ presetLabels + } + + override def environment: Map[String, String] = sparkConf.getExecutorEnv.toMap + + override def annotations: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + } + + override def secretNamesToMountPaths: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX) + } + + override def secretEnvNamesToKeyRefs: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX) + } + + override def volumes: Seq[KubernetesVolumeSpec] = { + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) + } + +} + private[spark] object KubernetesConf { def createDriverConf( sparkConf: SparkConf, - appName: String, - appResourceNamePrefix: String, appId: String, mainAppResource: MainAppResource, mainClass: 
String, appArgs: Array[String], - maybePyFiles: Option[String], - hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) - require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + - s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " + - "operations.") - require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " + - s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " + - "operations.") - val driverLabels = driverCustomLabels ++ Map( - SPARK_APP_ID_LABEL -> appId, - SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) - val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) - val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX) - val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX) - val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_ENV_PREFIX) - val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( - sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get) - // Also parse executor volumes in order to verify configuration - // before the driver pod is created - KubernetesVolumeUtils.parseVolumesWithPrefix( - sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) - - val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) - KubernetesUtils.requireNandDefined( - hadoopConfDir, - hadoopConfigMapName, - "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " + - "as the creation of an additional ConfigMap, when one is already specified is extraneous" ) - val hadoopConfSpec = - if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) { - 
Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName)) - } else { - None - } - val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) + maybePyFiles: Option[String]): KubernetesDriverConf = { + // Parse executor volumes in order to verify configuration before the driver pod is created. + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) - - KubernetesConf( - sparkConf.clone(), - KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles), - appResourceNamePrefix, - appId, - driverLabels, - driverAnnotations, - driverSecretNamesToMountPaths, - driverSecretEnvNamesToKeyRefs, - driverEnvs, - driverVolumes, - hadoopConfSpec) + val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) + new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs, + pyFiles) } def createExecutorConf( sparkConf: SparkConf, executorId: String, appId: String, - driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = { - val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) - require( - !executorCustomLabels.contains(SPARK_APP_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") - require( - !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + - " Spark.") - require( - !executorCustomLabels.contains(SPARK_ROLE_LABEL), - s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") - val executorLabels = Map( - SPARK_EXECUTOR_ID_LABEL -> executorId, - SPARK_APP_ID_LABEL -> appId, - SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ - executorCustomLabels - val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) - val executorMountSecrets = 
KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX) - val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX) - val executorEnv = sparkConf.getExecutorEnv.toMap - val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( - sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) - - // If no prefix is defined then we are in pure client mode - // (not the one used by cluster mode inside the container) - val appResourceNamePrefix = { - if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) { - getResourceNamePrefix(getAppName(sparkConf)) - } else { - sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) - } - } + driverPod: Option[Pod]): KubernetesExecutorConf = { + new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod) + } - KubernetesConf( - sparkConf.clone(), - KubernetesExecutorSpecificConf(executorId, driverPod), - appResourceNamePrefix, - appId, - executorLabels, - executorAnnotations, - executorMountSecrets, - executorEnvSecrets, - executorEnv, - executorVolumes, - None) + def getResourceNamePrefix(appName: String): String = { + val launchTime = System.currentTimeMillis() + s"$appName-$launchTime" + .trim + .toLowerCase(Locale.ROOT) + .replaceAll("\\s+", "-") + .replaceAll("\\.", "-") + .replaceAll("[^a-z0-9\\-]", "") + .replaceAll("-+", "-") } } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index 1a214fa..0ebe8fd 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -18,12 +18,10 @@ package org.apache.spark.deploy.k8s private[spark] sealed trait KubernetesVolumeSpecificConf -private[spark] case class KubernetesHostPathVolumeConf( - hostPath: String) +private[spark] case class KubernetesHostPathVolumeConf(hostPath: String) extends KubernetesVolumeSpecificConf -private[spark] case class KubernetesPVCVolumeConf( - claimName: String) +private[spark] case class KubernetesPVCVolumeConf(claimName: String) extends KubernetesVolumeSpecificConf private[spark] case class KubernetesEmptyDirVolumeConf( @@ -31,9 +29,9 @@ private[spark] case class KubernetesEmptyDirVolumeConf( sizeLimit: Option[String]) extends KubernetesVolumeSpecificConf -private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf]( +private[spark] case class KubernetesVolumeSpec( volumeName: String, mountPath: String, mountSubPath: String, mountReadOnly: Boolean, - volumeConf: T) + volumeConf: KubernetesVolumeSpecificConf) http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index 1553264..c0c4f86 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -16,10 +16,6 @@ */ package org.apache.spark.deploy.k8s -import java.util.NoSuchElementException - -import 
scala.util.{Failure, Success, Try} - import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ @@ -31,9 +27,7 @@ private[spark] object KubernetesVolumeUtils { * @param prefix the given property name prefix * @return a Map storing with volume name as key and spec as value */ - def parseVolumesWithPrefix( - sparkConf: SparkConf, - prefix: String): Iterable[Try[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]] = { + def parseVolumesWithPrefix(sparkConf: SparkConf, prefix: String): Seq[KubernetesVolumeSpec] = { val properties = sparkConf.getAllWithPrefix(prefix).toMap getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) => @@ -41,17 +35,13 @@ private[spark] object KubernetesVolumeUtils { val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY" val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY" - for { - path <- properties.getTry(pathKey) - volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName) - } yield KubernetesVolumeSpec( + KubernetesVolumeSpec( volumeName = volumeName, - mountPath = path, + mountPath = properties(pathKey), mountSubPath = properties.get(subPathKey).getOrElse(""), mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean), - volumeConf = volumeConf - ) - } + volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName)) + }.toSeq } /** @@ -61,9 +51,7 @@ private[spark] object KubernetesVolumeUtils { * @param properties flat mapping of property names to values * @return Set[(volumeType, volumeName)] */ - private def getVolumeTypesAndNames( - properties: Map[String, String] - ): Set[(String, String)] = { + private def getVolumeTypesAndNames(properties: Map[String, String]): Set[(String, String)] = { properties.keys.flatMap { k => k.split('.').toList match { case tpe :: name :: _ => Some((tpe, name)) @@ -73,40 +61,25 @@ private[spark] object KubernetesVolumeUtils { } private def parseVolumeSpecificConf( - options: 
Map[String, String], - volumeType: String, - volumeName: String): Try[KubernetesVolumeSpecificConf] = { + options: Map[String, String], + volumeType: String, + volumeName: String): KubernetesVolumeSpecificConf = { volumeType match { case KUBERNETES_VOLUMES_HOSTPATH_TYPE => val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY" - for { - path <- options.getTry(pathKey) - } yield KubernetesHostPathVolumeConf(path) + KubernetesHostPathVolumeConf(options(pathKey)) case KUBERNETES_VOLUMES_PVC_TYPE => val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY" - for { - claimName <- options.getTry(claimNameKey) - } yield KubernetesPVCVolumeConf(claimName) + KubernetesPVCVolumeConf(options(claimNameKey)) case KUBERNETES_VOLUMES_EMPTYDIR_TYPE => val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY" val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY" - Success(KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey))) + KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey)) case _ => - Failure(new RuntimeException(s"Kubernetes Volume type `$volumeType` is not supported")) - } - } - - /** - * Convenience wrapper to accumulate key lookup errors - */ - implicit private class MapOps[A, B](m: Map[A, B]) { - def getTry(key: A): Try[B] = { - m - .get(key) - .fold[Try[B]](Failure(new NoSuchElementException(key.toString)))(Success(_)) + throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 5ddf73c..d8cf365 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -30,13 +30,12 @@ import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils -private[spark] class BasicDriverFeatureStep( - conf: KubernetesConf[KubernetesDriverSpecificConf]) +private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep { private val driverPodName = conf .get(KUBERNETES_DRIVER_POD_NAME) - .getOrElse(s"${conf.appResourceNamePrefix}-driver") + .getOrElse(s"${conf.resourceNamePrefix}-driver") private val driverContainerImage = conf .get(DRIVER_CONTAINER_IMAGE) @@ -52,8 +51,8 @@ private[spark] class BasicDriverFeatureStep( // The memory overhead factor to use. If the user has not set it, then use a different // value for non-JVM apps. This value is propagated to executors. 
private val overheadFactor = - if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) { - if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) { + if (conf.mainAppResource.isInstanceOf[NonJVMResource]) { + if (conf.contains(MEMORY_OVERHEAD_FACTOR)) { conf.get(MEMORY_OVERHEAD_FACTOR) } else { NON_JVM_MEMORY_OVERHEAD_FACTOR @@ -68,8 +67,7 @@ private[spark] class BasicDriverFeatureStep( private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB override def configurePod(pod: SparkPod): SparkPod = { - val driverCustomEnvs = conf.roleEnvs - .toSeq + val driverCustomEnvs = conf.environment.toSeq .map { env => new EnvVarBuilder() .withName(env._1) @@ -96,7 +94,7 @@ private[spark] class BasicDriverFeatureStep( val driverContainer = new ContainerBuilder(pod.container) .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME)) .withImage(driverContainerImage) - .withImagePullPolicy(conf.imagePullPolicy()) + .withImagePullPolicy(conf.imagePullPolicy) .addNewPort() .withName(DRIVER_PORT_NAME) .withContainerPort(driverPort) @@ -130,13 +128,13 @@ private[spark] class BasicDriverFeatureStep( val driverPod = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(driverPodName) - .addToLabels(conf.roleLabels.asJava) - .addToAnnotations(conf.roleAnnotations.asJava) + .addToLabels(conf.labels.asJava) + .addToAnnotations(conf.annotations.asJava) .endMetadata() .editOrNewSpec() .withRestartPolicy("Never") - .addToNodeSelector(conf.nodeSelector().asJava) - .addToImagePullSecrets(conf.imagePullSecrets(): _*) + .addToNodeSelector(conf.nodeSelector.asJava) + .addToImagePullSecrets(conf.imagePullSecrets: _*) .endSpec() .build() @@ -147,7 +145,7 @@ private[spark] class BasicDriverFeatureStep( val additionalProps = mutable.Map( KUBERNETES_DRIVER_POD_NAME.key -> driverPodName, "spark.app.id" -> conf.appId, - KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix, + KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> 
conf.resourceNamePrefix, KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 7f397e6..8bf3152 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -29,8 +29,7 @@ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils -private[spark] class BasicExecutorFeatureStep( - kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) +private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep { // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf @@ -42,7 +41,7 @@ private[spark] class BasicExecutorFeatureStep( .sparkConf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) - private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix + private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix private val driverUrl = RpcEndpointAddress( kubernetesConf.get("spark.driver.host"), @@ -76,7 +75,7 @@ private[spark] class BasicExecutorFeatureStep( private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) override def configurePod(pod: SparkPod): SparkPod = { - val name = 
s"$executorPodNamePrefix-exec-${kubernetesConf.roleSpecificConf.executorId}" + val name = s"$executorPodNamePrefix-exec-${kubernetesConf.executorId}" // hostname must be no longer than 63 characters, so take the last 63 characters of the pod // name as the hostname. This preserves uniqueness since the end of name contains @@ -98,7 +97,7 @@ private[spark] class BasicExecutorFeatureStep( .get(EXECUTOR_JAVA_OPTIONS) .map { opts => val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId, - kubernetesConf.roleSpecificConf.executorId) + kubernetesConf.executorId) val delimitedOpts = Utils.splitCommandString(subsOpts) delimitedOpts.zipWithIndex.map { case (opt, index) => @@ -112,8 +111,8 @@ private[spark] class BasicExecutorFeatureStep( (ENV_APPLICATION_ID, kubernetesConf.appId), // This is to set the SPARK_CONF_DIR to be /opt/spark/conf (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL), - (ENV_EXECUTOR_ID, kubernetesConf.roleSpecificConf.executorId)) ++ - kubernetesConf.roleEnvs) + (ENV_EXECUTOR_ID, kubernetesConf.executorId)) ++ + kubernetesConf.environment) .map(env => new EnvVarBuilder() .withName(env._1) .withValue(env._2) @@ -138,7 +137,7 @@ private[spark] class BasicExecutorFeatureStep( val executorContainer = new ContainerBuilder(pod.container) .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)) .withImage(executorContainerImage) - .withImagePullPolicy(kubernetesConf.imagePullPolicy()) + .withImagePullPolicy(kubernetesConf.imagePullPolicy) .editOrNewResources() .addToRequests("memory", executorMemoryQuantity) .addToLimits("memory", executorMemoryQuantity) @@ -158,27 +157,27 @@ private[spark] class BasicExecutorFeatureStep( .endResources() .build() }.getOrElse(executorContainer) - val driverPod = kubernetesConf.roleSpecificConf.driverPod - val ownerReference = driverPod.map(pod => + val ownerReference = kubernetesConf.driverPod.map { pod => new OwnerReferenceBuilder() .withController(true) .withApiVersion(pod.getApiVersion) 
.withKind(pod.getKind) .withName(pod.getMetadata.getName) .withUid(pod.getMetadata.getUid) - .build()) + .build() + } val executorPod = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(name) - .addToLabels(kubernetesConf.roleLabels.asJava) - .addToAnnotations(kubernetesConf.roleAnnotations.asJava) + .addToLabels(kubernetesConf.labels.asJava) + .addToAnnotations(kubernetesConf.annotations.asJava) .addToOwnerReferences(ownerReference.toSeq: _*) .endMetadata() .editOrNewSpec() .withHostname(hostname) .withRestartPolicy("Never") - .addToNodeSelector(kubernetesConf.nodeSelector().asJava) - .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) + .addToNodeSelector(kubernetesConf.nodeSelector.asJava) + .addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*) .endSpec() .build() http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala index 8b8f0d0..76b4ec9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -32,13 +32,11 @@ import org.apache.spark.util.Utils * Creates the driver command for running the user app, and propagates needed configuration so * executors can also find the app code. 
*/ -private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDriverSpecificConf]) +private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep { - private val driverConf = conf.roleSpecificConf - override def configurePod(pod: SparkPod): SparkPod = { - driverConf.mainAppResource match { + conf.mainAppResource match { case JavaMainAppResource(_) => configureForJava(pod) @@ -51,7 +49,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri } override def getAdditionalPodSystemProperties(): Map[String, String] = { - driverConf.mainAppResource match { + conf.mainAppResource match { case JavaMainAppResource(res) => res.map(additionalJavaProperties).getOrElse(Map.empty) @@ -71,10 +69,10 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri } private def configureForPython(pod: SparkPod, res: String): SparkPod = { - val maybePythonFiles = if (driverConf.pyFiles.nonEmpty) { + val maybePythonFiles = if (conf.pyFiles.nonEmpty) { // Delineation by ":" is to append the PySpark Files to the PYTHONPATH // of the respective PySpark pod - val resolved = KubernetesUtils.resolveFileUrisAndPath(driverConf.pyFiles) + val resolved = KubernetesUtils.resolveFileUrisAndPath(conf.pyFiles) Some(new EnvVarBuilder() .withName(ENV_PYSPARK_FILES) .withValue(resolved.mkString(":")) @@ -85,7 +83,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri val pythonEnvs = Seq(new EnvVarBuilder() .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) - .withValue(conf.sparkConf.get(PYSPARK_MAJOR_PYTHON_VERSION)) + .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION)) .build()) ++ maybePythonFiles @@ -105,9 +103,9 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri new ContainerBuilder(pod.container) .addToArgs("driver") .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", driverConf.mainClass) + 
.addToArgs("--class", conf.mainClass) .addToArgs(resource) - .addToArgs(driverConf.appArgs: _*) + .addToArgs(conf.appArgs: _*) } private def additionalJavaProperties(resource: String): Map[String, String] = { @@ -116,7 +114,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri private def additionalPythonProperties(resource: String): Map[String, String] = { resourceType(APP_RESOURCE_TYPE_PYTHON) ++ - mergeFileList("spark.files", Seq(resource) ++ driverConf.pyFiles) + mergeFileList("spark.files", Seq(resource) ++ conf.pyFiles) } private def additionalRProperties(resource: String): Map[String, String] = { @@ -124,7 +122,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri } private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = { - val existing = Utils.stringToSeq(conf.sparkConf.get(key, "")) + val existing = Utils.stringToSeq(conf.get(key, "")) Map(key -> (existing ++ filesToAdd).distinct.mkString(",")) } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala index ff5ad66..795ca49 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala @@ -28,7 +28,7 @@ import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import 
org.apache.spark.deploy.k8s.Constants._ -private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_]) +private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf) extends KubernetesFeatureConfigStep { // TODO clean up this class, and credentials in general. See also SparkKubernetesClientFactory. // We should use a struct to hold all creds-related fields. A lot of the code is very repetitive. @@ -66,7 +66,7 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube clientCertDataBase64.isDefined private val driverCredentialsSecretName = - s"${kubernetesConf.appResourceNamePrefix}-kubernetes-credentials" + s"${kubernetesConf.resourceNamePrefix}-kubernetes-credentials" override def configurePod(pod: SparkPod): SparkPod = { if (!shouldMountSecret) { @@ -122,7 +122,7 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube val redactedTokens = kubernetesConf.sparkConf.getAll .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX)) .toMap - .mapValues( _ => "<present_but_redacted>") + .map { case (k, v) => (k, "<present_but_redacted>") } redactedTokens ++ resolvedMountedCaCertFile.map { file => Map( http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index f2d7bbd..4230545 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ 
-20,13 +20,13 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.{Clock, SystemClock} private[spark] class DriverServiceFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf], + kubernetesConf: KubernetesDriverConf, clock: Clock = new SystemClock) extends KubernetesFeatureConfigStep with Logging { import DriverServiceFeatureStep._ @@ -38,7 +38,7 @@ private[spark] class DriverServiceFeatureStep( s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " + "managed via a Kubernetes service.") - private val preferredServiceName = s"${kubernetesConf.appResourceNamePrefix}$DRIVER_SVC_POSTFIX" + private val preferredServiceName = s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX" private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { preferredServiceName } else { @@ -58,7 +58,7 @@ private[spark] class DriverServiceFeatureStep( override def configurePod(pod: SparkPod): SparkPod = pod override def getAdditionalPodSystemProperties(): Map[String, String] = { - val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc" + val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace}.svc" Map(DRIVER_HOST_KEY -> driverHostname, "spark.driver.port" -> driverPort.toString, org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key -> @@ -72,7 +72,7 @@ private[spark] class DriverServiceFeatureStep( .endMetadata() .withNewSpec() .withClusterIP("None") - .withSelector(kubernetesConf.roleLabels.asJava) + .withSelector(kubernetesConf.labels.asJava) .addNewPort() .withName(DRIVER_PORT_NAME) .withPort(driverPort) 
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala index 03ff7d4..d78f04d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala @@ -20,14 +20,13 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} -private[spark] class EnvSecretsFeatureStep( - kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) +private[spark] class EnvSecretsFeatureStep(kubernetesConf: KubernetesConf) extends KubernetesFeatureConfigStep { override def configurePod(pod: SparkPod): SparkPod = { val addedEnvSecrets = kubernetesConf - .roleSecretEnvNamesToKeyRefs + .secretEnvNamesToKeyRefs .map{ case (envName, keyRef) => // Keyref parts val keyRefParts = keyRef.split(":") @@ -50,8 +49,4 @@ private[spark] class EnvSecretsFeatureStep( .build() SparkPod(pod.pod, containerWithEnvVars) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala 
---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala index fd09de2..bca6675 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -16,9 +16,7 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.HasMetadata - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil import org.apache.spark.internal.Logging @@ -28,21 +26,15 @@ import org.apache.spark.internal.Logging * containing Hadoop config files mounted as volumes and an ENV variable * pointed to the mounted file directory. 
*/ -private[spark] class HadoopConfExecutorFeatureStep( - kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) +private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep with Logging { override def configurePod(pod: SparkPod): SparkPod = { - val sparkConf = kubernetesConf.sparkConf - val hadoopConfDirCMapName = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME) + val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME) require(hadoopConfDirCMapName.isDefined, "Ensure that the env `HADOOP_CONF_DIR` is defined either in the client or " + " using pre-existing ConfigMaps") logInfo("HADOOP_CONF_DIR defined") HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala index 5b6a6d5..e342110 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala @@ -16,28 +16,19 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.HasMetadata - -import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import 
org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil -import org.apache.spark.internal.Logging /** * This step is responsible for setting ENV_SPARK_USER when HADOOP_FILES are detected * however, this step would not be run if Kerberos is enabled, as Kerberos sets SPARK_USER */ -private[spark] class HadoopSparkUserExecutorFeatureStep( - kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) - extends KubernetesFeatureConfigStep with Logging { +private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutorConf) + extends KubernetesFeatureConfigStep { override def configurePod(pod: SparkPod): SparkPod = { - val sparkUserName = kubernetesConf.sparkConf.get(KERBEROS_SPARK_USER_NAME) + val sparkUserName = conf.get(KERBEROS_SPARK_USER_NAME) HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala index ce47933..c6d5a86 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala 
@@ -16,40 +16,43 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.api.model.{HasMetadata, Secret, SecretBuilder} +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf import org.apache.spark.deploy.k8s.features.hadooputils._ -import org.apache.spark.internal.Logging +import org.apache.spark.deploy.security.HadoopDelegationTokenManager /** * Runs the necessary Hadoop-based logic based on Kerberos configs and the presence of the * HADOOP_CONF_DIR. This runs various bootstrap methods defined in HadoopBootstrapUtil. 
*/ -private[spark] class KerberosConfDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep with Logging { - - require(kubernetesConf.hadoopConfSpec.isDefined, - "Ensure that HADOOP_CONF_DIR is defined either via env or a pre-defined ConfigMap") - private val hadoopConfDirSpec = kubernetesConf.hadoopConfSpec.get - private val conf = kubernetesConf.sparkConf - private val principal = conf.get(org.apache.spark.internal.config.PRINCIPAL) - private val keytab = conf.get(org.apache.spark.internal.config.KEYTAB) - private val existingSecretName = conf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME) - private val existingSecretItemKey = conf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) - private val krb5File = conf.get(KUBERNETES_KERBEROS_KRB5_FILE) - private val krb5CMap = conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) - private val kubeTokenManager = kubernetesConf.tokenManager(conf, - SparkHadoopUtil.get.newConfiguration(conf)) +private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDriverConf) + extends KubernetesFeatureConfigStep { + + private val hadoopConfDir = Option(kubernetesConf.sparkConf.getenv(ENV_HADOOP_CONF_DIR)) + private val hadoopConfigMapName = kubernetesConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) + KubernetesUtils.requireNandDefined( + hadoopConfDir, + hadoopConfigMapName, + "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " + + "as the creation of an additional ConfigMap, when one is already specified is extraneous") + + private val principal = kubernetesConf.get(org.apache.spark.internal.config.PRINCIPAL) + private val keytab = kubernetesConf.get(org.apache.spark.internal.config.KEYTAB) + private val existingSecretName = kubernetesConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME) + private val existingSecretItemKey = kubernetesConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) + private val krb5File = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_FILE) + 
private val krb5CMap = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) + private val hadoopConf = SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf) + private val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf, hadoopConf) private val isKerberosEnabled = - (hadoopConfDirSpec.hadoopConfDir.isDefined && kubeTokenManager.isSecurityEnabled) || - (hadoopConfDirSpec.hadoopConfigMapName.isDefined && - (krb5File.isDefined || krb5CMap.isDefined)) + (hadoopConfDir.isDefined && UserGroupInformation.isSecurityEnabled) || + (hadoopConfigMapName.isDefined && (krb5File.isDefined || krb5CMap.isDefined)) require(keytab.isEmpty || isKerberosEnabled, "You must enable Kerberos support if you are specifying a Kerberos Keytab") @@ -76,11 +79,11 @@ private[spark] class KerberosConfDriverFeatureStep( "If a secret storing a Kerberos Delegation Token is specified you must also" + " specify the item-key where the data is stored") - private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir => + private val hadoopConfigurationFiles = hadoopConfDir.map { hConfDir => HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) } private val newHadoopConfigMapName = - if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) { + if (hadoopConfigMapName.isEmpty) { Some(kubernetesConf.hadoopConfigMapName) } else { None @@ -95,23 +98,24 @@ private[spark] class KerberosConfDriverFeatureStep( dtSecret = None, dtSecretName = secretName, dtSecretItemKey = secretItemKey, - jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) + jobUserName = UserGroupInformation.getCurrentUser.getShortUserName) }).orElse( if (isKerberosEnabled) { - Some(HadoopKerberosLogin.buildSpec( - conf, - kubernetesConf.appResourceNamePrefix, - kubeTokenManager)) + Some(buildKerberosSpec()) } else { None } ) override def configurePod(pod: SparkPod): SparkPod = { + if (!isKerberosEnabled) { + return pod + } + val hadoopBasedSparkPod = 
HadoopBootstrapUtil.bootstrapHadoopConfDir( - hadoopConfDirSpec.hadoopConfDir, + hadoopConfDir, newHadoopConfigMapName, - hadoopConfDirSpec.hadoopConfigMapName, + hadoopConfigMapName, pod) kerberosConfSpec.map { hSpec => HadoopBootstrapUtil.bootstrapKerberosPod( @@ -124,11 +128,15 @@ private[spark] class KerberosConfDriverFeatureStep( hadoopBasedSparkPod) }.getOrElse( HadoopBootstrapUtil.bootstrapSparkUserPod( - kubeTokenManager.getCurrentUser.getShortUserName, + UserGroupInformation.getCurrentUser.getShortUserName, hadoopBasedSparkPod)) } override def getAdditionalPodSystemProperties(): Map[String, String] = { + if (!isKerberosEnabled) { + return Map.empty + } + val resolvedConfValues = kerberosConfSpec.map { hSpec => Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName, KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey, @@ -136,13 +144,16 @@ KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName)) }.getOrElse( Map(KERBEROS_SPARK_USER_NAME -> - kubeTokenManager.getCurrentUser.getShortUserName)) + UserGroupInformation.getCurrentUser.getShortUserName)) Map(HADOOP_CONFIG_MAP_NAME -> - hadoopConfDirSpec.hadoopConfigMapName.getOrElse( - kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues + hadoopConfigMapName.getOrElse(kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + if (!isKerberosEnabled) { + return Seq.empty + } + val hadoopConfConfigMap = for { hName <- newHadoopConfigMapName hFiles <- hadoopConfigurationFiles @@ -162,4 +173,34 @@ private[spark] class KerberosConfDriverFeatureStep( krb5ConfigMap.toSeq ++ kerberosDTSecret.toSeq } + + private def buildKerberosSpec(): KerberosConfigSpec = { + // The JobUserUGI will be taken from the Local Ticket Cache or via keytab+principal + // The login happens in the SparkSubmit so login logic is not necessary to include + val jobUserUGI = UserGroupInformation.getCurrentUser
val creds = jobUserUGI.getCredentials + tokenManager.obtainDelegationTokens(creds) + val tokenData = SparkHadoopUtil.get.serialize(creds) + require(tokenData.nonEmpty, "Did not obtain any delegation tokens") + val newSecretName = + s"${kubernetesConf.resourceNamePrefix}-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME" + val secretDT = + new SecretBuilder() + .withNewMetadata() + .withName(newSecretName) + .endMetadata() + .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(tokenData)) + .build() + KerberosConfigSpec( + dtSecret = Some(secretDT), + dtSecretName = newSecretName, + dtSecretItemKey = KERBEROS_SECRET_KEY, + jobUserName = jobUserUGI.getShortUserName) + } + + private case class KerberosConfigSpec( + dtSecret: Option[Secret], + dtSecretName: String, + dtSecretItemKey: String, + jobUserName: String) } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala index 06a88b6..32bb6a5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala @@ -16,38 +16,29 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.HasMetadata - -import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf 
import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil import org.apache.spark.internal.Logging /** * This step is responsible for mounting the DT secret for the executors */ -private[spark] class KerberosConfExecutorFeatureStep( - kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) +private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep with Logging { - private val sparkConf = kubernetesConf.sparkConf - private val maybeKrb5CMap = sparkConf.getOption(KRB5_CONFIG_MAP_NAME) + private val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME) require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found") override def configurePod(pod: SparkPod): SparkPod = { logInfo(s"Mounting Resources for Kerberos") HadoopBootstrapUtil.bootstrapKerberosPod( - sparkConf.get(KERBEROS_DT_SECRET_NAME), - sparkConf.get(KERBEROS_DT_SECRET_KEY), - sparkConf.get(KERBEROS_SPARK_USER_NAME), + conf.get(KERBEROS_DT_SECRET_NAME), + conf.get(KERBEROS_DT_SECRET_KEY), + conf.get(KERBEROS_SPARK_USER_NAME), None, None, maybeKrb5CMap, pod) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty[HasMetadata] } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index be386e1..19ed2df 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -16,16 +16,15 @@ */ package org.apache.spark.deploy.k8s.features -import java.nio.file.Paths import java.util.UUID import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ private[spark] class LocalDirsFeatureStep( - conf: KubernetesConf[_ <: KubernetesRoleSpecificConf], + conf: KubernetesConf, defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}") extends KubernetesFeatureConfigStep { @@ -73,8 +72,4 @@ private[spark] class LocalDirsFeatureStep( .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala index 97fa949..f4e1a3a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala @@ -18,14 +18,13 @@ package org.apache.spark.deploy.k8s.features import 
io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} -private[spark] class MountSecretsFeatureStep( - kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) +private[spark] class MountSecretsFeatureStep(kubernetesConf: KubernetesConf) extends KubernetesFeatureConfigStep { override def configurePod(pod: SparkPod): SparkPod = { val addedVolumes = kubernetesConf - .roleSecretNamesToMountPaths + .secretNamesToMountPaths .keys .map(secretName => new VolumeBuilder() @@ -40,7 +39,7 @@ private[spark] class MountSecretsFeatureStep( .endSpec() .build() val addedVolumeMounts = kubernetesConf - .roleSecretNamesToMountPaths + .secretNamesToMountPaths .map { case (secretName, mountPath) => new VolumeMountBuilder() @@ -54,9 +53,5 @@ private[spark] class MountSecretsFeatureStep( SparkPod(podWithVolumes, containerWithMounts) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty - private def secretVolumeName(secretName: String): String = s"$secretName-volume" } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 1473a7d..8548e70 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -20,12 +20,11 @@ import io.fabric8.kubernetes.api.model._ import org.apache.spark.deploy.k8s._ -private[spark] class MountVolumesFeatureStep( - kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) +private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { override def configurePod(pod: SparkPod): SparkPod = { - val (volumeMounts, volumes) = constructVolumes(kubernetesConf.roleVolumes).unzip + val (volumeMounts, volumes) = constructVolumes(conf.volumes).unzip val podWithVolumes = new PodBuilder(pod.pod) .editSpec() @@ -40,12 +39,8 @@ private[spark] class MountVolumesFeatureStep( SparkPod(podWithVolumes, containerWithVolumeMounts) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty - private def constructVolumes( - volumeSpecs: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]] + volumeSpecs: Iterable[KubernetesVolumeSpec] ): Iterable[(VolumeMount, Volume)] = { volumeSpecs.map { spec => val volumeMount = new VolumeMountBuilder() http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala index 28e2d17..09dcf93 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -22,12 +22,11 @@ import java.nio.charset.StandardCharsets import com.google.common.io.Files import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -private[spark] class PodTemplateConfigMapStep( - conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) +private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { def configurePod(pod: SparkPod): SparkPod = { val podWithVolume = new PodBuilder(pod.pod) http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala deleted file mode 100644 index 0022d8f..0000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.hadooputils - -import io.fabric8.kubernetes.api.model.SecretBuilder -import org.apache.commons.codec.binary.Base64 - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager - -/** - * This logic does all the heavy lifting for Delegation Token creation. This step - * assumes that the job user has either specified a principal and keytab or ran - * $kinit before running spark-submit. By running UGI.getCurrentUser we are able - * to obtain the current user, either signed in via $kinit or keytab. With the - * Job User principal you then retrieve the delegation token from the NameNode - * and store values in DelegationToken. Lastly, the class puts the data into - * a secret. All this is defined in a KerberosConfigSpec. 
- */ -private[spark] object HadoopKerberosLogin { - def buildSpec( - submissionSparkConf: SparkConf, - kubernetesResourceNamePrefix: String, - tokenManager: KubernetesHadoopDelegationTokenManager): KerberosConfigSpec = { - // The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal - // The login happens in the SparkSubmit so login logic is not necessary to include - val jobUserUGI = tokenManager.getCurrentUser - val originalCredentials = jobUserUGI.getCredentials - tokenManager.obtainDelegationTokens(originalCredentials) - - val tokenData = SparkHadoopUtil.get.serialize(originalCredentials) - - val initialTokenDataKeyName = KERBEROS_SECRET_KEY - val newSecretName = s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME" - val secretDT = - new SecretBuilder() - .withNewMetadata() - .withName(newSecretName) - .endMetadata() - .addToData(initialTokenDataKeyName, Base64.encodeBase64String(tokenData)) - .build() - KerberosConfigSpec( - dtSecret = Some(secretDT), - dtSecretName = newSecretName, - dtSecretItemKey = initialTokenDataKeyName, - jobUserName = jobUserUGI.getShortUserName) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala deleted file mode 100644 index 3e98d58..0000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.k8s.security - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.UserGroupInformation - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.security.HadoopDelegationTokenManager - -/** - * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager. 
- */ -private[spark] class KubernetesHadoopDelegationTokenManager( - _sparkConf: SparkConf, - _hadoopConf: Configuration) - extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) { - - def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser - def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled - -} http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 543d6b1..70a93c9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -17,19 +17,17 @@ package org.apache.spark.deploy.k8s.submit import java.io.StringWriter -import java.util.{Collections, Locale, Properties, UUID} import java.util.{Collections, UUID} import java.util.Properties import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.KubernetesClient -import org.apache.hadoop.security.UserGroupInformation import scala.collection.mutable import scala.util.control.NonFatal import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkApplication -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkKubernetesClientFactory} +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging @@ -47,8 +45,7 @@ private[spark] case class 
ClientArguments( mainAppResource: MainAppResource, mainClass: String, driverArgs: Array[String], - maybePyFiles: Option[String], - hadoopConfigDir: Option[String]) + maybePyFiles: Option[String]) private[spark] object ClientArguments { @@ -82,8 +79,7 @@ private[spark] object ClientArguments { mainAppResource, mainClass.get, driverArgs.toArray, - maybePyFiles, - sys.env.get(ENV_HADOOP_CONF_DIR)) + maybePyFiles) } } @@ -92,27 +88,24 @@ private[spark] object ClientArguments { * watcher that monitors and logs the application status. Waits for the application to terminate if * spark.kubernetes.submission.waitAppCompletion is true. * + * @param conf The kubernetes driver config. * @param builder Responsible for building the base driver pod based on a composition of * implemented features. - * @param kubernetesConf application configuration * @param kubernetesClient the client to talk to the Kubernetes API server * @param waitForAppCompletion a flag indicating whether the client should wait for the application * to complete - * @param appName the application name * @param watcher a watcher that monitors and logs the application status */ private[spark] class Client( + conf: KubernetesDriverConf, builder: KubernetesDriverBuilder, - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf], kubernetesClient: KubernetesClient, waitForAppCompletion: Boolean, - appName: String, - watcher: LoggingPodStatusWatcher, - kubernetesResourceNamePrefix: String) extends Logging { + watcher: LoggingPodStatusWatcher) extends Logging { def run(): Unit = { - val resolvedDriverSpec = builder.buildFromFeatures(kubernetesConf) - val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map" + val resolvedDriverSpec = builder.buildFromFeatures(conf) + val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map" val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties) // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the // Spark command 
builder to pickup on the Java Options present in the ConfigMap @@ -155,11 +148,11 @@ private[spark] class Client( } if (waitForAppCompletion) { - logInfo(s"Waiting for application $appName to finish...") + logInfo(s"Waiting for application ${conf.appName} to finish...") watcher.awaitCompletion() - logInfo(s"Application $appName finished.") + logInfo(s"Application ${conf.appName} finished.") } else { - logInfo(s"Deployed Spark application $appName into Kubernetes.") + logInfo(s"Deployed Spark application ${conf.appName} into Kubernetes.") } } } @@ -216,19 +209,13 @@ private[spark] class KubernetesClientApplication extends SparkApplication { // a unique app ID (captured by spark.app.id) in the format below. val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}" val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION) - val kubernetesResourceNamePrefix = KubernetesClientApplication.getResourceNamePrefix(appName) - sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse("")) val kubernetesConf = KubernetesConf.createDriverConf( sparkConf, - appName, - kubernetesResourceNamePrefix, kubernetesAppId, clientArguments.mainAppResource, clientArguments.mainClass, clientArguments.driverArgs, - clientArguments.maybePyFiles, - clientArguments.hadoopConfigDir) - val namespace = kubernetesConf.namespace() + clientArguments.maybePyFiles) // The master URL has been checked for validity already in SparkSubmit. // We just need to get rid of the "k8s://" prefix here. 
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) @@ -238,36 +225,18 @@ private[spark] class KubernetesClientApplication extends SparkApplication { Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, - Some(namespace), + Some(kubernetesConf.namespace), KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, sparkConf, None, None)) { kubernetesClient => val client = new Client( - KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf), kubernetesConf, + KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf), kubernetesClient, waitForAppCompletion, - appName, - watcher, - kubernetesResourceNamePrefix) + watcher) client.run() } } } - -private[spark] object KubernetesClientApplication { - - def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark") - - def getResourceNamePrefix(appName: String): String = { - val launchTime = System.currentTimeMillis() - s"$appName-$launchTime" - .trim - .toLowerCase(Locale.ROOT) - .replaceAll("\\s+", "-") - .replaceAll("\\.", "-") - .replaceAll("[^a-z0-9\\-]", "") - .replaceAll("-+", "-") - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org