[SPARK-25876][K8S] Simplify kubernetes configuration types.

There are a few issues with the current configuration types used in
the Kubernetes backend:

- they use type parameters for role-specific specialization, which makes
  type signatures really noisy throughout the code base.

- they break encapsulation by forcing the code that creates the config
  object to extract configuration values from SparkConf before creating
  the k8s-specific wrapper.

- they don't provide an easy way for tests to have default values for
  fields they do not use.

This change fixes those problems by:

- creating a base config type with role-specific specialization using
  inheritance (see the sketch after this list)

- encapsulating the logic of parsing SparkConf into k8s-specific views
  inside the k8s config classes

- providing some helper code so that tests can easily override just the
  parts of the config they care about.
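
A minimal sketch of the shape this introduces (simplified stand-ins,
not the actual Spark classes; the real KubernetesConf,
KubernetesDriverConf and KubernetesExecutorConf are in the diff below):

    import org.apache.spark.SparkConf

    // One base config backed by SparkConf; role-specific subclasses replace
    // the old KubernetesConf[T <: KubernetesRoleSpecificConf] type parameter.
    abstract class BaseConfSketch(val sparkConf: SparkConf) {
      // Parsing of SparkConf stays inside the config classes, so callers
      // only hand over the conf instead of pre-extracted values.
      def labels: Map[String, String]
      def get(key: String, default: String): String = sparkConf.get(key, default)
    }

    // The label keys below stand in for the SPARK_*_LABEL constants.
    class DriverConfSketch(sparkConf: SparkConf, val appId: String)
      extends BaseConfSketch(sparkConf) {
      override def labels: Map[String, String] =
        Map("spark-app-selector" -> appId, "spark-role" -> "driver")
    }

    class ExecutorConfSketch(
        sparkConf: SparkConf,
        val appId: String,
        val executorId: String)
      extends BaseConfSketch(sparkConf) {
      override def labels: Map[String, String] =
        Map("spark-app-selector" -> appId,
          "spark-exec-id" -> executorId,
          "spark-role" -> "executor")
    }

The KubernetesTestConf helper added by this change covers the last
point: it builds these configs with defaults so tests only specify the
fields they exercise.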

Most of the change relates to the above, especially cleaning up the
tests. While doing that, I also made some smaller changes elsewhere:

- removed unnecessary type parameters in KubernetesVolumeSpec

- simplified the error detection logic in KubernetesVolumeUtils; every
  call site just threw the first exception collected by that class,
  since they all called "get" on the "Try" object. The unnecessary
  wrapping is now gone and the exception is thrown where it occurs
  (a sketch follows this list).

- removed a lot of unnecessary mocking from tests.

- changed the Kerberos-related code so that less logic needs to live
  in the driver builder. In spirit this belongs with the upcoming work
  in this series of cleanups, but doing it here made parts of this
  change simpler.
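
A small sketch of the simplified volume parsing (illustrative only; the
string literals stand in for the real config-key constants, and the
actual code is in KubernetesVolumeUtils.scala below):

    object VolumeParsingSketch {
      // Before, each lookup produced a Try and every call site immediately
      // called .get, so the first failure was thrown anyway. Now the lookup
      // fails in place: a missing key raises NoSuchElementException from
      // Map.apply, and an unknown volume type raises IllegalArgumentException.
      def parseHostPath(
          options: Map[String, String],
          volumeType: String,
          volumeName: String): String = volumeType match {
        case "hostPath" =>
          options(s"$volumeType.$volumeName.options.path")
        case other =>
          throw new IllegalArgumentException(
            s"Kubernetes Volume type `$other` is not supported")
      }
    }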

Tested with existing unit tests and integration tests.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #22959 from vanzin/SPARK-25876.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6be272b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6be272b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6be272b7

Branch: refs/heads/master
Commit: 6be272b75b4ae3149869e19df193675cc4117763
Parents: 8856e9f
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Nov 30 16:23:37 2018 -0800
Committer: mcheah <mch...@palantir.com>
Committed: Fri Nov 30 16:23:37 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/k8s/Config.scala    |  17 +-
 .../spark/deploy/k8s/KubernetesConf.scala       | 302 +++++++++----------
 .../spark/deploy/k8s/KubernetesVolumeSpec.scala |  10 +-
 .../deploy/k8s/KubernetesVolumeUtils.scala      |  53 +---
 .../k8s/features/BasicDriverFeatureStep.scala   |  24 +-
 .../k8s/features/BasicExecutorFeatureStep.scala |  29 +-
 .../k8s/features/DriverCommandFeatureStep.scala |  22 +-
 ...DriverKubernetesCredentialsFeatureStep.scala |   6 +-
 .../k8s/features/DriverServiceFeatureStep.scala |  10 +-
 .../k8s/features/EnvSecretsFeatureStep.scala    |  11 +-
 .../HadoopConfExecutorFeatureStep.scala         |  14 +-
 .../HadoopSparkUserExecutorFeatureStep.scala    |  17 +-
 .../KerberosConfDriverFeatureStep.scala         | 113 ++++---
 .../KerberosConfExecutorFeatureStep.scala       |  21 +-
 .../k8s/features/LocalDirsFeatureStep.scala     |   9 +-
 .../k8s/features/MountSecretsFeatureStep.scala  |  13 +-
 .../k8s/features/MountVolumesFeatureStep.scala  |  11 +-
 .../k8s/features/PodTemplateConfigMapStep.scala |   5 +-
 .../hadooputils/HadoopKerberosLogin.scala       |  64 ----
 ...KubernetesHadoopDelegationTokenManager.scala |  37 ---
 .../submit/KubernetesClientApplication.scala    |  61 +---
 .../k8s/submit/KubernetesDriverBuilder.scala    |  53 ++--
 .../cluster/k8s/KubernetesExecutorBuilder.scala |  36 +--
 .../spark/deploy/k8s/KubernetesConfSuite.scala  |  71 ++---
 .../spark/deploy/k8s/KubernetesTestConf.scala   | 138 +++++++++
 .../deploy/k8s/KubernetesVolumeUtilsSuite.scala |  30 +-
 .../features/BasicDriverFeatureStepSuite.scala  | 127 ++------
 .../BasicExecutorFeatureStepSuite.scala         | 103 ++-----
 .../DriverCommandFeatureStepSuite.scala         |  29 +-
 ...rKubernetesCredentialsFeatureStepSuite.scala |  69 +----
 .../DriverServiceFeatureStepSuite.scala         | 193 ++++--------
 .../features/EnvSecretsFeatureStepSuite.scala   |  32 +-
 .../features/LocalDirsFeatureStepSuite.scala    |  46 +--
 .../features/MountSecretsFeatureStepSuite.scala |  21 +-
 .../features/MountVolumesFeatureStepSuite.scala |  56 ++--
 .../PodTemplateConfigMapStepSuite.scala         |  28 +-
 .../spark/deploy/k8s/submit/ClientSuite.scala   |  47 +--
 .../submit/KubernetesDriverBuilderSuite.scala   | 204 ++-----------
 .../k8s/ExecutorPodsAllocatorSuite.scala        |  43 +--
 .../k8s/KubernetesExecutorBuilderSuite.scala    | 114 ++-----
 40 files changed, 777 insertions(+), 1512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 724acd2..1abf290 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -60,7 +60,8 @@ private[spark] object Config extends Logging {
       .doc("Comma separated list of the Kubernetes secrets used " +
         "to access private image registries.")
       .stringConf
-      .createOptional
+      .toSequence
+      .createWithDefault(Nil)
 
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
       "spark.kubernetes.authenticate.driver"
@@ -112,16 +113,16 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
-  val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
-    ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
-      .doc("Prefix to use in front of the executor pod names.")
+  // For testing only.
+  val KUBERNETES_DRIVER_POD_NAME_PREFIX =
+    ConfigBuilder("spark.kubernetes.driver.resourceNamePrefix")
       .internal()
       .stringConf
-      .createWithDefault("spark")
+      .createOptional
 
-  val KUBERNETES_PYSPARK_PY_FILES =
-    ConfigBuilder("spark.kubernetes.python.pyFiles")
-      .doc("The PyFiles that are distributed via client arguments")
+  val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+    ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
+      .doc("Prefix to use in front of the executor pod names.")
       .internal()
       .stringConf
       .createOptional

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index ebb8154..a06c21b 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -16,93 +16,53 @@
  */
 package org.apache.spark.deploy.k8s
 
-import scala.collection.mutable
+import java.util.Locale
 
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, 
LocalObjectReferenceBuilder, Pod}
-import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import 
org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
 import org.apache.spark.deploy.k8s.submit._
-import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.util.Utils
 
-
-private[spark] sealed trait KubernetesRoleSpecificConf
-
-/*
- * Structure containing metadata for Kubernetes logic that builds a Spark 
driver.
- */
-private[spark] case class KubernetesDriverSpecificConf(
-    mainAppResource: MainAppResource,
-    mainClass: String,
-    appName: String,
-    appArgs: Seq[String],
-    pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf {
-
-  require(mainAppResource != null, "Main resource must be provided.")
-
-}
-
-/*
- * Structure containing metadata for Kubernetes logic that builds a Spark 
executor.
- */
-private[spark] case class KubernetesExecutorSpecificConf(
-    executorId: String,
-    driverPod: Option[Pod])
-  extends KubernetesRoleSpecificConf
-
-/*
- * Structure containing metadata for HADOOP_CONF_DIR customization
- */
-private[spark] case class HadoopConfSpec(
-    hadoopConfDir: Option[String],
-    hadoopConfigMapName: Option[String])
-
 /**
  * Structure containing metadata for Kubernetes logic to build Spark pods.
  */
-private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
-    sparkConf: SparkConf,
-    roleSpecificConf: T,
-    appResourceNamePrefix: String,
-    appId: String,
-    roleLabels: Map[String, String],
-    roleAnnotations: Map[String, String],
-    roleSecretNamesToMountPaths: Map[String, String],
-    roleSecretEnvNamesToKeyRefs: Map[String, String],
-    roleEnvs: Map[String, String],
-    roleVolumes: Iterable[KubernetesVolumeSpec[_ <: 
KubernetesVolumeSpecificConf]],
-    hadoopConfSpec: Option[HadoopConfSpec]) {
+private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
 
-  def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"
+  val resourceNamePrefix: String
+  def labels: Map[String, String]
+  def environment: Map[String, String]
+  def annotations: Map[String, String]
+  def secretEnvNamesToKeyRefs: Map[String, String]
+  def secretNamesToMountPaths: Map[String, String]
+  def volumes: Seq[KubernetesVolumeSpec]
 
-  def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
+  def appName: String = get("spark.app.name", "spark")
 
-  def tokenManager(conf: SparkConf, hConf: Configuration): 
KubernetesHadoopDelegationTokenManager =
-    new KubernetesHadoopDelegationTokenManager(conf, hConf)
+  def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config"
 
-  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+  def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file"
 
-  def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+  def namespace: String = get(KUBERNETES_NAMESPACE)
 
-  def imagePullSecrets(): Seq[LocalObjectReference] = {
+  def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def imagePullSecrets: Seq[LocalObjectReference] = {
     sparkConf
       .get(IMAGE_PULL_SECRETS)
-      .map(_.split(","))
-      .getOrElse(Array.empty[String])
-      .map(_.trim)
       .map { secret =>
         new LocalObjectReferenceBuilder().withName(secret).build()
       }
   }
 
-  def nodeSelector(): Map[String, String] =
+  def nodeSelector: Map[String, String] =
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
 
+  def contains(config: ConfigEntry[_]): Boolean = sparkConf.contains(config)
+
   def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
 
   def get(conf: String): String = sparkConf.get(conf)
@@ -112,125 +72,139 @@ private[spark] case class KubernetesConf[T <: 
KubernetesRoleSpecificConf](
   def getOption(key: String): Option[String] = sparkConf.getOption(key)
 }
 
+private[spark] class KubernetesDriverConf(
+    sparkConf: SparkConf,
+    val appId: String,
+    val mainAppResource: MainAppResource,
+    val mainClass: String,
+    val appArgs: Array[String],
+    val pyFiles: Seq[String])
+  extends KubernetesConf(sparkConf) {
+
+  override val resourceNamePrefix: String = {
+    val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) 
else None
+    custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
+  }
+
+  override def labels: Map[String, String] = {
+    val presetLabels = Map(
+      SPARK_APP_ID_LABEL -> appId,
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+
+    presetLabels.keys.foreach { key =>
+      require(
+        !driverCustomLabels.contains(key),
+        s"Label with key $key is not allowed as it is reserved for Spark 
bookkeeping operations.")
+    }
+
+    driverCustomLabels ++ presetLabels
+  }
+
+  override def environment: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_ENV_PREFIX)
+  }
+
+  override def annotations: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+  }
+
+  override def secretNamesToMountPaths: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_SECRETS_PREFIX)
+  }
+
+  override def secretEnvNamesToKeyRefs: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
+  }
+
+  override def volumes: Seq[KubernetesVolumeSpec] = {
+    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, 
KUBERNETES_DRIVER_VOLUMES_PREFIX)
+  }
+}
+
+private[spark] class KubernetesExecutorConf(
+    sparkConf: SparkConf,
+    val appId: String,
+    val executorId: String,
+    val driverPod: Option[Pod])
+  extends KubernetesConf(sparkConf) {
+
+  override val resourceNamePrefix: String = {
+    get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX).getOrElse(
+      KubernetesConf.getResourceNamePrefix(appName))
+  }
+
+  override def labels: Map[String, String] = {
+    val presetLabels = Map(
+      SPARK_EXECUTOR_ID_LABEL -> executorId,
+      SPARK_APP_ID_LABEL -> appId,
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
+
+    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+
+    presetLabels.keys.foreach { key =>
+      require(
+        !executorCustomLabels.contains(key),
+        s"Custom executor labels cannot contain $key as it is reserved for 
Spark.")
+    }
+
+    executorCustomLabels ++ presetLabels
+  }
+
+  override def environment: Map[String, String] = 
sparkConf.getExecutorEnv.toMap
+
+  override def annotations: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+  }
+
+  override def secretNamesToMountPaths: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+  }
+
+  override def secretEnvNamesToKeyRefs: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
+  }
+
+  override def volumes: Seq[KubernetesVolumeSpec] = {
+    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, 
KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
+  }
+
+}
+
 private[spark] object KubernetesConf {
   def createDriverConf(
       sparkConf: SparkConf,
-      appName: String,
-      appResourceNamePrefix: String,
       appId: String,
       mainAppResource: MainAppResource,
       mainClass: String,
       appArgs: Array[String],
-      maybePyFiles: Option[String],
-      hadoopConfDir: Option[String]): 
KubernetesConf[KubernetesDriverSpecificConf] = {
-    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
-    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key 
" +
-      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
-      "operations.")
-    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
-      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
-      "operations.")
-    val driverLabels = driverCustomLabels ++ Map(
-      SPARK_APP_ID_LABEL -> appId,
-      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
-    val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
-    val driverSecretNamesToMountPaths = 
KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
-    val driverSecretEnvNamesToKeyRefs = 
KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
-    val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
-    val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
-      sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get)
-    // Also parse executor volumes in order to verify configuration
-    // before the driver pod is created
-    KubernetesVolumeUtils.parseVolumesWithPrefix(
-      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
-
-    val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
-    KubernetesUtils.requireNandDefined(
-      hadoopConfDir,
-      hadoopConfigMapName,
-      "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap 
" +
-      "as the creation of an additional ConfigMap, when one is already 
specified is extraneous" )
-    val hadoopConfSpec =
-      if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) {
-        Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName))
-      } else {
-        None
-      }
-    val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
+      maybePyFiles: Option[String]): KubernetesDriverConf = {
+    // Parse executor volumes in order to verify configuration before the 
driver pod is created.
+    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, 
KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
 
-
-    KubernetesConf(
-      sparkConf.clone(),
-      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, 
appArgs, pyFiles),
-      appResourceNamePrefix,
-      appId,
-      driverLabels,
-      driverAnnotations,
-      driverSecretNamesToMountPaths,
-      driverSecretEnvNamesToKeyRefs,
-      driverEnvs,
-      driverVolumes,
-      hadoopConfSpec)
+    val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
+    new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, 
mainClass, appArgs,
+      pyFiles)
   }
 
   def createExecutorConf(
       sparkConf: SparkConf,
       executorId: String,
       appId: String,
-      driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] 
= {
-    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
-    require(
-      !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
-      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
-    require(
-      !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
-        " Spark.")
-    require(
-      !executorCustomLabels.contains(SPARK_ROLE_LABEL),
-      s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
-    val executorLabels = Map(
-      SPARK_EXECUTOR_ID_LABEL -> executorId,
-      SPARK_APP_ID_LABEL -> appId,
-      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
-      executorCustomLabels
-    val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-    val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
-    val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
-    val executorEnv = sparkConf.getExecutorEnv.toMap
-    val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
-      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
-
-    // If no prefix is defined then we are in pure client mode
-    // (not the one used by cluster mode inside the container)
-    val appResourceNamePrefix = {
-      if 
(sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
-        getResourceNamePrefix(getAppName(sparkConf))
-      } else {
-        sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
-      }
-    }
+      driverPod: Option[Pod]): KubernetesExecutorConf = {
+    new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod)
+  }
 
-    KubernetesConf(
-      sparkConf.clone(),
-      KubernetesExecutorSpecificConf(executorId, driverPod),
-      appResourceNamePrefix,
-      appId,
-      executorLabels,
-      executorAnnotations,
-      executorMountSecrets,
-      executorEnvSecrets,
-      executorEnv,
-      executorVolumes,
-      None)
+  def getResourceNamePrefix(appName: String): String = {
+    val launchTime = System.currentTimeMillis()
+    s"$appName-$launchTime"
+      .trim
+      .toLowerCase(Locale.ROOT)
+      .replaceAll("\\s+", "-")
+      .replaceAll("\\.", "-")
+      .replaceAll("[^a-z0-9\\-]", "")
+      .replaceAll("-+", "-")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
index 1a214fa..0ebe8fd 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
@@ -18,12 +18,10 @@ package org.apache.spark.deploy.k8s
 
 private[spark] sealed trait KubernetesVolumeSpecificConf
 
-private[spark] case class KubernetesHostPathVolumeConf(
-    hostPath: String)
+private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
   extends KubernetesVolumeSpecificConf
 
-private[spark] case class KubernetesPVCVolumeConf(
-    claimName: String)
+private[spark] case class KubernetesPVCVolumeConf(claimName: String)
   extends KubernetesVolumeSpecificConf
 
 private[spark] case class KubernetesEmptyDirVolumeConf(
@@ -31,9 +29,9 @@ private[spark] case class KubernetesEmptyDirVolumeConf(
     sizeLimit: Option[String])
   extends KubernetesVolumeSpecificConf
 
-private[spark] case class KubernetesVolumeSpec[T <: 
KubernetesVolumeSpecificConf](
+private[spark] case class KubernetesVolumeSpec(
     volumeName: String,
     mountPath: String,
     mountSubPath: String,
     mountReadOnly: Boolean,
-    volumeConf: T)
+    volumeConf: KubernetesVolumeSpecificConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 1553264..c0c4f86 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -16,10 +16,6 @@
  */
 package org.apache.spark.deploy.k8s
 
-import java.util.NoSuchElementException
-
-import scala.util.{Failure, Success, Try}
-
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 
@@ -31,9 +27,7 @@ private[spark] object KubernetesVolumeUtils {
    * @param prefix the given property name prefix
    * @return a Map storing with volume name as key and spec as value
    */
-  def parseVolumesWithPrefix(
-    sparkConf: SparkConf,
-    prefix: String): Iterable[Try[KubernetesVolumeSpec[_ <: 
KubernetesVolumeSpecificConf]]] = {
+  def parseVolumesWithPrefix(sparkConf: SparkConf, prefix: String): 
Seq[KubernetesVolumeSpec] = {
     val properties = sparkConf.getAllWithPrefix(prefix).toMap
 
     getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
@@ -41,17 +35,13 @@ private[spark] object KubernetesVolumeUtils {
       val readOnlyKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
       val subPathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
 
-      for {
-        path <- properties.getTry(pathKey)
-        volumeConf <- parseVolumeSpecificConf(properties, volumeType, 
volumeName)
-      } yield KubernetesVolumeSpec(
+      KubernetesVolumeSpec(
         volumeName = volumeName,
-        mountPath = path,
+        mountPath = properties(pathKey),
         mountSubPath = properties.get(subPathKey).getOrElse(""),
         mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
-        volumeConf = volumeConf
-      )
-    }
+        volumeConf = parseVolumeSpecificConf(properties, volumeType, 
volumeName))
+    }.toSeq
   }
 
   /**
@@ -61,9 +51,7 @@ private[spark] object KubernetesVolumeUtils {
    * @param properties flat mapping of property names to values
    * @return Set[(volumeType, volumeName)]
    */
-  private def getVolumeTypesAndNames(
-    properties: Map[String, String]
-  ): Set[(String, String)] = {
+  private def getVolumeTypesAndNames(properties: Map[String, String]): 
Set[(String, String)] = {
     properties.keys.flatMap { k =>
       k.split('.').toList match {
         case tpe :: name :: _ => Some((tpe, name))
@@ -73,40 +61,25 @@ private[spark] object KubernetesVolumeUtils {
   }
 
   private def parseVolumeSpecificConf(
-    options: Map[String, String],
-    volumeType: String,
-    volumeName: String): Try[KubernetesVolumeSpecificConf] = {
+      options: Map[String, String],
+      volumeType: String,
+      volumeName: String): KubernetesVolumeSpecificConf = {
     volumeType match {
       case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
         val pathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
-        for {
-          path <- options.getTry(pathKey)
-        } yield KubernetesHostPathVolumeConf(path)
+        KubernetesHostPathVolumeConf(options(pathKey))
 
       case KUBERNETES_VOLUMES_PVC_TYPE =>
         val claimNameKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
-        for {
-          claimName <- options.getTry(claimNameKey)
-        } yield KubernetesPVCVolumeConf(claimName)
+        KubernetesPVCVolumeConf(options(claimNameKey))
 
       case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
         val mediumKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
         val sizeLimitKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
-        Success(KubernetesEmptyDirVolumeConf(options.get(mediumKey), 
options.get(sizeLimitKey)))
+        KubernetesEmptyDirVolumeConf(options.get(mediumKey), 
options.get(sizeLimitKey))
 
       case _ =>
-        Failure(new RuntimeException(s"Kubernetes Volume type `$volumeType` is 
not supported"))
-    }
-  }
-
-  /**
-   * Convenience wrapper to accumulate key lookup errors
-   */
-  implicit private class MapOps[A, B](m: Map[A, B]) {
-    def getTry(key: A): Try[B] = {
-      m
-        .get(key)
-        .fold[Try[B]](Failure(new 
NoSuchElementException(key.toString)))(Success(_))
+        throw new IllegalArgumentException(s"Kubernetes Volume type 
`$volumeType` is not supported")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 5ddf73c..d8cf365 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -30,13 +30,12 @@ import org.apache.spark.internal.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
-private[spark] class BasicDriverFeatureStep(
-    conf: KubernetesConf[KubernetesDriverSpecificConf])
+private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
   extends KubernetesFeatureConfigStep {
 
   private val driverPodName = conf
     .get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(s"${conf.appResourceNamePrefix}-driver")
+    .getOrElse(s"${conf.resourceNamePrefix}-driver")
 
   private val driverContainerImage = conf
     .get(DRIVER_CONTAINER_IMAGE)
@@ -52,8 +51,8 @@ private[spark] class BasicDriverFeatureStep(
   // The memory overhead factor to use. If the user has not set it, then use a 
different
   // value for non-JVM apps. This value is propagated to executors.
   private val overheadFactor =
-    if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) {
-      if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) {
+    if (conf.mainAppResource.isInstanceOf[NonJVMResource]) {
+      if (conf.contains(MEMORY_OVERHEAD_FACTOR)) {
         conf.get(MEMORY_OVERHEAD_FACTOR)
       } else {
         NON_JVM_MEMORY_OVERHEAD_FACTOR
@@ -68,8 +67,7 @@ private[spark] class BasicDriverFeatureStep(
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val driverCustomEnvs = conf.roleEnvs
-      .toSeq
+    val driverCustomEnvs = conf.environment.toSeq
       .map { env =>
         new EnvVarBuilder()
           .withName(env._1)
@@ -96,7 +94,7 @@ private[spark] class BasicDriverFeatureStep(
     val driverContainer = new ContainerBuilder(pod.container)
       
.withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
       .withImage(driverContainerImage)
-      .withImagePullPolicy(conf.imagePullPolicy())
+      .withImagePullPolicy(conf.imagePullPolicy)
       .addNewPort()
         .withName(DRIVER_PORT_NAME)
         .withContainerPort(driverPort)
@@ -130,13 +128,13 @@ private[spark] class BasicDriverFeatureStep(
     val driverPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(driverPodName)
-        .addToLabels(conf.roleLabels.asJava)
-        .addToAnnotations(conf.roleAnnotations.asJava)
+        .addToLabels(conf.labels.asJava)
+        .addToAnnotations(conf.annotations.asJava)
         .endMetadata()
       .editOrNewSpec()
         .withRestartPolicy("Never")
-        .addToNodeSelector(conf.nodeSelector().asJava)
-        .addToImagePullSecrets(conf.imagePullSecrets(): _*)
+        .addToNodeSelector(conf.nodeSelector.asJava)
+        .addToImagePullSecrets(conf.imagePullSecrets: _*)
         .endSpec()
       .build()
 
@@ -147,7 +145,7 @@ private[spark] class BasicDriverFeatureStep(
     val additionalProps = mutable.Map(
       KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
       "spark.app.id" -> conf.appId,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 7f397e6..8bf3152 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -29,8 +29,7 @@ import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
-private[spark] class BasicExecutorFeatureStep(
-    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+private[spark] class BasicExecutorFeatureStep(kubernetesConf: 
KubernetesExecutorConf)
   extends KubernetesFeatureConfigStep {
 
   // Consider moving some of these fields to KubernetesConf or 
KubernetesExecutorSpecificConf
@@ -42,7 +41,7 @@ private[spark] class BasicExecutorFeatureStep(
     .sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
-  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
+  private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix
 
   private val driverUrl = RpcEndpointAddress(
     kubernetesConf.get("spark.driver.host"),
@@ -76,7 +75,7 @@ private[spark] class BasicExecutorFeatureStep(
   private val executorLimitCores = 
kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val name = 
s"$executorPodNamePrefix-exec-${kubernetesConf.roleSpecificConf.executorId}"
+    val name = s"$executorPodNamePrefix-exec-${kubernetesConf.executorId}"
 
     // hostname must be no longer than 63 characters, so take the last 63 
characters of the pod
     // name as the hostname.  This preserves uniqueness since the end of name 
contains
@@ -98,7 +97,7 @@ private[spark] class BasicExecutorFeatureStep(
       .get(EXECUTOR_JAVA_OPTIONS)
       .map { opts =>
         val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
-          kubernetesConf.roleSpecificConf.executorId)
+          kubernetesConf.executorId)
         val delimitedOpts = Utils.splitCommandString(subsOpts)
         delimitedOpts.zipWithIndex.map {
           case (opt, index) =>
@@ -112,8 +111,8 @@ private[spark] class BasicExecutorFeatureStep(
       (ENV_APPLICATION_ID, kubernetesConf.appId),
       // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
       (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-      (ENV_EXECUTOR_ID, kubernetesConf.roleSpecificConf.executorId)) ++
-      kubernetesConf.roleEnvs)
+      (ENV_EXECUTOR_ID, kubernetesConf.executorId)) ++
+      kubernetesConf.environment)
       .map(env => new EnvVarBuilder()
         .withName(env._1)
         .withValue(env._2)
@@ -138,7 +137,7 @@ private[spark] class BasicExecutorFeatureStep(
     val executorContainer = new ContainerBuilder(pod.container)
       
.withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
       .withImage(executorContainerImage)
-      .withImagePullPolicy(kubernetesConf.imagePullPolicy())
+      .withImagePullPolicy(kubernetesConf.imagePullPolicy)
       .editOrNewResources()
         .addToRequests("memory", executorMemoryQuantity)
         .addToLimits("memory", executorMemoryQuantity)
@@ -158,27 +157,27 @@ private[spark] class BasicExecutorFeatureStep(
           .endResources()
         .build()
     }.getOrElse(executorContainer)
-    val driverPod = kubernetesConf.roleSpecificConf.driverPod
-    val ownerReference = driverPod.map(pod =>
+    val ownerReference = kubernetesConf.driverPod.map { pod =>
       new OwnerReferenceBuilder()
         .withController(true)
         .withApiVersion(pod.getApiVersion)
         .withKind(pod.getKind)
         .withName(pod.getMetadata.getName)
         .withUid(pod.getMetadata.getUid)
-        .build())
+        .build()
+    }
     val executorPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
-        .addToLabels(kubernetesConf.roleLabels.asJava)
-        .addToAnnotations(kubernetesConf.roleAnnotations.asJava)
+        .addToLabels(kubernetesConf.labels.asJava)
+        .addToAnnotations(kubernetesConf.annotations.asJava)
         .addToOwnerReferences(ownerReference.toSeq: _*)
         .endMetadata()
       .editOrNewSpec()
         .withHostname(hostname)
         .withRestartPolicy("Never")
-        .addToNodeSelector(kubernetesConf.nodeSelector().asJava)
-        .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
+        .addToNodeSelector(kubernetesConf.nodeSelector.asJava)
+        .addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*)
         .endSpec()
       .build()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index 8b8f0d0..76b4ec9 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -32,13 +32,11 @@ import org.apache.spark.util.Utils
  * Creates the driver command for running the user app, and propagates needed 
configuration so
  * executors can also find the app code.
  */
-private[spark] class DriverCommandFeatureStep(conf: 
KubernetesConf[KubernetesDriverSpecificConf])
+private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   extends KubernetesFeatureConfigStep {
 
-  private val driverConf = conf.roleSpecificConf
-
   override def configurePod(pod: SparkPod): SparkPod = {
-    driverConf.mainAppResource match {
+    conf.mainAppResource match {
       case JavaMainAppResource(_) =>
         configureForJava(pod)
 
@@ -51,7 +49,7 @@ private[spark] class DriverCommandFeatureStep(conf: 
KubernetesConf[KubernetesDri
   }
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = {
-    driverConf.mainAppResource match {
+    conf.mainAppResource match {
       case JavaMainAppResource(res) =>
         res.map(additionalJavaProperties).getOrElse(Map.empty)
 
@@ -71,10 +69,10 @@ private[spark] class DriverCommandFeatureStep(conf: 
KubernetesConf[KubernetesDri
   }
 
   private def configureForPython(pod: SparkPod, res: String): SparkPod = {
-    val maybePythonFiles = if (driverConf.pyFiles.nonEmpty) {
+    val maybePythonFiles = if (conf.pyFiles.nonEmpty) {
       // Delineation by ":" is to append the PySpark Files to the PYTHONPATH
       // of the respective PySpark pod
-      val resolved = KubernetesUtils.resolveFileUrisAndPath(driverConf.pyFiles)
+      val resolved = KubernetesUtils.resolveFileUrisAndPath(conf.pyFiles)
       Some(new EnvVarBuilder()
         .withName(ENV_PYSPARK_FILES)
         .withValue(resolved.mkString(":"))
@@ -85,7 +83,7 @@ private[spark] class DriverCommandFeatureStep(conf: 
KubernetesConf[KubernetesDri
     val pythonEnvs =
       Seq(new EnvVarBuilder()
           .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
-          .withValue(conf.sparkConf.get(PYSPARK_MAJOR_PYTHON_VERSION))
+          .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
         .build()) ++
       maybePythonFiles
 
@@ -105,9 +103,9 @@ private[spark] class DriverCommandFeatureStep(conf: 
KubernetesConf[KubernetesDri
     new ContainerBuilder(pod.container)
       .addToArgs("driver")
       .addToArgs("--properties-file", SPARK_CONF_PATH)
-      .addToArgs("--class", driverConf.mainClass)
+      .addToArgs("--class", conf.mainClass)
       .addToArgs(resource)
-      .addToArgs(driverConf.appArgs: _*)
+      .addToArgs(conf.appArgs: _*)
   }
 
   private def additionalJavaProperties(resource: String): Map[String, String] 
= {
@@ -116,7 +114,7 @@ private[spark] class DriverCommandFeatureStep(conf: 
KubernetesConf[KubernetesDri
 
   private def additionalPythonProperties(resource: String): Map[String, 
String] = {
     resourceType(APP_RESOURCE_TYPE_PYTHON) ++
-      mergeFileList("spark.files", Seq(resource) ++ driverConf.pyFiles)
+      mergeFileList("spark.files", Seq(resource) ++ conf.pyFiles)
   }
 
   private def additionalRProperties(resource: String): Map[String, String] = {
@@ -124,7 +122,7 @@ private[spark] class DriverCommandFeatureStep(conf: 
KubernetesConf[KubernetesDri
   }
 
   private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, 
String] = {
-    val existing = Utils.stringToSeq(conf.sparkConf.get(key, ""))
+    val existing = Utils.stringToSeq(conf.get(key, ""))
     Map(key -> (existing ++ filesToAdd).distinct.mkString(","))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
index ff5ad66..795ca49 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 
-private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: 
KubernetesConf[_])
+private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: 
KubernetesConf)
   extends KubernetesFeatureConfigStep {
   // TODO clean up this class, and credentials in general. See also 
SparkKubernetesClientFactory.
   // We should use a struct to hold all creds-related fields. A lot of the 
code is very repetitive.
@@ -66,7 +66,7 @@ private[spark] class 
DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube
     clientCertDataBase64.isDefined
 
   private val driverCredentialsSecretName =
-    s"${kubernetesConf.appResourceNamePrefix}-kubernetes-credentials"
+    s"${kubernetesConf.resourceNamePrefix}-kubernetes-credentials"
 
   override def configurePod(pod: SparkPod): SparkPod = {
     if (!shouldMountSecret) {
@@ -122,7 +122,7 @@ private[spark] class 
DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube
     val redactedTokens = kubernetesConf.sparkConf.getAll
       .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX))
       .toMap
-      .mapValues( _ => "<present_but_redacted>")
+      .map { case (k, v) => (k, "<present_but_redacted>") }
     redactedTokens ++
       resolvedMountedCaCertFile.map { file =>
         Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index f2d7bbd..4230545 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -20,13 +20,13 @@ import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{Clock, SystemClock}
 
 private[spark] class DriverServiceFeatureStep(
-    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
+    kubernetesConf: KubernetesDriverConf,
     clock: Clock = new SystemClock)
   extends KubernetesFeatureConfigStep with Logging {
   import DriverServiceFeatureStep._
@@ -38,7 +38,7 @@ private[spark] class DriverServiceFeatureStep(
     s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's 
hostname will be " +
       "managed via a Kubernetes service.")
 
-  private val preferredServiceName = 
s"${kubernetesConf.appResourceNamePrefix}$DRIVER_SVC_POSTFIX"
+  private val preferredServiceName = 
s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX"
   private val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
     preferredServiceName
   } else {
@@ -58,7 +58,7 @@ private[spark] class DriverServiceFeatureStep(
   override def configurePod(pod: SparkPod): SparkPod = pod
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = {
-    val driverHostname = 
s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
+    val driverHostname = 
s"$resolvedServiceName.${kubernetesConf.namespace}.svc"
     Map(DRIVER_HOST_KEY -> driverHostname,
       "spark.driver.port" -> driverPort.toString,
       org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key ->
@@ -72,7 +72,7 @@ private[spark] class DriverServiceFeatureStep(
         .endMetadata()
       .withNewSpec()
         .withClusterIP("None")
-        .withSelector(kubernetesConf.roleLabels.asJava)
+        .withSelector(kubernetesConf.labels.asJava)
         .addNewPort()
           .withName(DRIVER_PORT_NAME)
           .withPort(driverPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala
index 03ff7d4..d78f04d 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala
@@ -20,14 +20,13 @@ import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
HasMetadata}
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
 
-private[spark] class EnvSecretsFeatureStep(
-    kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+private[spark] class EnvSecretsFeatureStep(kubernetesConf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
   override def configurePod(pod: SparkPod): SparkPod = {
     val addedEnvSecrets = kubernetesConf
-      .roleSecretEnvNamesToKeyRefs
+      .secretEnvNamesToKeyRefs
       .map{ case (envName, keyRef) =>
         // Keyref parts
         val keyRefParts = keyRef.split(":")
@@ -50,8 +49,4 @@ private[spark] class EnvSecretsFeatureStep(
       .build()
     SparkPod(pod.pod, containerWithEnvVars)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = 
Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
index fd09de2..bca6675 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
@@ -16,9 +16,7 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.HasMetadata
-
-import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
 import org.apache.spark.internal.Logging
@@ -28,21 +26,15 @@ import org.apache.spark.internal.Logging
  * containing Hadoop config files mounted as volumes and an ENV variable
  * pointed to the mounted file directory.
  */
-private[spark] class HadoopConfExecutorFeatureStep(
-    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+private[spark] class HadoopConfExecutorFeatureStep(conf: 
KubernetesExecutorConf)
   extends KubernetesFeatureConfigStep with Logging {
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val sparkConf = kubernetesConf.sparkConf
-    val hadoopConfDirCMapName = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
+    val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
     require(hadoopConfDirCMapName.isDefined,
       "Ensure that the env `HADOOP_CONF_DIR` is defined either in the client 
or " +
         " using pre-existing ConfigMaps")
     logInfo("HADOOP_CONF_DIR defined")
     HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, 
hadoopConfDirCMapName, pod)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = 
Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
index 5b6a6d5..e342110 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
@@ -16,28 +16,19 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.HasMetadata
-
-import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf
 import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
-import org.apache.spark.internal.Logging
 
 /**
  * This step is responsible for setting ENV_SPARK_USER when HADOOP_FILES are 
detected
  * however, this step would not be run if Kerberos is enabled, as Kerberos 
sets SPARK_USER
  */
-private[spark] class HadoopSparkUserExecutorFeatureStep(
-    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
-  extends KubernetesFeatureConfigStep with Logging {
+private[spark] class HadoopSparkUserExecutorFeatureStep(conf: 
KubernetesExecutorConf)
+  extends KubernetesFeatureConfigStep {
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val sparkUserName = kubernetesConf.sparkConf.get(KERBEROS_SPARK_USER_NAME)
+    val sparkUserName = conf.get(KERBEROS_SPARK_USER_NAME)
     HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = 
Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
index ce47933..c6d5a86 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
@@ -16,40 +16,43 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.{HasMetadata, Secret, SecretBuilder}
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, 
SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf
 import org.apache.spark.deploy.k8s.features.hadooputils._
-import org.apache.spark.internal.Logging
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 
 /**
  * Runs the necessary Hadoop-based logic based on Kerberos configs and the 
presence of the
  * HADOOP_CONF_DIR. This runs various bootstrap methods defined in 
HadoopBootstrapUtil.
  */
-private[spark] class KerberosConfDriverFeatureStep(
-    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
-  extends KubernetesFeatureConfigStep with Logging {
-
-  require(kubernetesConf.hadoopConfSpec.isDefined,
-     "Ensure that HADOOP_CONF_DIR is defined either via env or a pre-defined 
ConfigMap")
-  private val hadoopConfDirSpec = kubernetesConf.hadoopConfSpec.get
-  private val conf = kubernetesConf.sparkConf
-  private val principal = conf.get(org.apache.spark.internal.config.PRINCIPAL)
-  private val keytab = conf.get(org.apache.spark.internal.config.KEYTAB)
-  private val existingSecretName = conf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
-  private val existingSecretItemKey = 
conf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
-  private val krb5File = conf.get(KUBERNETES_KERBEROS_KRB5_FILE)
-  private val krb5CMap = conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP)
-  private val kubeTokenManager = kubernetesConf.tokenManager(conf,
-    SparkHadoopUtil.get.newConfiguration(conf))
+private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: 
KubernetesDriverConf)
+  extends KubernetesFeatureConfigStep {
+
+  private val hadoopConfDir = 
Option(kubernetesConf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
+  private val hadoopConfigMapName = 
kubernetesConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
+  KubernetesUtils.requireNandDefined(
+    hadoopConfDir,
+    hadoopConfigMapName,
+    "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
+    "as the creation of an additional ConfigMap, when one is already specified is extraneous")
+
+  private val principal = kubernetesConf.get(org.apache.spark.internal.config.PRINCIPAL)
+  private val keytab = kubernetesConf.get(org.apache.spark.internal.config.KEYTAB)
+  private val existingSecretName = kubernetesConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
+  private val existingSecretItemKey = kubernetesConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
+  private val krb5File = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_FILE)
+  private val krb5CMap = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP)
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf)
+  private val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf, hadoopConf)
   private val isKerberosEnabled =
-    (hadoopConfDirSpec.hadoopConfDir.isDefined && kubeTokenManager.isSecurityEnabled) ||
-      (hadoopConfDirSpec.hadoopConfigMapName.isDefined &&
-        (krb5File.isDefined || krb5CMap.isDefined))
+    (hadoopConfDir.isDefined && UserGroupInformation.isSecurityEnabled) ||
+      (hadoopConfigMapName.isDefined && (krb5File.isDefined || krb5CMap.isDefined))
   require(keytab.isEmpty || isKerberosEnabled,
     "You must enable Kerberos support if you are specifying a Kerberos Keytab")
 
@@ -76,11 +79,11 @@ private[spark] class KerberosConfDriverFeatureStep(
    "If a secret storing a Kerberos Delegation Token is specified you must also" +
       " specify the item-key where the data is stored")
 
-  private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir =>
+  private val hadoopConfigurationFiles = hadoopConfDir.map { hConfDir =>
     HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
   }
   private val newHadoopConfigMapName =
-    if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) {
+    if (hadoopConfigMapName.isEmpty) {
       Some(kubernetesConf.hadoopConfigMapName)
     } else {
       None
@@ -95,23 +98,24 @@ private[spark] class KerberosConfDriverFeatureStep(
       dtSecret = None,
       dtSecretName = secretName,
       dtSecretItemKey = secretItemKey,
-      jobUserName = kubeTokenManager.getCurrentUser.getShortUserName)
+      jobUserName = UserGroupInformation.getCurrentUser.getShortUserName)
   }).orElse(
     if (isKerberosEnabled) {
-      Some(HadoopKerberosLogin.buildSpec(
-        conf,
-        kubernetesConf.appResourceNamePrefix,
-        kubeTokenManager))
+      Some(buildKerberosSpec())
     } else {
       None
     }
   )
 
   override def configurePod(pod: SparkPod): SparkPod = {
+    if (!isKerberosEnabled) {
+      return pod
+    }
+
     val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
-      hadoopConfDirSpec.hadoopConfDir,
+      hadoopConfDir,
       newHadoopConfigMapName,
-      hadoopConfDirSpec.hadoopConfigMapName,
+      hadoopConfigMapName,
       pod)
     kerberosConfSpec.map { hSpec =>
       HadoopBootstrapUtil.bootstrapKerberosPod(
@@ -124,11 +128,15 @@ private[spark] class KerberosConfDriverFeatureStep(
         hadoopBasedSparkPod)
     }.getOrElse(
       HadoopBootstrapUtil.bootstrapSparkUserPod(
-        kubeTokenManager.getCurrentUser.getShortUserName,
+        UserGroupInformation.getCurrentUser.getShortUserName,
         hadoopBasedSparkPod))
   }
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    if (!isKerberosEnabled) {
+      return Map.empty
+    }
+
     val resolvedConfValues = kerberosConfSpec.map { hSpec =>
       Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName,
         KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey,
@@ -136,13 +144,16 @@ private[spark] class KerberosConfDriverFeatureStep(
        KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName))
       }.getOrElse(
         Map(KERBEROS_SPARK_USER_NAME ->
-          kubeTokenManager.getCurrentUser.getShortUserName))
+          UserGroupInformation.getCurrentUser.getShortUserName))
     Map(HADOOP_CONFIG_MAP_NAME ->
-      hadoopConfDirSpec.hadoopConfigMapName.getOrElse(
-      kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues
+      hadoopConfigMapName.getOrElse(kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues
   }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    if (!isKerberosEnabled) {
+      return Seq.empty
+    }
+
     val hadoopConfConfigMap = for {
       hName <- newHadoopConfigMapName
       hFiles <- hadoopConfigurationFiles
@@ -162,4 +173,34 @@ private[spark] class KerberosConfDriverFeatureStep(
       krb5ConfigMap.toSeq ++
       kerberosDTSecret.toSeq
   }
+
+  private def buildKerberosSpec(): KerberosConfigSpec = {
+    // The JobUserUGI will be taken from the Local Ticket Cache or via keytab+principal
+    // The login happens in the SparkSubmit so login logic is not necessary to include
+    val jobUserUGI = UserGroupInformation.getCurrentUser
+    val creds = jobUserUGI.getCredentials
+    tokenManager.obtainDelegationTokens(creds)
+    val tokenData = SparkHadoopUtil.get.serialize(creds)
+    require(tokenData.nonEmpty, "Did not obtain any delegation tokens")
+    val newSecretName =
+      s"${kubernetesConf.resourceNamePrefix}-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
+    val secretDT =
+      new SecretBuilder()
+        .withNewMetadata()
+          .withName(newSecretName)
+          .endMetadata()
+        .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(tokenData))
+        .build()
+    KerberosConfigSpec(
+      dtSecret = Some(secretDT),
+      dtSecretName = newSecretName,
+      dtSecretItemKey = KERBEROS_SECRET_KEY,
+      jobUserName = jobUserUGI.getShortUserName)
+  }
+
+  private case class KerberosConfigSpec(
+      dtSecret: Option[Secret],
+      dtSecretName: String,
+      dtSecretItemKey: String,
+      jobUserName: String)
 }
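
For reference, a minimal self-contained sketch of the Secret that buildKerberosSpec() assembles; the secret name, data key, and token bytes below are placeholders, while the real step serializes the job user's Credentials after obtaining delegation tokens:

    import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder}
    import org.apache.commons.codec.binary.Base64

    // Placeholder bytes standing in for SparkHadoopUtil.get.serialize(creds).
    val tokenData: Array[Byte] = "serialized-credentials".getBytes("UTF-8")
    val dtSecret: Secret = new SecretBuilder()
      .withNewMetadata()
        .withName("spark-pi-1543622617000-delegation-tokens")  // <resourceNamePrefix>-<suffix>
        .endMetadata()
      .addToData("hadoop-tokens", Base64.encodeBase64String(tokenData))
      .build()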

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
index 06a88b6..32bb6a5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
@@ -16,38 +16,29 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.HasMetadata
-
-import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf
 import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
 import org.apache.spark.internal.Logging
 
 /**
  * This step is responsible for mounting the DT secret for the executors
  */
-private[spark] class KerberosConfExecutorFeatureStep(
-    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf)
   extends KubernetesFeatureConfigStep with Logging {
 
-  private val sparkConf = kubernetesConf.sparkConf
-  private val maybeKrb5CMap = sparkConf.getOption(KRB5_CONFIG_MAP_NAME)
+  private val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
   require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found")
 
   override def configurePod(pod: SparkPod): SparkPod = {
     logInfo(s"Mounting Resources for Kerberos")
     HadoopBootstrapUtil.bootstrapKerberosPod(
-      sparkConf.get(KERBEROS_DT_SECRET_NAME),
-      sparkConf.get(KERBEROS_DT_SECRET_KEY),
-      sparkConf.get(KERBEROS_SPARK_USER_NAME),
+      conf.get(KERBEROS_DT_SECRET_NAME),
+      conf.get(KERBEROS_DT_SECRET_KEY),
+      conf.get(KERBEROS_SPARK_USER_NAME),
       None,
       None,
       maybeKrb5CMap,
       pod)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty[HasMetadata]
 }
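
This executor step only consumes values; a hedged sketch of the handoff it relies on, with placeholder values: the driver step publishes the resolved names through getAdditionalPodSystemProperties(), those properties reach the executors' SparkConf, and the step above reads them back with conf.get(...):

    import org.apache.spark.deploy.k8s.Constants._

    // What the driver side publishes (placeholder values), keyed by the same
    // constants this executor step reads back.
    val publishedByDriver: Map[String, String] = Map(
      KERBEROS_DT_SECRET_NAME -> "spark-pi-1543622617000-delegation-tokens",
      KERBEROS_DT_SECRET_KEY -> "hadoop-tokens",
      KERBEROS_SPARK_USER_NAME -> "jobuser",
      KRB5_CONFIG_MAP_NAME -> "spark-pi-1543622617000-krb5-file")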

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
index be386e1..19ed2df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
@@ -16,16 +16,15 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import java.nio.file.Paths
 import java.util.UUID
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 
 private[spark] class LocalDirsFeatureStep(
-    conf: KubernetesConf[_ <: KubernetesRoleSpecificConf],
+    conf: KubernetesConf,
     defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}")
   extends KubernetesFeatureConfigStep {
 
@@ -73,8 +72,4 @@ private[spark] class LocalDirsFeatureStep(
       .build()
     SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
index 97fa949..f4e1a3a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
@@ -18,14 +18,13 @@ package org.apache.spark.deploy.k8s.features
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
 
-private[spark] class MountSecretsFeatureStep(
-    kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+private[spark] class MountSecretsFeatureStep(kubernetesConf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
   override def configurePod(pod: SparkPod): SparkPod = {
     val addedVolumes = kubernetesConf
-      .roleSecretNamesToMountPaths
+      .secretNamesToMountPaths
       .keys
       .map(secretName =>
         new VolumeBuilder()
@@ -40,7 +39,7 @@ private[spark] class MountSecretsFeatureStep(
         .endSpec()
       .build()
     val addedVolumeMounts = kubernetesConf
-      .roleSecretNamesToMountPaths
+      .secretNamesToMountPaths
       .map {
         case (secretName, mountPath) =>
           new VolumeMountBuilder()
@@ -54,9 +53,5 @@ private[spark] class MountSecretsFeatureStep(
     SparkPod(podWithVolumes, containerWithMounts)
   }
 
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
-
  private def secretVolumeName(secretName: String): String = s"$secretName-volume"
 }
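
A minimal sketch of what this step builds for one entry of secretNamesToMountPaths, using the same fabric8 builders; the secret name and mount path are placeholders:

    import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}

    val secretName = "db-creds"
    val mountPath = "/etc/secrets/db-creds"
    // Volume backed by the named Kubernetes secret; name matches secretVolumeName() above.
    val volume: Volume = new VolumeBuilder()
      .withName(s"$secretName-volume")
      .withNewSecret()
        .withSecretName(secretName)
        .endSecret()
      .build()
    // Mount of that volume into the container at the requested path.
    val mount: VolumeMount = new VolumeMountBuilder()
      .withName(s"$secretName-volume")
      .withMountPath(mountPath)
      .build()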

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index 1473a7d..8548e70 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -20,12 +20,11 @@ import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.deploy.k8s._
 
-private[spark] class MountVolumesFeatureStep(
-    kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val (volumeMounts, volumes) = constructVolumes(kubernetesConf.roleVolumes).unzip
+    val (volumeMounts, volumes) = constructVolumes(conf.volumes).unzip
 
     val podWithVolumes = new PodBuilder(pod.pod)
       .editSpec()
@@ -40,12 +39,8 @@ private[spark] class MountVolumesFeatureStep(
     SparkPod(podWithVolumes, containerWithVolumeMounts)
   }
 
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
-
   private def constructVolumes(
-    volumeSpecs: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]
+    volumeSpecs: Iterable[KubernetesVolumeSpec]
   ): Iterable[(VolumeMount, Volume)] = {
     volumeSpecs.map { spec =>
       val volumeMount = new VolumeMountBuilder()

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
index 28e2d17..09dcf93 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
@@ -22,12 +22,11 @@ import java.nio.charset.StandardCharsets
 import com.google.common.io.Files
 import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 
-private[spark] class PodTemplateConfigMapStep(
-   conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
   def configurePod(pod: SparkPod): SparkPod = {
     val podWithVolume = new PodBuilder(pod.pod)

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
deleted file mode 100644
index 0022d8f..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.hadooputils
-
-import io.fabric8.kubernetes.api.model.SecretBuilder
-import org.apache.commons.codec.binary.Base64
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
-
-/**
- * This logic does all the heavy lifting for Delegation Token creation. This step
- * assumes that the job user has either specified a principal and keytab or ran
- * $kinit before running spark-submit. By running UGI.getCurrentUser we are able
- * to obtain the current user, either signed in via $kinit or keytab. With the
- * Job User principal you then retrieve the delegation token from the NameNode
- * and store values in DelegationToken. Lastly, the class puts the data into
- * a secret. All this is defined in a KerberosConfigSpec.
- */
-private[spark] object HadoopKerberosLogin {
-  def buildSpec(
-      submissionSparkConf: SparkConf,
-      kubernetesResourceNamePrefix: String,
-      tokenManager: KubernetesHadoopDelegationTokenManager): KerberosConfigSpec = {
-    // The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal
-    // The login happens in the SparkSubmit so login logic is not necessary to include
-    val jobUserUGI = tokenManager.getCurrentUser
-    val originalCredentials = jobUserUGI.getCredentials
-    tokenManager.obtainDelegationTokens(originalCredentials)
-
-    val tokenData = SparkHadoopUtil.get.serialize(originalCredentials)
-
-    val initialTokenDataKeyName = KERBEROS_SECRET_KEY
-    val newSecretName = s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
-    val secretDT =
-      new SecretBuilder()
-        .withNewMetadata()
-          .withName(newSecretName)
-          .endMetadata()
-        .addToData(initialTokenDataKeyName, Base64.encodeBase64String(tokenData))
-        .build()
-    KerberosConfigSpec(
-      dtSecret = Some(secretDT),
-      dtSecretName = newSecretName,
-      dtSecretItemKey = initialTokenDataKeyName,
-      jobUserName = jobUserUGI.getShortUserName)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
deleted file mode 100644
index 3e98d58..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.k8s.security
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-
-/**
- * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
- */
-private[spark] class KubernetesHadoopDelegationTokenManager(
-    _sparkConf: SparkConf,
-    _hadoopConf: Configuration)
-  extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
-
-  def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
-  def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
-
-}
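
The deleted wrapper only forwarded two UserGroupInformation calls, so the driver step above now makes them directly, along the lines of:

    import org.apache.hadoop.security.UserGroupInformation

    // Short user name of the submitting (kinit'd or keytab-logged-in) user.
    val jobUserName = UserGroupInformation.getCurrentUser.getShortUserName
    // Whether Hadoop security (Kerberos) is enabled for this configuration.
    val kerberosEnabled = UserGroupInformation.isSecurityEnabled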

http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 543d6b1..70a93c9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -17,19 +17,17 @@
 package org.apache.spark.deploy.k8s.submit
 
 import java.io.StringWriter
-import java.util.{Collections, Locale, Properties, UUID}
 import java.util.{Collections, UUID}
 import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.KubernetesClient
-import org.apache.hadoop.security.UserGroupInformation
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
@@ -47,8 +45,7 @@ private[spark] case class ClientArguments(
     mainAppResource: MainAppResource,
     mainClass: String,
     driverArgs: Array[String],
-    maybePyFiles: Option[String],
-    hadoopConfigDir: Option[String])
+    maybePyFiles: Option[String])
 
 private[spark] object ClientArguments {
 
@@ -82,8 +79,7 @@ private[spark] object ClientArguments {
       mainAppResource,
       mainClass.get,
       driverArgs.toArray,
-      maybePyFiles,
-      sys.env.get(ENV_HADOOP_CONF_DIR))
+      maybePyFiles)
   }
 }
 
@@ -92,27 +88,24 @@ private[spark] object ClientArguments {
 * watcher that monitors and logs the application status. Waits for the application to terminate if
  * spark.kubernetes.submission.waitAppCompletion is true.
  *
+ * @param conf The kubernetes driver config.
 * @param builder Responsible for building the base driver pod based on a composition of
  *                implemented features.
- * @param kubernetesConf application configuration
  * @param kubernetesClient the client to talk to the Kubernetes API server
 * @param waitForAppCompletion a flag indicating whether the client should wait for the application
  *                             to complete
- * @param appName the application name
  * @param watcher a watcher that monitors and logs the application status
  */
 private[spark] class Client(
+    conf: KubernetesDriverConf,
     builder: KubernetesDriverBuilder,
-    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
     kubernetesClient: KubernetesClient,
     waitForAppCompletion: Boolean,
-    appName: String,
-    watcher: LoggingPodStatusWatcher,
-    kubernetesResourceNamePrefix: String) extends Logging {
+    watcher: LoggingPodStatusWatcher) extends Logging {
 
   def run(): Unit = {
-    val resolvedDriverSpec = builder.buildFromFeatures(kubernetesConf)
-    val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
+    val resolvedDriverSpec = builder.buildFromFeatures(conf)
+    val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
    val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
     // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
    // Spark command builder to pickup on the Java Options present in the ConfigMap
@@ -155,11 +148,11 @@ private[spark] class Client(
       }
 
       if (waitForAppCompletion) {
-        logInfo(s"Waiting for application $appName to finish...")
+        logInfo(s"Waiting for application ${conf.appName} to finish...")
         watcher.awaitCompletion()
-        logInfo(s"Application $appName finished.")
+        logInfo(s"Application ${conf.appName} finished.")
       } else {
-        logInfo(s"Deployed Spark application $appName into Kubernetes.")
+        logInfo(s"Deployed Spark application ${conf.appName} into Kubernetes.")
       }
     }
   }
@@ -216,19 +209,13 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // a unique app ID (captured by spark.app.id) in the format below.
    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
-    val kubernetesResourceNamePrefix = KubernetesClientApplication.getResourceNamePrefix(appName)
-    sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
-      appName,
-      kubernetesResourceNamePrefix,
       kubernetesAppId,
       clientArguments.mainAppResource,
       clientArguments.mainClass,
       clientArguments.driverArgs,
-      clientArguments.maybePyFiles,
-      clientArguments.hadoopConfigDir)
-    val namespace = kubernetesConf.namespace()
+      clientArguments.maybePyFiles)
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
     val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
@@ -238,36 +225,18 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
 
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
-      Some(namespace),
+      Some(kubernetesConf.namespace),
       KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
       sparkConf,
       None,
       None)) { kubernetesClient =>
         val client = new Client(
-          KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf),
           kubernetesConf,
+          KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf),
           kubernetesClient,
           waitForAppCompletion,
-          appName,
-          watcher,
-          kubernetesResourceNamePrefix)
+          watcher)
         client.run()
     }
   }
 }
-
-private[spark] object KubernetesClientApplication {
-
-  def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark")
-
-  def getResourceNamePrefix(appName: String): String = {
-    val launchTime = System.currentTimeMillis()
-    s"$appName-$launchTime"
-      .trim
-      .toLowerCase(Locale.ROOT)
-      .replaceAll("\\s+", "-")
-      .replaceAll("\\.", "-")
-      .replaceAll("[^a-z0-9\\-]", "")
-      .replaceAll("-+", "-")
-  }
-}
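
For context, the removed getResourceNamePrefix helper (presumably relocated alongside the new config classes) normalized the app name plus launch time into a DNS-friendly prefix; a quick worked example of that regex chain, with a placeholder timestamp:

    val appName = "My Spark.App!"
    val launchTime = 1543622617000L  // placeholder launch time
    val prefix = s"$appName-$launchTime"
      .trim
      .toLowerCase(java.util.Locale.ROOT)
      .replaceAll("\\s+", "-")
      .replaceAll("\\.", "-")
      .replaceAll("[^a-z0-9\\-]", "")
      .replaceAll("-+", "-")
    // prefix == "my-spark-app-1543622617000"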

