[ https://issues.apache.org/jira/browse/SPARK-25877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719388#comment-16719388 ]

ASF GitHub Bot commented on SPARK-25877:
----------------------------------------

asfgit closed pull request #23220: [SPARK-25877][k8s] Move all feature logic to feature classes.
URL: https://github.com/apache/spark/pull/23220
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
index bca66759d586e..da332881ae1a2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
@@ -31,10 +31,10 @@ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf)
 
   override def configurePod(pod: SparkPod): SparkPod = {
     val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
-    require(hadoopConfDirCMapName.isDefined,
-      "Ensure that the env `HADOOP_CONF_DIR` is defined either in the client 
or " +
-        " using pre-existing ConfigMaps")
-    logInfo("HADOOP_CONF_DIR defined")
-    HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
+    if (hadoopConfDirCMapName.isDefined) {
+      HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
+    } else {
+      pod
+    }
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
index e342110763196..c038e75491ca5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
@@ -28,7 +28,8 @@ private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutor
   extends KubernetesFeatureConfigStep {
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val sparkUserName = conf.get(KERBEROS_SPARK_USER_NAME)
-    HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod)
+    conf.getOption(KERBEROS_SPARK_USER_NAME).map { user =>
+      HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod)
+    }.getOrElse(pod)
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
index 32bb6a5d2bcbb..907271b1cb483 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
@@ -27,18 +27,20 @@ import org.apache.spark.internal.Logging
 private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf)
   extends KubernetesFeatureConfigStep with Logging {
 
-  private val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
-  require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found")
-
   override def configurePod(pod: SparkPod): SparkPod = {
-    logInfo(s"Mounting Resources for Kerberos")
-    HadoopBootstrapUtil.bootstrapKerberosPod(
-      conf.get(KERBEROS_DT_SECRET_NAME),
-      conf.get(KERBEROS_DT_SECRET_KEY),
-      conf.get(KERBEROS_SPARK_USER_NAME),
-      None,
-      None,
-      maybeKrb5CMap,
-      pod)
+    val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
+    if (maybeKrb5CMap.isDefined) {
+      logInfo(s"Mounting Resources for Kerberos")
+      HadoopBootstrapUtil.bootstrapKerberosPod(
+        conf.get(KERBEROS_DT_SECRET_NAME),
+        conf.get(KERBEROS_DT_SECRET_KEY),
+        conf.get(KERBEROS_SPARK_USER_NAME),
+        None,
+        None,
+        maybeKrb5CMap,
+        pod)
+    } else {
+      pod
+    }
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
index 09dcf93a54f8e..7f41ca43589b6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
@@ -28,44 +28,60 @@ import org.apache.spark.deploy.k8s.Constants._
 
 private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
+
+  private val hasTemplate = conf.contains(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+
   def configurePod(pod: SparkPod): SparkPod = {
-    val podWithVolume = new PodBuilder(pod.pod)
-        .editSpec()
-          .addNewVolume()
-            .withName(POD_TEMPLATE_VOLUME)
-            .withNewConfigMap()
-              .withName(POD_TEMPLATE_CONFIGMAP)
-              .addNewItem()
-                .withKey(POD_TEMPLATE_KEY)
-                .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
-              .endItem()
-            .endConfigMap()
-          .endVolume()
-        .endSpec()
-      .build()
+    if (hasTemplate) {
+      val podWithVolume = new PodBuilder(pod.pod)
+          .editSpec()
+            .addNewVolume()
+              .withName(POD_TEMPLATE_VOLUME)
+              .withNewConfigMap()
+                .withName(POD_TEMPLATE_CONFIGMAP)
+                .addNewItem()
+                  .withKey(POD_TEMPLATE_KEY)
+                  .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
+                .endItem()
+              .endConfigMap()
+            .endVolume()
+          .endSpec()
+        .build()
 
-    val containerWithVolume = new ContainerBuilder(pod.container)
-        .addNewVolumeMount()
-          .withName(POD_TEMPLATE_VOLUME)
-          .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
-        .endVolumeMount()
-      .build()
-    SparkPod(podWithVolume, containerWithVolume)
+      val containerWithVolume = new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(POD_TEMPLATE_VOLUME)
+            .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
+          .endVolumeMount()
+        .build()
+      SparkPod(podWithVolume, containerWithVolume)
+    } else {
+      pod
+    }
   }
 
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
-    KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
-      (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    if (hasTemplate) {
+      Map[String, String](
+        KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
+          (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+    } else {
+      Map.empty
+    }
+  }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
-    val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
-    val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
-    Seq(new ConfigMapBuilder()
-        .withNewMetadata()
-          .withName(POD_TEMPLATE_CONFIGMAP)
-        .endMetadata()
-        .addToData(POD_TEMPLATE_KEY, podTemplateString)
-      .build())
+    if (hasTemplate) {
+      val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+      val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
+      Seq(new ConfigMapBuilder()
+          .withNewMetadata()
+            .withName(POD_TEMPLATE_CONFIGMAP)
+          .endMetadata()
+          .addToData(POD_TEMPLATE_KEY, podTemplateString)
+        .build())
+    } else {
+      Nil
+    }
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 70a93c968795e..3888778bf84ca 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -104,7 +104,7 @@ private[spark] class Client(
     watcher: LoggingPodStatusWatcher) extends Logging {
 
   def run(): Unit = {
-    val resolvedDriverSpec = builder.buildFromFeatures(conf)
+    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
     val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
    val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
     // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
@@ -232,7 +232,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       None)) { kubernetesClient =>
         val client = new Client(
           kubernetesConf,
-          KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf),
+          new KubernetesDriverBuilder(),
           kubernetesClient,
           waitForAppCompletion,
           watcher)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index a5ad9729aee9a..d2c0ced9fa2f4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -20,90 +20,49 @@ import java.io.File
 
 import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
 
-private[spark] class KubernetesDriverBuilder(
-    provideBasicStep: (KubernetesDriverConf => BasicDriverFeatureStep) =
-      new BasicDriverFeatureStep(_),
-    provideCredentialsStep: (KubernetesDriverConf => DriverKubernetesCredentialsFeatureStep) =
-      new DriverKubernetesCredentialsFeatureStep(_),
-    provideServiceStep: (KubernetesDriverConf => DriverServiceFeatureStep) =
-      new DriverServiceFeatureStep(_),
-    provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
-      new MountSecretsFeatureStep(_),
-    provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
-      new EnvSecretsFeatureStep(_),
-    provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) =
-      new LocalDirsFeatureStep(_),
-    provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) =
-      new MountVolumesFeatureStep(_),
-    provideDriverCommandStep: (KubernetesDriverConf => DriverCommandFeatureStep) =
-      new DriverCommandFeatureStep(_),
-    provideHadoopGlobalStep: (KubernetesDriverConf => KerberosConfDriverFeatureStep) =
-      new KerberosConfDriverFeatureStep(_),
-    providePodTemplateConfigMapStep: (KubernetesConf => PodTemplateConfigMapStep) =
-      new PodTemplateConfigMapStep(_),
-    provideInitialPod: () => SparkPod = () => SparkPod.initialPod) {
+private[spark] class KubernetesDriverBuilder {
 
-  def buildFromFeatures(kubernetesConf: KubernetesDriverConf): KubernetesDriverSpec = {
-    val baseFeatures = Seq(
-      provideBasicStep(kubernetesConf),
-      provideCredentialsStep(kubernetesConf),
-      provideServiceStep(kubernetesConf),
-      provideLocalDirsStep(kubernetesConf))
-
-    val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
-      Seq(provideSecretsStep(kubernetesConf))
-    } else Nil
-    val envSecretFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) {
-      Seq(provideEnvSecretsStep(kubernetesConf))
-    } else Nil
-    val volumesFeature = if (kubernetesConf.volumes.nonEmpty) {
-      Seq(provideVolumesStep(kubernetesConf))
-    } else Nil
-    val podTemplateFeature = if (
-      kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
-      Seq(providePodTemplateConfigMapStep(kubernetesConf))
-    } else Nil
-
-    val driverCommandStep = provideDriverCommandStep(kubernetesConf)
-
-    val hadoopConfigStep = Some(provideHadoopGlobalStep(kubernetesConf))
+  def buildFromFeatures(
+      conf: KubernetesDriverConf,
+      client: KubernetesClient): KubernetesDriverSpec = {
+    val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
+      .map { file =>
+        KubernetesUtils.loadPodFromTemplate(
+          client,
+          new File(file),
+          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
+      }
+      .getOrElse(SparkPod.initialPod())
 
-    val allFeatures: Seq[KubernetesFeatureConfigStep] =
-      baseFeatures ++ Seq(driverCommandStep) ++
-        secretFeature ++ envSecretFeature ++ volumesFeature ++
-        hadoopConfigStep ++ podTemplateFeature
+    val features = Seq(
+      new BasicDriverFeatureStep(conf),
+      new DriverKubernetesCredentialsFeatureStep(conf),
+      new DriverServiceFeatureStep(conf),
+      new MountSecretsFeatureStep(conf),
+      new EnvSecretsFeatureStep(conf),
+      new LocalDirsFeatureStep(conf),
+      new MountVolumesFeatureStep(conf),
+      new DriverCommandFeatureStep(conf),
+      new KerberosConfDriverFeatureStep(conf),
+      new PodTemplateConfigMapStep(conf))
 
-    var spec = KubernetesDriverSpec(
-      provideInitialPod(),
+    val spec = KubernetesDriverSpec(
+      initialPod,
       driverKubernetesResources = Seq.empty,
-      kubernetesConf.sparkConf.getAll.toMap)
-    for (feature <- allFeatures) {
+      conf.sparkConf.getAll.toMap)
+
+    features.foldLeft(spec) { case (spec, feature) =>
       val configuredPod = feature.configurePod(spec.pod)
       val addedSystemProperties = feature.getAdditionalPodSystemProperties()
       val addedResources = feature.getAdditionalKubernetesResources()
-      spec = KubernetesDriverSpec(
+      KubernetesDriverSpec(
         configuredPod,
         spec.driverKubernetesResources ++ addedResources,
         spec.systemProperties ++ addedSystemProperties)
     }
-    spec
   }
-}
 
-private[spark] object KubernetesDriverBuilder {
-  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = {
-    conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
-      .map(new File(_))
-      .map(file => new KubernetesDriverBuilder(provideInitialPod = () =>
-        KubernetesUtils.loadPodFromTemplate(
-          kubernetesClient,
-          file,
-          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
-      ))
-      .getOrElse(new KubernetesDriverBuilder())
-  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index ac42554b1334b..da3edfeca9b1f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -136,7 +136,8 @@ private[spark] class ExecutorPodsAllocator(
             newExecutorId.toString,
             applicationId,
             driverPod)
-          val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr)
+          val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
+            kubernetesClient)
           val podWithAttachedContainer = new PodBuilder(executorPod.pod)
             .editOrNewSpec()
             .addToContainers(executorPod.container)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index b31fbb420ed6d..809bdf8ca8c27 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -95,7 +95,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val executorPodsAllocator = new ExecutorPodsAllocator(
       sc.conf,
       sc.env.securityManager,
-      KubernetesExecutorBuilder(kubernetesClient, sc.conf),
+      new KubernetesExecutorBuilder(),
       kubernetesClient,
       snapshotsStore,
       new SystemClock())
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index ba273cad6a8e5..0b74966fe8685 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -20,86 +20,36 @@ import java.io.File
 
 import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SecurityManager
 import org.apache.spark.deploy.k8s._
-import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._
 
-private[spark] class KubernetesExecutorBuilder(
-    provideBasicStep: (KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep =
-      new BasicExecutorFeatureStep(_, _),
-    provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
-      new MountSecretsFeatureStep(_),
-    provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
-      new EnvSecretsFeatureStep(_),
-    provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) =
-      new LocalDirsFeatureStep(_),
-    provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) =
-      new MountVolumesFeatureStep(_),
-    provideHadoopConfStep: (KubernetesExecutorConf => HadoopConfExecutorFeatureStep) =
-      new HadoopConfExecutorFeatureStep(_),
-    provideKerberosConfStep: (KubernetesExecutorConf => KerberosConfExecutorFeatureStep) =
-      new KerberosConfExecutorFeatureStep(_),
-    provideHadoopSparkUserStep: (KubernetesExecutorConf => HadoopSparkUserExecutorFeatureStep) =
-      new HadoopSparkUserExecutorFeatureStep(_),
-    provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
+private[spark] class KubernetesExecutorBuilder {
 
   def buildFromFeatures(
-      kubernetesConf: KubernetesExecutorConf,
-      secMgr: SecurityManager): SparkPod = {
-    val sparkConf = kubernetesConf.sparkConf
-    val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
-    val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME)
-    val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY)
-
-    val baseFeatures = Seq(provideBasicStep(kubernetesConf, secMgr),
-      provideLocalDirsStep(kubernetesConf))
-    val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
-      Seq(provideSecretsStep(kubernetesConf))
-    } else Nil
-    val secretEnvFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) {
-      Seq(provideEnvSecretsStep(kubernetesConf))
-    } else Nil
-    val volumesFeature = if (kubernetesConf.volumes.nonEmpty) {
-      Seq(provideVolumesStep(kubernetesConf))
-    } else Nil
-
-    val maybeHadoopConfFeatureSteps = maybeHadoopConfigMap.map { _ =>
-      val maybeKerberosStep =
-        if (maybeDTSecretName.isDefined && maybeDTDataItem.isDefined) {
-          provideKerberosConfStep(kubernetesConf)
-        } else {
-          provideHadoopSparkUserStep(kubernetesConf)
-        }
-      Seq(provideHadoopConfStep(kubernetesConf)) :+
-        maybeKerberosStep
-    }.getOrElse(Seq.empty[KubernetesFeatureConfigStep])
-
-    val allFeatures: Seq[KubernetesFeatureConfigStep] =
-      baseFeatures ++
-      secretFeature ++
-      secretEnvFeature ++
-      volumesFeature ++
-      maybeHadoopConfFeatureSteps
-
-    var executorPod = provideInitialPod()
-    for (feature <- allFeatures) {
-      executorPod = feature.configurePod(executorPod)
-    }
-    executorPod
+      conf: KubernetesExecutorConf,
+      secMgr: SecurityManager,
+      client: KubernetesClient): SparkPod = {
+    val initialPod = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+      .map { file =>
+        KubernetesUtils.loadPodFromTemplate(
+          client,
+          new File(file),
+          conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
+      }
+      .getOrElse(SparkPod.initialPod())
+
+    val features = Seq(
+      new BasicExecutorFeatureStep(conf, secMgr),
+      new MountSecretsFeatureStep(conf),
+      new EnvSecretsFeatureStep(conf),
+      new LocalDirsFeatureStep(conf),
+      new MountVolumesFeatureStep(conf),
+      new HadoopConfExecutorFeatureStep(conf),
+      new KerberosConfExecutorFeatureStep(conf),
+      new HadoopSparkUserExecutorFeatureStep(conf))
+
+    features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
   }
-}
 
-private[spark] object KubernetesExecutorBuilder {
-  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesExecutorBuilder = {
-    conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
-      .map(new File(_))
-      .map(file => new KubernetesExecutorBuilder(provideInitialPod = () =>
-          KubernetesUtils.loadPodFromTemplate(
-            kubernetesClient,
-            file,
-            conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
-      ))
-      .getOrElse(new KubernetesExecutorBuilder())
-  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
new file mode 100644
index 0000000000000..7dde0c1377168
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+
+import io.fabric8.kubernetes.api.model.{Config => _, _}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, never, verify, when}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.internal.config.ConfigEntry
+
+abstract class PodBuilderSuite extends SparkFunSuite {
+
+  protected def templateFileConf: ConfigEntry[_]
+
+  protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod
+
+  private val baseConf = new SparkConf(false)
+    .set(Config.CONTAINER_IMAGE, "spark-executor:latest")
+
+  test("use empty initial pod if template is not specified") {
+    val client = mock(classOf[KubernetesClient])
+    buildPod(baseConf.clone(), client)
+    verify(client, never()).pods()
+  }
+
+  test("load pod template if specified") {
+    val client = mockKubernetesClient()
+    val sparkConf = baseConf.clone().set(templateFileConf.key, "template-file.yaml")
+    val pod = buildPod(sparkConf, client)
+    verifyPod(pod)
+  }
+
+  test("complain about misconfigured pod template") {
+    val client = mockKubernetesClient(
+      new PodBuilder()
+        .withNewMetadata()
+        .addToLabels("test-label-key", "test-label-value")
+        .endMetadata()
+        .build())
+    val sparkConf = baseConf.clone().set(templateFileConf.key, "template-file.yaml")
+    val exception = intercept[SparkException] {
+      buildPod(sparkConf, client)
+    }
+    assert(exception.getMessage.contains("Could not load pod from template file."))
+  }
+
+  private def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
+    val kubernetesClient = mock(classOf[KubernetesClient])
+    val pods =
+      mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
+    val podResource = mock(classOf[PodResource[Pod, DoneablePod]])
+    when(kubernetesClient.pods()).thenReturn(pods)
+    when(pods.load(any(classOf[File]))).thenReturn(podResource)
+    when(podResource.get()).thenReturn(pod)
+    kubernetesClient
+  }
+
+  private def verifyPod(pod: SparkPod): Unit = {
+    val metadata = pod.pod.getMetadata
+    assert(metadata.getLabels.containsKey("test-label-key"))
+    assert(metadata.getAnnotations.containsKey("test-annotation-key"))
+    assert(metadata.getNamespace === "namespace")
+    assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference"))
+    val spec = pod.pod.getSpec
+    assert(!spec.getContainers.asScala.exists(_.getName == "executor-container"))
+    assert(spec.getDnsPolicy === "dns-policy")
+    assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname")))
+    assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference"))
+    assert(spec.getInitContainers.asScala.exists(_.getName == "init-container"))
+    assert(spec.getNodeName == "node-name")
+    assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value")
+    assert(spec.getSchedulerName === "scheduler")
+    assert(spec.getSecurityContext.getRunAsUser === 1000L)
+    assert(spec.getServiceAccount === "service-account")
+    assert(spec.getSubdomain === "subdomain")
+    assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key"))
+    assert(spec.getVolumes.asScala.exists(_.getName == "test-volume"))
+    val container = pod.container
+    assert(container.getName === "executor-container")
+    assert(container.getArgs.contains("arg"))
+    assert(container.getCommand.equals(List("command").asJava))
+    assert(container.getEnv.asScala.exists(_.getName == "env-key"))
+    assert(container.getResources.getLimits.get("gpu") ===
+      new QuantityBuilder().withAmount("1").build())
+    assert(container.getSecurityContext.getRunAsNonRoot)
+    assert(container.getStdin)
+    assert(container.getTerminationMessagePath === "termination-message-path")
+    assert(container.getTerminationMessagePolicy === "termination-message-policy")
+    assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume"))
+  }
+
+  private def podWithSupportedFeatures(): Pod = {
+    new PodBuilder()
+      .withNewMetadata()
+        .addToLabels("test-label-key", "test-label-value")
+        .addToAnnotations("test-annotation-key", "test-annotation-value")
+        .withNamespace("namespace")
+        .addNewOwnerReference()
+          .withController(true)
+          .withName("owner-reference")
+          .endOwnerReference()
+        .endMetadata()
+      .withNewSpec()
+        .withDnsPolicy("dns-policy")
+        .withHostAliases(new HostAliasBuilder().withHostnames("hostname").build())
+        .withImagePullSecrets(
+          new LocalObjectReferenceBuilder().withName("local-reference").build())
+        .withInitContainers(new ContainerBuilder().withName("init-container").build())
+        .withNodeName("node-name")
+        .withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava)
+        .withSchedulerName("scheduler")
+        .withNewSecurityContext()
+          .withRunAsUser(1000L)
+          .endSecurityContext()
+        .withServiceAccount("service-account")
+        .withSubdomain("subdomain")
+        .withTolerations(new TolerationBuilder()
+          .withKey("toleration-key")
+          .withOperator("Equal")
+          .withEffect("NoSchedule")
+          .build())
+        .addNewVolume()
+          .withNewHostPath()
+          .withPath("/test")
+          .endHostPath()
+          .withName("test-volume")
+          .endVolume()
+        .addNewContainer()
+          .withArgs("arg")
+          .withCommand("command")
+          .addNewEnv()
+            .withName("env-key")
+            .withValue("env-value")
+            .endEnv()
+          .withImagePullPolicy("Always")
+          .withName("executor-container")
+          .withNewResources()
+            .withLimits(Map("gpu" -> new 
QuantityBuilder().withAmount("1").build()).asJava)
+            .endResources()
+          .withNewSecurityContext()
+            .withRunAsNonRoot(true)
+            .endSecurityContext()
+          .withStdin(true)
+          .withTerminationMessagePath("termination-message-path")
+          .withTerminationMessagePolicy("termination-message-policy")
+          .addToVolumeMounts(
+            new VolumeMountBuilder()
+              .withName("test-volume")
+              .withMountPath("/test")
+              .build())
+          .endContainer()
+        .endSpec()
+      .build()
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
index 7295b82ca4799..5e7388dc8e672 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -20,25 +20,32 @@ import java.io.{File, PrintWriter}
 import java.nio.file.Files
 
 import io.fabric8.kubernetes.api.model.ConfigMap
-import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
 
-class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
-  private var kubernetesConf : KubernetesConf = _
-  private var templateFile: File = _
+class PodTemplateConfigMapStepSuite extends SparkFunSuite {
 
-  before {
-    templateFile = Files.createTempFile("pod-template", "yml").toFile
+  test("Do nothing when executor template is not specified") {
+    val conf = KubernetesTestConf.createDriverConf()
+    val step = new PodTemplateConfigMapStep(conf)
+
+    val initialPod = SparkPod.initialPod()
+    val configuredPod = step.configurePod(initialPod)
+    assert(configuredPod === initialPod)
+
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+  }
+
+  test("Mounts executor template volume if config specified") {
+    val templateFile = Files.createTempFile("pod-template", "yml").toFile
     templateFile.deleteOnExit()
 
     val sparkConf = new SparkConf(false)
      .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath)
-    kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
-  }
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
 
-  test("Mounts executor template volume if config specified") {
     val writer = new PrintWriter(templateFile)
     writer.write("pod-template-contents")
     writer.close()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index e9c05fef6f5db..1bb926cbca23d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -126,7 +126,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     MockitoAnnotations.initMocks(this)
     kconf = KubernetesTestConf.createDriverConf(
       resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
-    when(driverBuilder.buildFromFeatures(kconf)).thenReturn(BUILT_KUBERNETES_SPEC)
+    when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 7e7dc4763c2e7..6518c91a1a1fd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -16,201 +16,21 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
-import org.mockito.Mockito._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s._
-import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE}
-import org.apache.spark.deploy.k8s.features._
+import org.apache.spark.internal.config.ConfigEntry
 
-class KubernetesDriverBuilderSuite extends SparkFunSuite {
+class KubernetesDriverBuilderSuite extends PodBuilderSuite {
 
-  private val BASIC_STEP_TYPE = "basic"
-  private val CREDENTIALS_STEP_TYPE = "credentials"
-  private val SERVICE_STEP_TYPE = "service"
-  private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
-  private val SECRETS_STEP_TYPE = "mount-secrets"
-  private val DRIVER_CMD_STEP_TYPE = "driver-command"
-  private val ENV_SECRETS_STEP_TYPE = "env-secrets"
-  private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global"
-  private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
-  private val TEMPLATE_VOLUME_STEP_TYPE = "template-volume"
-
-  private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
-
-  private val credentialsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    CREDENTIALS_STEP_TYPE, classOf[DriverKubernetesCredentialsFeatureStep])
-
-  private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    SERVICE_STEP_TYPE, classOf[DriverServiceFeatureStep])
-
-  private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
-
-  private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
-
-  private val driverCommandStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    DRIVER_CMD_STEP_TYPE, classOf[DriverCommandFeatureStep])
-
-  private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
-
-  private val hadoopGlobalStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    HADOOP_GLOBAL_STEP_TYPE, classOf[KerberosConfDriverFeatureStep])
-
-  private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
-
-  private val templateVolumeStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    TEMPLATE_VOLUME_STEP_TYPE, classOf[PodTemplateConfigMapStep]
-  )
-
-  private val builderUnderTest: KubernetesDriverBuilder =
-    new KubernetesDriverBuilder(
-      _ => basicFeatureStep,
-      _ => credentialsStep,
-      _ => serviceStep,
-      _ => secretsStep,
-      _ => envSecretsStep,
-      _ => localDirsStep,
-      _ => mountVolumesStep,
-      _ => driverCommandStep,
-      _ => hadoopGlobalStep,
-      _ => templateVolumeStep)
-
-  test("Apply fundamental steps all the time.") {
-    val conf = KubernetesTestConf.createDriverConf()
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
-      BASIC_STEP_TYPE,
-      CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      DRIVER_CMD_STEP_TYPE,
-      HADOOP_GLOBAL_STEP_TYPE)
+  override protected def templateFileConf: ConfigEntry[_] = {
+    Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE
   }
 
-  test("Apply secrets step if secrets are present.") {
-    val conf = KubernetesTestConf.createDriverConf(
-      secretEnvNamesToKeyRefs = Map("EnvName" -> "SecretName:secretKey"),
-      secretNamesToMountPaths = Map("secret" -> "secretMountPath"))
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
-      BASIC_STEP_TYPE,
-      CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      SECRETS_STEP_TYPE,
-      ENV_SECRETS_STEP_TYPE,
-      DRIVER_CMD_STEP_TYPE,
-      HADOOP_GLOBAL_STEP_TYPE)
-  }
-
-  test("Apply volumes step if mounts are present.") {
-    val volumeSpec = KubernetesVolumeSpec(
-      "volume",
-      "/tmp",
-      "",
-      false,
-      KubernetesHostPathVolumeConf("/path"))
-    val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec))
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
-      BASIC_STEP_TYPE,
-      CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      MOUNT_VOLUMES_STEP_TYPE,
-      DRIVER_CMD_STEP_TYPE,
-      HADOOP_GLOBAL_STEP_TYPE)
-  }
-
-  test("Apply volumes step if a mount subpath is present.") {
-    val volumeSpec = KubernetesVolumeSpec(
-      "volume",
-      "/tmp",
-      "foo",
-      false,
-      KubernetesHostPathVolumeConf("/path"))
-    val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec))
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
-      BASIC_STEP_TYPE,
-      CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      MOUNT_VOLUMES_STEP_TYPE,
-      DRIVER_CMD_STEP_TYPE,
-      HADOOP_GLOBAL_STEP_TYPE)
-  }
-
-  test("Apply template volume step if executor template is present.") {
-    val sparkConf = new SparkConf(false)
-      .set(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "filename")
+  override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
     val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
-      BASIC_STEP_TYPE,
-      CREDENTIALS_STEP_TYPE,
-      SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      DRIVER_CMD_STEP_TYPE,
-      HADOOP_GLOBAL_STEP_TYPE,
-      TEMPLATE_VOLUME_STEP_TYPE)
-  }
-
-  private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
-  : Unit = {
-    val addedProperties = resolvedSpec.systemProperties
-      .filter { case (k, _) => !k.startsWith("spark.") }
-      .toMap
-    assert(addedProperties.keys.toSet === stepTypes.toSet)
-    stepTypes.foreach { stepType =>
-      assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType)
-      assert(resolvedSpec.driverKubernetesResources.containsSlice(
-        KubernetesFeaturesTestUtils.getSecretsForStepType(stepType)))
-      assert(resolvedSpec.systemProperties(stepType) === stepType)
-    }
-  }
-
-  test("Start with empty pod if template is not specified") {
-    val kubernetesClient = mock(classOf[KubernetesClient])
-    val driverBuilder = KubernetesDriverBuilder.apply(kubernetesClient, new SparkConf())
-    verify(kubernetesClient, never()).pods()
+    new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod
   }
 
-  test("Starts with template if specified") {
-    val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient()
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
-    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
-    val driverSpec = KubernetesDriverBuilder
-      .apply(kubernetesClient, sparkConf)
-      .buildFromFeatures(kubernetesConf)
-    PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(driverSpec.pod)
-  }
-
-  test("Throws on misconfigured pod template") {
-    val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient(
-      new PodBuilder()
-        .withNewMetadata()
-        .addToLabels("test-label-key", "test-label-value")
-        .endMetadata()
-        .build())
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
-    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
-    val exception = intercept[SparkException] {
-      KubernetesDriverBuilder
-        .apply(kubernetesClient, sparkConf)
-        .buildFromFeatures(kubernetesConf)
-    }
-    assert(exception.getMessage.contains("Could not load pod from template file."))
-  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala
deleted file mode 100644
index c92e9e6e3b6b3..0000000000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import java.io.File
-
-import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, when}
-import org.scalatest.FlatSpec
-import scala.collection.JavaConverters._
-
-import org.apache.spark.deploy.k8s.SparkPod
-
-object PodBuilderSuiteUtils extends FlatSpec {
-
-  def loadingMockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
-    val kubernetesClient = mock(classOf[KubernetesClient])
-    val pods =
-      mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
-    val podResource = mock(classOf[PodResource[Pod, DoneablePod]])
-    when(kubernetesClient.pods()).thenReturn(pods)
-    when(pods.load(any(classOf[File]))).thenReturn(podResource)
-    when(podResource.get()).thenReturn(pod)
-    kubernetesClient
-  }
-
-  def verifyPodWithSupportedFeatures(pod: SparkPod): Unit = {
-    val metadata = pod.pod.getMetadata
-    assert(metadata.getLabels.containsKey("test-label-key"))
-    assert(metadata.getAnnotations.containsKey("test-annotation-key"))
-    assert(metadata.getNamespace === "namespace")
-    assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference"))
-    val spec = pod.pod.getSpec
-    assert(!spec.getContainers.asScala.exists(_.getName == "executor-container"))
-    assert(spec.getDnsPolicy === "dns-policy")
-    assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname")))
-    assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference"))
-    assert(spec.getInitContainers.asScala.exists(_.getName == "init-container"))
-    assert(spec.getNodeName == "node-name")
-    assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value")
-    assert(spec.getSchedulerName === "scheduler")
-    assert(spec.getSecurityContext.getRunAsUser === 1000L)
-    assert(spec.getServiceAccount === "service-account")
-    assert(spec.getSubdomain === "subdomain")
-    assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key"))
-    assert(spec.getVolumes.asScala.exists(_.getName == "test-volume"))
-    val container = pod.container
-    assert(container.getName === "executor-container")
-    assert(container.getArgs.contains("arg"))
-    assert(container.getCommand.equals(List("command").asJava))
-    assert(container.getEnv.asScala.exists(_.getName == "env-key"))
-    assert(container.getResources.getLimits.get("gpu") ===
-      new QuantityBuilder().withAmount("1").build())
-    assert(container.getSecurityContext.getRunAsNonRoot)
-    assert(container.getStdin)
-    assert(container.getTerminationMessagePath === "termination-message-path")
-    assert(container.getTerminationMessagePolicy === "termination-message-policy")
-    assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume"))
-
-  }
-
-
-  def podWithSupportedFeatures(): Pod = new PodBuilder()
-        .withNewMetadata()
-          .addToLabels("test-label-key", "test-label-value")
-          .addToAnnotations("test-annotation-key", "test-annotation-value")
-          .withNamespace("namespace")
-          .addNewOwnerReference()
-            .withController(true)
-            .withName("owner-reference")
-            .endOwnerReference()
-          .endMetadata()
-        .withNewSpec()
-          .withDnsPolicy("dns-policy")
-          .withHostAliases(new HostAliasBuilder().withHostnames("hostname").build())
-          .withImagePullSecrets(
-            new LocalObjectReferenceBuilder().withName("local-reference").build())
-          .withInitContainers(new ContainerBuilder().withName("init-container").build())
-          .withNodeName("node-name")
-          .withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava)
-          .withSchedulerName("scheduler")
-          .withNewSecurityContext()
-            .withRunAsUser(1000L)
-            .endSecurityContext()
-          .withServiceAccount("service-account")
-          .withSubdomain("subdomain")
-          .withTolerations(new TolerationBuilder()
-            .withKey("toleration-key")
-            .withOperator("Equal")
-            .withEffect("NoSchedule")
-            .build())
-          .addNewVolume()
-            .withNewHostPath()
-            .withPath("/test")
-            .endHostPath()
-            .withName("test-volume")
-            .endVolume()
-          .addNewContainer()
-            .withArgs("arg")
-            .withCommand("command")
-            .addNewEnv()
-              .withName("env-key")
-              .withValue("env-value")
-              .endEnv()
-            .withImagePullPolicy("Always")
-            .withName("executor-container")
-            .withNewResources()
-              .withLimits(Map("gpu" -> new 
QuantityBuilder().withAmount("1").build()).asJava)
-              .endResources()
-            .withNewSecurityContext()
-              .withRunAsNonRoot(true)
-              .endSecurityContext()
-            .withStdin(true)
-            .withTerminationMessagePath("termination-message-path")
-            .withTerminationMessagePolicy("termination-message-policy")
-            .addToVolumeMounts(
-              new VolumeMountBuilder()
-                .withName("test-volume")
-                .withMountPath("/test")
-                .build())
-            .endContainer()
-          .endSpec()
-        .build()
-
-}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index d4fa31af3d5ce..278a3821a6f3d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -80,8 +80,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
     when(driverPodOperations.get).thenReturn(driverPod)
-    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr)))
-      .thenAnswer(executorPodAnswer())
+    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
+      meq(kubernetesClient))).thenAnswer(executorPodAnswer())
     snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
     waitForExecutorPodsClock = new ManualClock(0L)
     podsAllocatorUnderTest = new ExecutorPodsAllocator(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index ef521fd801e97..bd716174a8271 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -16,147 +16,23 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{Config => _, _}
 import io.fabric8.kubernetes.client.KubernetesClient
-import org.mockito.Mockito.{mock, never, verify}
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.k8s._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils
-import org.apache.spark.util.SparkConfWithEnv
-
-class KubernetesExecutorBuilderSuite extends SparkFunSuite {
-  private val BASIC_STEP_TYPE = "basic"
-  private val SECRETS_STEP_TYPE = "mount-secrets"
-  private val ENV_SECRETS_STEP_TYPE = "env-secrets"
-  private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
-  private val HADOOP_CONF_STEP_TYPE = "hadoop-conf-step"
-  private val HADOOP_SPARK_USER_STEP_TYPE = "hadoop-spark-user"
-  private val KERBEROS_CONF_STEP_TYPE = "kerberos-step"
-  private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
-
-  private val secMgr = new SecurityManager(new SparkConf(false))
-
-  private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
-  private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
-  private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
-  private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
-  private val hadoopConfStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    HADOOP_CONF_STEP_TYPE, classOf[HadoopConfExecutorFeatureStep])
-  private val hadoopSparkUser = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    HADOOP_SPARK_USER_STEP_TYPE, classOf[HadoopSparkUserExecutorFeatureStep])
-  private val kerberosConf = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    KERBEROS_CONF_STEP_TYPE, classOf[KerberosConfExecutorFeatureStep])
-  private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
-    MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
+import org.apache.spark.internal.config.ConfigEntry
 
-  private val builderUnderTest = new KubernetesExecutorBuilder(
-    (_, _) => basicFeatureStep,
-    _ => mountSecretsStep,
-    _ => envSecretsStep,
-    _ => localDirsStep,
-    _ => mountVolumesStep,
-    _ => hadoopConfStep,
-    _ => kerberosConf,
-    _ => hadoopSparkUser)
-
-  test("Basic steps are consistently applied.") {
-    val conf = KubernetesTestConf.createExecutorConf()
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
-  }
-
-  test("Apply secrets step if secrets are present.") {
-    val conf = KubernetesTestConf.createExecutorConf(
-      secretEnvNamesToKeyRefs = Map("secret-name" -> "secret-key"),
-      secretNamesToMountPaths = Map("secret" -> "secretMountPath"))
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf, secMgr),
-      BASIC_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      SECRETS_STEP_TYPE,
-      ENV_SECRETS_STEP_TYPE)
-  }
+class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
 
-  test("Apply volumes step if mounts are present.") {
-    val volumeSpec = KubernetesVolumeSpec(
-      "volume",
-      "/tmp",
-      "",
-      false,
-      KubernetesHostPathVolumeConf("/checkpoint"))
-    val conf = KubernetesTestConf.createExecutorConf(
-      volumes = Seq(volumeSpec))
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf, secMgr),
-      BASIC_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      MOUNT_VOLUMES_STEP_TYPE)
+  override protected def templateFileConf: ConfigEntry[_] = {
+    Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE
   }
 
-  test("Apply basicHadoop step if HADOOP_CONF_DIR is defined") {
-    // HADOOP_DELEGATION_TOKEN
-    val conf = KubernetesTestConf.createExecutorConf(
-      sparkConf = new SparkConfWithEnv(Map("HADOOP_CONF_DIR" -> "/var/hadoop-conf"))
-        .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name")
-        .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name"))
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf, secMgr),
-      BASIC_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      HADOOP_CONF_STEP_TYPE,
-      HADOOP_SPARK_USER_STEP_TYPE)
+  override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
+    sparkConf.set("spark.driver.host", "https://driver.host.com")
+    val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
+    val secMgr = new SecurityManager(sparkConf)
+    new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client)
   }
 
-  test("Apply kerberos step if DT secrets created") {
-    val conf = KubernetesTestConf.createExecutorConf(
-      sparkConf = new SparkConf(false)
-        .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name")
-        .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name")
-        .set(KERBEROS_SPARK_USER_NAME, "spark-user")
-        .set(KERBEROS_DT_SECRET_NAME, "dt-secret")
-        .set(KERBEROS_DT_SECRET_KEY, "dt-key" ))
-    validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf, secMgr),
-      BASIC_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE,
-      HADOOP_CONF_STEP_TYPE,
-      KERBEROS_CONF_STEP_TYPE)
-  }
-
-  private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = {
-    assert(resolvedPod.pod.getMetadata.getLabels.asScala.keys.toSet === stepTypes.toSet)
-  }
-
-  test("Starts with empty executor pod if template is not specified") {
-    val kubernetesClient = mock(classOf[KubernetesClient])
-    val executorBuilder = KubernetesExecutorBuilder.apply(kubernetesClient, new SparkConf())
-    verify(kubernetesClient, never()).pods()
-  }
-
-  test("Starts with executor template if specified") {
-    val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient()
-    val sparkConf = new SparkConf(false)
-      .set("spark.driver.host", "https://driver.host.com";)
-      .set(Config.CONTAINER_IMAGE, "spark-executor:latest")
-      .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "template-file.yaml")
-    val kubernetesConf = KubernetesTestConf.createExecutorConf(
-      sparkConf = sparkConf,
-      driverPod = Some(new PodBuilder()
-        .withNewMetadata()
-          .withName("driver")
-          .endMetadata()
-        .build()))
-    val sparkPod = KubernetesExecutorBuilder(kubernetesClient, sparkConf)
-      .buildFromFeatures(kubernetesConf, secMgr)
-    PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod)
-  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Put all feature-related code in the feature step itself
> -------------------------------------------------------
>
>                 Key: SPARK-25877
>                 URL: https://issues.apache.org/jira/browse/SPARK-25877
>             Project: Spark
>          Issue Type: Improvement
>          Components: Kubernetes
>    Affects Versions: 2.4.0
>            Reporter: Marcelo Vanzin
>            Priority: Major
>             Fix For: 3.0.0
>
>
> This is a child task of SPARK-25874. It covers moving all feature-related
> code into the feature steps themselves, including the logic that decides
> whether a step should be applied.
> Please refer to the parent bug for further details.
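
For illustration, a minimal sketch of the pattern this change adopts (hedged:
simplified trait and a hypothetical ExampleSecretsStep/buildPod, not the exact
Spark signatures; the real steps live under org.apache.spark.deploy.k8s.features):
each step decides for itself whether it applies and returns the pod unchanged
otherwise, so the builders can apply every step unconditionally with a fold.

// Sketch only: assumed, simplified form of KubernetesFeatureConfigStep.
// Steps that emit extra resources or system properties gate those the same
// way, returning empty collections when their feature is not configured.
trait FeatureStep {
  def configurePod(pod: SparkPod): SparkPod
}

// Hypothetical step: touches the pod only when its feature is configured.
class ExampleSecretsStep(conf: KubernetesConf) extends FeatureStep {
  override def configurePod(pod: SparkPod): SparkPod = {
    if (conf.secretNamesToMountPaths.nonEmpty) {
      // ... add the secret volumes and volume mounts to the pod here ...
      pod
    } else {
      pod  // feature not configured: no-op
    }
  }
}

// The builder no longer filters steps; it folds every step over the initial pod.
def buildPod(steps: Seq[FeatureStep], initialPod: SparkPod): SparkPod =
  steps.foldLeft(initialPod) { (pod, step) => step.configurePod(pod) }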



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
