Repository: spark
Updated Branches:
  refs/heads/master 4de638c19 -> f15906da1


http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index bf4ec04..6a50159 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -38,6 +38,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private val DRIVER_POD_UID = "pod-id"
   private val DRIVER_POD_API_VERSION = "v1"
   private val DRIVER_POD_KIND = "pod"
+  private val KUBERNETES_RESOURCE_PREFIX = "resource-example"
 
  private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
      HasMetadata, Boolean]
@@ -61,6 +62,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
  private val submissionSteps = Seq(FirstTestConfigurationStep, SecondTestConfigurationStep)
   private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
   private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _
+  private var createdContainerArgumentCaptor: ArgumentCaptor[Container] = _
 
   before {
     MockitoAnnotations.initMocks(this)
@@ -94,7 +96,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       kubernetesClient,
       false,
       "spark",
-      loggingPodStatusWatcher)
+      loggingPodStatusWatcher,
+      KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
     val createdPod = createdPodArgumentCaptor.getValue
    assert(createdPod.getMetadata.getName === FirstTestConfigurationStep.podName)
@@ -108,62 +111,52 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       SecondTestConfigurationStep.containerName)
   }
 
-  test("The client should create the secondary Kubernetes resources.") {
+  test("The client should create Kubernetes resources") {
+    val EXAMPLE_JAVA_OPTS = "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails"
+    val EXPECTED_JAVA_OPTS = "-XX\\:+HeapDumpOnOutOfMemoryError -XX\\:+PrintGCDetails"
     val submissionClient = new Client(
       submissionSteps,
-      new SparkConf(false),
+      new SparkConf(false)
+        .set(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS, EXAMPLE_JAVA_OPTS),
       kubernetesClient,
       false,
       "spark",
-      loggingPodStatusWatcher)
+      loggingPodStatusWatcher,
+      KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
     val createdPod = createdPodArgumentCaptor.getValue
     val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
-    assert(otherCreatedResources.size === 1)
-    val createdResource = Iterables.getOnlyElement(otherCreatedResources).asInstanceOf[Secret]
-    assert(createdResource.getMetadata.getName === FirstTestConfigurationStep.secretName)
-    assert(createdResource.getData.asScala ===
+    assert(otherCreatedResources.size === 2)
+    val secrets = otherCreatedResources.toArray
+      .filter(_.isInstanceOf[Secret]).map(_.asInstanceOf[Secret])
+    val configMaps = otherCreatedResources.toArray
+      .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap])
+    assert(secrets.nonEmpty)
+    val secret = secrets.head
+    assert(secret.getMetadata.getName === FirstTestConfigurationStep.secretName)
+    assert(secret.getData.asScala ===
      Map(FirstTestConfigurationStep.secretKey -> FirstTestConfigurationStep.secretData))
-    val ownerReference = Iterables.getOnlyElement(createdResource.getMetadata.getOwnerReferences)
+    val ownerReference = Iterables.getOnlyElement(secret.getMetadata.getOwnerReferences)
     assert(ownerReference.getName === createdPod.getMetadata.getName)
     assert(ownerReference.getKind === DRIVER_POD_KIND)
     assert(ownerReference.getUid === DRIVER_POD_UID)
     assert(ownerReference.getApiVersion === DRIVER_POD_API_VERSION)
-  }
-
-  test("The client should attach the driver container with the appropriate JVM 
options.") {
-    val sparkConf = new SparkConf(false)
-      .set("spark.logConf", "true")
-      .set(
-        org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS,
-          "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails")
-    val submissionClient = new Client(
-      submissionSteps,
-      sparkConf,
-      kubernetesClient,
-      false,
-      "spark",
-      loggingPodStatusWatcher)
-    submissionClient.run()
-    val createdPod = createdPodArgumentCaptor.getValue
+    assert(configMaps.nonEmpty)
+    val configMap = configMaps.head
+    assert(configMap.getMetadata.getName ===
+      s"$KUBERNETES_RESOURCE_PREFIX-driver-conf-map")
+    assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME))
+    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains(EXPECTED_JAVA_OPTS))
+    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains(
+      "spark.custom-conf=custom-conf-value"))
    val driverContainer = Iterables.getOnlyElement(createdPod.getSpec.getContainers)
    assert(driverContainer.getName === SecondTestConfigurationStep.containerName)
-    val driverJvmOptsEnvs = driverContainer.getEnv.asScala.filter { env =>
-      env.getName.startsWith(ENV_JAVA_OPT_PREFIX)
-    }.sortBy(_.getName)
-    assert(driverJvmOptsEnvs.size === 4)
-
-    val expectedJvmOptsValues = Seq(
-      "-Dspark.logConf=true",
-      s"-D${SecondTestConfigurationStep.sparkConfKey}=" +
-        s"${SecondTestConfigurationStep.sparkConfValue}",
-      "-XX:+HeapDumpOnOutOfMemoryError",
-      "-XX:+PrintGCDetails")
-    driverJvmOptsEnvs.zip(expectedJvmOptsValues).zipWithIndex.foreach {
-      case ((resolvedEnv, expectedJvmOpt), index) =>
-        assert(resolvedEnv.getName === s"$ENV_JAVA_OPT_PREFIX$index")
-        assert(resolvedEnv.getValue === expectedJvmOpt)
-    }
+    val driverEnv = driverContainer.getEnv.asScala.head
+    assert(driverEnv.getName === ENV_SPARK_CONF_DIR)
+    assert(driverEnv.getValue === SPARK_CONF_DIR_INTERNAL)
+    val driverMount = driverContainer.getVolumeMounts.asScala.head
+    assert(driverMount.getName === SPARK_CONF_VOLUME)
+    assert(driverMount.getMountPath === SPARK_CONF_DIR_INTERNAL)
   }
 
   test("Waiting for app completion should stall on the watcher") {
@@ -173,7 +166,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       kubernetesClient,
       true,
       "spark",
-      loggingPodStatusWatcher)
+      loggingPodStatusWatcher,
+      KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
     verify(loggingPodStatusWatcher).awaitCompletion()
   }
@@ -209,13 +203,11 @@ private object FirstTestConfigurationStep extends DriverConfigurationStep {
 }
 
 private object SecondTestConfigurationStep extends DriverConfigurationStep {
-
   val annotationKey = "second-submit"
   val annotationValue = "submitted"
   val sparkConfKey = "spark.custom-conf"
   val sparkConfValue = "custom-conf-value"
   val containerName = "driverContainer"
-
  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     val modifiedPod = new PodBuilder(driverSpec.driverPod)
       .editMetadata()

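Note: the rewritten test above verifies that driver configuration now travels in
a ConfigMap rather than per-option environment variables. The escaped colons in
EXPECTED_JAVA_OPTS are consistent with java.util.Properties serialization; a
minimal, runnable Scala sketch of that escaping behavior (illustrative values,
not code from this patch):

    import java.io.StringWriter
    import java.util.Properties

    val props = new Properties()
    props.setProperty("spark.driver.extraJavaOptions",
      "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails")
    val out = new StringWriter()
    props.store(out, null)
    // Properties.store escapes ':' in values, so the stored line reads:
    //   spark.driver.extraJavaOptions=-XX\:+HeapDumpOnOutOfMemoryError -XX\:+PrintGCDetails
    println(out.toString)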
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
index 033d303..df34d2d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
@@ -25,7 +25,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
   private val DRIVER_IMAGE = "driver-image"
   private val IC_IMAGE = "init-container-image"
   private val APP_ID = "spark-app-id"
-  private val LAUNCH_TIME = 975256L
+  private val KUBERNETES_RESOURCE_PREFIX = "example-prefix"
   private val APP_NAME = "spark"
   private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
   private val APP_ARGS = Array("arg1", "arg2")
@@ -38,7 +38,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
     val orchestrator = new DriverConfigOrchestrator(
       APP_ID,
-      LAUNCH_TIME,
+      KUBERNETES_RESOURCE_PREFIX,
       Some(mainAppResource),
       APP_NAME,
       MAIN_CLASS,
@@ -49,15 +49,14 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
       classOf[BasicDriverConfigurationStep],
       classOf[DriverServiceBootstrapStep],
       classOf[DriverKubernetesCredentialsStep],
-      classOf[DependencyResolutionStep]
-    )
+      classOf[DependencyResolutionStep])
   }
 
   test("Base submission steps without a main app resource.") {
     val sparkConf = new SparkConf(false).set(CONTAINER_IMAGE, DRIVER_IMAGE)
     val orchestrator = new DriverConfigOrchestrator(
       APP_ID,
-      LAUNCH_TIME,
+      KUBERNETES_RESOURCE_PREFIX,
       Option.empty,
       APP_NAME,
       MAIN_CLASS,
@@ -67,31 +66,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
       orchestrator,
       classOf[BasicDriverConfigurationStep],
       classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep]
-    )
-  }
-
-  test("Submission steps with an init-container.") {
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, DRIVER_IMAGE)
-      .set(INIT_CONTAINER_IMAGE.key, IC_IMAGE)
-      .set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar")
-    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
-    val orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      LAUNCH_TIME,
-      Some(mainAppResource),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BasicDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep],
-      classOf[DependencyResolutionStep],
-      classOf[DriverInitContainerBootstrapStep])
+      classOf[DriverKubernetesCredentialsStep])
   }
 
   test("Submission steps with driver secrets to mount") {
@@ -102,7 +77,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
     val orchestrator = new DriverConfigOrchestrator(
       APP_ID,
-      LAUNCH_TIME,
+      KUBERNETES_RESOURCE_PREFIX,
       Some(mainAppResource),
       APP_NAME,
       MAIN_CLASS,
@@ -122,7 +97,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
       .set(CONTAINER_IMAGE, DRIVER_IMAGE)
     var orchestrator = new DriverConfigOrchestrator(
       APP_ID,
-      LAUNCH_TIME,
+      KUBERNETES_RESOURCE_PREFIX,
       Some(JavaMainAppResource("file:///var/apps/jars/main.jar")),
       APP_NAME,
       MAIN_CLASS,
@@ -135,7 +110,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
     sparkConf.set("spark.files", "/path/to/file1,/path/to/file2")
     orchestrator = new DriverConfigOrchestrator(
       APP_ID,
-      LAUNCH_TIME,
+      KUBERNETES_RESOURCE_PREFIX,
       Some(JavaMainAppResource("local:///var/apps/jars/main.jar")),
       APP_NAME,
       MAIN_CLASS,

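Note: every orchestrator in this suite now receives a precomputed
KUBERNETES_RESOURCE_PREFIX instead of the old LAUNCH_TIME. A sketch of how a
caller might derive such a prefix (an assumption for illustration only; the
actual derivation happens in the submission client, outside this suite):

    // Combine the app name and launch time into a per-submission prefix that
    // is safe to use in Kubernetes resource names (lowercase alphanumerics).
    val appName = "spark"
    val launchTime = System.currentTimeMillis()
    val kubernetesResourceNamePrefix = s"$appName-$launchTime".toLowerCase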
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
index b136f2c..ce06853 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
@@ -73,16 +73,13 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
    assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
    assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
 
-    assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
+    assert(preparedDriverSpec.driverContainer.getEnv.size === 4)
     val envs = preparedDriverSpec.driverContainer
       .getEnv
       .asScala
       .map(env => (env.getName, env.getValue))
       .toMap
     assert(envs(ENV_CLASSPATH) === "/opt/spark/spark-examples.jar")
-    assert(envs(ENV_DRIVER_MEMORY) === "256M")
-    assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
-    assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2 \"arg 3\"")
     assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
     assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
 
@@ -112,7 +109,8 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
     val expectedSparkConf = Map(
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
       "spark.app.id" -> APP_ID,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX)
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
+      "spark.kubernetes.submitInDriver" -> "true")
     assert(resolvedSparkConf === expectedSparkConf)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
index 991b03c..ca43fc9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
@@ -29,24 +29,17 @@ import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
 class DependencyResolutionStepSuite extends SparkFunSuite {
 
   private val SPARK_JARS = Seq(
-    "hdfs://localhost:9000/apps/jars/jar1.jar",
-    "file:///home/user/apps/jars/jar2.jar",
-    "local:///var/apps/jars/jar3.jar")
+    "apps/jars/jar1.jar",
+    "local:///var/apps/jars/jar2.jar")
 
   private val SPARK_FILES = Seq(
-    "file:///home/user/apps/files/file1.txt",
-    "hdfs://localhost:9000/apps/files/file2.txt",
-    "local:///var/apps/files/file3.txt")
-
-  private val JARS_DOWNLOAD_PATH = "/mnt/spark-data/jars"
-  private val FILES_DOWNLOAD_PATH = "/mnt/spark-data/files"
+    "apps/files/file1.txt",
+    "local:///var/apps/files/file2.txt")
 
   test("Added dependencies should be resolved in Spark configuration and 
environment") {
     val dependencyResolutionStep = new DependencyResolutionStep(
       SPARK_JARS,
-      SPARK_FILES,
-      JARS_DOWNLOAD_PATH,
-      FILES_DOWNLOAD_PATH)
+      SPARK_FILES)
     val driverPod = new PodBuilder().build()
     val baseDriverSpec = KubernetesDriverSpec(
       driverPod = driverPod,
@@ -58,24 +51,19 @@ class DependencyResolutionStepSuite extends SparkFunSuite {
     assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
    val resolvedSparkJars = preparedDriverSpec.driverSparkConf.get("spark.jars").split(",").toSet
     val expectedResolvedSparkJars = Set(
-      "hdfs://localhost:9000/apps/jars/jar1.jar",
-      s"$JARS_DOWNLOAD_PATH/jar2.jar",
-      "/var/apps/jars/jar3.jar")
+      "apps/jars/jar1.jar",
+      "/var/apps/jars/jar2.jar")
     assert(resolvedSparkJars === expectedResolvedSparkJars)
    val resolvedSparkFiles = preparedDriverSpec.driverSparkConf.get("spark.files").split(",").toSet
     val expectedResolvedSparkFiles = Set(
-      s"$FILES_DOWNLOAD_PATH/file1.txt",
-      s"hdfs://localhost:9000/apps/files/file2.txt",
-      s"/var/apps/files/file3.txt")
+      "apps/files/file1.txt",
+      "/var/apps/files/file2.txt")
     assert(resolvedSparkFiles === expectedResolvedSparkFiles)
     val driverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
     assert(driverEnv.size === 1)
     assert(driverEnv.head.getName === ENV_MOUNTED_CLASSPATH)
    val resolvedDriverClasspath = driverEnv.head.getValue.split(File.pathSeparator).toSet
-    val expectedResolvedDriverClasspath = Set(
-      s"$JARS_DOWNLOAD_PATH/jar1.jar",
-      s"$JARS_DOWNLOAD_PATH/jar2.jar",
-      "/var/apps/jars/jar3.jar")
+    val expectedResolvedDriverClasspath = expectedResolvedSparkJars
     assert(resolvedDriverClasspath === expectedResolvedDriverClasspath)
   }
 }

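Note: with the download paths removed, the resolution rule this test exercises
reduces to: "local:" URIs become container-local paths, and everything else
passes through untouched. A runnable sketch of that rule (an assumed
reimplementation for illustration, not the patch's code):

    import java.net.URI

    def resolveFileUri(uri: String): String = {
      val parsed = URI.create(uri)
      Option(parsed.getScheme) match {
        // local:///var/apps/jars/jar2.jar -> /var/apps/jars/jar2.jar
        case Some("local") => parsed.getPath
        // apps/jars/jar1.jar and hdfs:// URIs are left unchanged
        case _ => uri
      }
    }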
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
deleted file mode 100644
index 758871e..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.StringReader
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-import com.google.common.collect.Maps
-import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
-import org.apache.spark.util.Utils
-
-class DriverInitContainerBootstrapStepSuite extends SparkFunSuite {
-
-  private val CONFIG_MAP_NAME = "spark-init-config-map"
-  private val CONFIG_MAP_KEY = "spark-init-config-map-key"
-
-  test("The init container bootstrap step should use all of the init container 
steps") {
-    val baseDriverSpec = KubernetesDriverSpec(
-      driverPod = new PodBuilder().build(),
-      driverContainer = new ContainerBuilder().build(),
-      driverSparkConf = new SparkConf(false),
-      otherKubernetesResources = Seq.empty[HasMetadata])
-    val initContainerSteps = Seq(
-      FirstTestInitContainerConfigurationStep,
-      SecondTestInitContainerConfigurationStep)
-    val bootstrapStep = new DriverInitContainerBootstrapStep(
-      initContainerSteps,
-      CONFIG_MAP_NAME,
-      CONFIG_MAP_KEY)
-
-    val preparedDriverSpec = bootstrapStep.configureDriver(baseDriverSpec)
-
-    assert(preparedDriverSpec.driverPod.getMetadata.getLabels.asScala ===
-      FirstTestInitContainerConfigurationStep.additionalLabels)
-    val additionalDriverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
-    assert(additionalDriverEnv.size === 1)
-    assert(additionalDriverEnv.head.getName ===
-      FirstTestInitContainerConfigurationStep.additionalMainContainerEnvKey)
-    assert(additionalDriverEnv.head.getValue ===
-      FirstTestInitContainerConfigurationStep.additionalMainContainerEnvValue)
-
-    assert(preparedDriverSpec.otherKubernetesResources.size === 2)
-    assert(preparedDriverSpec.otherKubernetesResources.contains(
-      FirstTestInitContainerConfigurationStep.additionalKubernetesResource))
-    assert(preparedDriverSpec.otherKubernetesResources.exists {
-      case configMap: ConfigMap =>
-        val hasMatchingName = configMap.getMetadata.getName == CONFIG_MAP_NAME
-        val configMapData = configMap.getData.asScala
-        val hasCorrectNumberOfEntries = configMapData.size == 1
-        val initContainerPropertiesRaw = configMapData(CONFIG_MAP_KEY)
-        val initContainerProperties = new Properties()
-        Utils.tryWithResource(new StringReader(initContainerPropertiesRaw)) {
-          initContainerProperties.load(_)
-        }
-        val initContainerPropertiesMap = Maps.fromProperties(initContainerProperties).asScala
-        val expectedInitContainerProperties = Map(
-          SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyKey ->
-            SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyValue)
-        val hasMatchingProperties = initContainerPropertiesMap == expectedInitContainerProperties
-        hasMatchingName && hasCorrectNumberOfEntries && hasMatchingProperties
-
-      case _ => false
-    })
-
-    val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers
-    assert(initContainers.size() === 1)
-    val initContainerEnv = initContainers.get(0).getEnv.asScala
-    assert(initContainerEnv.size === 1)
-    assert(initContainerEnv.head.getName ===
-      SecondTestInitContainerConfigurationStep.additionalInitContainerEnvKey)
-    assert(initContainerEnv.head.getValue ===
-      SecondTestInitContainerConfigurationStep.additionalInitContainerEnvValue)
-
-    val expectedSparkConf = Map(
-      INIT_CONTAINER_CONFIG_MAP_NAME.key -> CONFIG_MAP_NAME,
-      INIT_CONTAINER_CONFIG_MAP_KEY_CONF.key -> CONFIG_MAP_KEY,
-      SecondTestInitContainerConfigurationStep.additionalDriverSparkConfKey ->
-        SecondTestInitContainerConfigurationStep.additionalDriverSparkConfValue)
-    assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf)
-  }
-}
-
-private object FirstTestInitContainerConfigurationStep extends InitContainerConfigurationStep {
-
-  val additionalLabels = Map("additionalLabelkey" -> "additionalLabelValue")
-  val additionalMainContainerEnvKey = "TEST_ENV_MAIN_KEY"
-  val additionalMainContainerEnvValue = "TEST_ENV_MAIN_VALUE"
-  val additionalKubernetesResource = new SecretBuilder()
-    .withNewMetadata()
-    .withName("test-secret")
-    .endMetadata()
-    .addToData("secret-key", "secret-value")
-    .build()
-
-  override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
-    val driverPod = new PodBuilder(initContainerSpec.driverPod)
-      .editOrNewMetadata()
-      .addToLabels(additionalLabels.asJava)
-      .endMetadata()
-      .build()
-    val mainContainer = new ContainerBuilder(initContainerSpec.driverContainer)
-      .addNewEnv()
-      .withName(additionalMainContainerEnvKey)
-      .withValue(additionalMainContainerEnvValue)
-      .endEnv()
-      .build()
-    initContainerSpec.copy(
-      driverPod = driverPod,
-      driverContainer = mainContainer,
-      dependentResources = initContainerSpec.dependentResources ++
-        Seq(additionalKubernetesResource))
-  }
-}
-
-private object SecondTestInitContainerConfigurationStep extends InitContainerConfigurationStep {
-  val additionalInitContainerEnvKey = "TEST_ENV_INIT_KEY"
-  val additionalInitContainerEnvValue = "TEST_ENV_INIT_VALUE"
-  val additionalInitContainerPropertyKey = "spark.initcontainer.testkey"
-  val additionalInitContainerPropertyValue = "testvalue"
-  val additionalDriverSparkConfKey = "spark.driver.testkey"
-  val additionalDriverSparkConfValue = "spark.driver.testvalue"
-
-  override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
-    val initContainer = new ContainerBuilder(initContainerSpec.initContainer)
-      .addNewEnv()
-      .withName(additionalInitContainerEnvKey)
-      .withValue(additionalInitContainerEnvValue)
-      .endEnv()
-      .build()
-    val initContainerProperties = initContainerSpec.properties ++
-      Map(additionalInitContainerPropertyKey -> additionalInitContainerPropertyValue)
-    val driverSparkConf = initContainerSpec.driverSparkConf ++
-      Map(additionalDriverSparkConfKey -> additionalDriverSparkConfValue)
-    initContainerSpec.copy(
-      initContainer = initContainer,
-      properties = initContainerProperties,
-      driverSparkConf = driverSparkConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
deleted file mode 100644
index 4553f9f..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model._
-import org.mockito.{Mock, MockitoAnnotations}
-import org.mockito.Matchers.any
-import org.mockito.Mockito.when
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
-import org.apache.spark.deploy.k8s.Config._
-
-class BasicInitContainerConfigurationStepSuite extends SparkFunSuite with BeforeAndAfter {
-
-  private val SPARK_JARS = Seq(
-    "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
-  private val SPARK_FILES = Seq(
-    "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
-  private val JARS_DOWNLOAD_PATH = "/var/data/jars"
-  private val FILES_DOWNLOAD_PATH = "/var/data/files"
-  private val POD_LABEL = Map("bootstrap" -> "true")
-  private val INIT_CONTAINER_NAME = "init-container"
-  private val DRIVER_CONTAINER_NAME = "driver-container"
-
-  @Mock
-  private var podAndInitContainerBootstrap : InitContainerBootstrap = _
-
-  before {
-    MockitoAnnotations.initMocks(this)
-    when(podAndInitContainerBootstrap.bootstrapInitContainer(
-      any[PodWithDetachedInitContainer])).thenAnswer(new Answer[PodWithDetachedInitContainer] {
-      override def answer(invocation: InvocationOnMock) : PodWithDetachedInitContainer = {
-        val pod = invocation.getArgumentAt(0, classOf[PodWithDetachedInitContainer])
-        pod.copy(
-          pod = new PodBuilder(pod.pod)
-            .withNewMetadata()
-            .addToLabels("bootstrap", "true")
-            .endMetadata()
-            .withNewSpec().endSpec()
-            .build(),
-          initContainer = new ContainerBuilder()
-            .withName(INIT_CONTAINER_NAME)
-            .build(),
-          mainContainer = new ContainerBuilder()
-            .withName(DRIVER_CONTAINER_NAME)
-            .build()
-        )}})
-  }
-
-  test("additionalDriverSparkConf with mix of remote files and jars") {
-    val baseInitStep = new BasicInitContainerConfigurationStep(
-      SPARK_JARS,
-      SPARK_FILES,
-      JARS_DOWNLOAD_PATH,
-      FILES_DOWNLOAD_PATH,
-      podAndInitContainerBootstrap)
-    val expectedDriverSparkConf = Map(
-      JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH,
-      FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH,
-      INIT_CONTAINER_REMOTE_JARS.key -> "hdfs://localhost:9000/app/jars/jar1.jar",
-      INIT_CONTAINER_REMOTE_FILES.key -> "hdfs://localhost:9000/app/files/file1.txt")
-    val initContainerSpec = InitContainerSpec(
-      Map.empty[String, String],
-      Map.empty[String, String],
-      new Container(),
-      new Container(),
-      new Pod,
-      Seq.empty[HasMetadata])
-    val returnContainerSpec = baseInitStep.configureInitContainer(initContainerSpec)
-    assert(expectedDriverSparkConf === returnContainerSpec.properties)
-    assert(returnContainerSpec.initContainer.getName === INIT_CONTAINER_NAME)
-    assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
-    assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
deleted file mode 100644
index 09b42e4..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-
-class InitContainerConfigOrchestratorSuite extends SparkFunSuite {
-
-  private val DOCKER_IMAGE = "init-container"
-  private val SPARK_JARS = Seq(
-    "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
-  private val SPARK_FILES = Seq(
-    "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
-  private val JARS_DOWNLOAD_PATH = "/var/data/jars"
-  private val FILES_DOWNLOAD_PATH = "/var/data/files"
-  private val DOCKER_IMAGE_PULL_POLICY: String = "IfNotPresent"
-  private val CUSTOM_LABEL_KEY = "customLabel"
-  private val CUSTOM_LABEL_VALUE = "customLabelValue"
-  private val INIT_CONTAINER_CONFIG_MAP_NAME = "spark-init-config-map"
-  private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key"
-  private val SECRET_FOO = "foo"
-  private val SECRET_BAR = "bar"
-  private val SECRET_MOUNT_PATH = "/etc/secrets/init-container"
-
-  test("including basic configuration step") {
-    val sparkConf = new SparkConf(true)
-      .set(CONTAINER_IMAGE, DOCKER_IMAGE)
-      .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", 
CUSTOM_LABEL_VALUE)
-
-    val orchestrator = new InitContainerConfigOrchestrator(
-      SPARK_JARS.take(1),
-      SPARK_FILES,
-      JARS_DOWNLOAD_PATH,
-      FILES_DOWNLOAD_PATH,
-      DOCKER_IMAGE_PULL_POLICY,
-      INIT_CONTAINER_CONFIG_MAP_NAME,
-      INIT_CONTAINER_CONFIG_MAP_KEY,
-      sparkConf)
-    val initSteps = orchestrator.getAllConfigurationSteps
-    assert(initSteps.lengthCompare(1) == 0)
-    assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep])
-  }
-
-  test("including step to mount user-specified secrets") {
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, DOCKER_IMAGE)
-      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
-      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
-
-    val orchestrator = new InitContainerConfigOrchestrator(
-      SPARK_JARS.take(1),
-      SPARK_FILES,
-      JARS_DOWNLOAD_PATH,
-      FILES_DOWNLOAD_PATH,
-      DOCKER_IMAGE_PULL_POLICY,
-      INIT_CONTAINER_CONFIG_MAP_NAME,
-      INIT_CONTAINER_CONFIG_MAP_KEY,
-      sparkConf)
-    val initSteps = orchestrator.getAllConfigurationSteps
-    assert(initSteps.length === 2)
-    assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep])
-    assert(initSteps(1).isInstanceOf[InitContainerMountSecretsStep])
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
deleted file mode 100644
index 7ac0bde..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}
-
-class InitContainerMountSecretsStepSuite extends SparkFunSuite {
-
-  private val SECRET_FOO = "foo"
-  private val SECRET_BAR = "bar"
-  private val SECRET_MOUNT_PATH = "/etc/secrets/init-container"
-
-  test("mounts all given secrets") {
-    val baseInitContainerSpec = InitContainerSpec(
-      Map.empty,
-      Map.empty,
-      new ContainerBuilder().build(),
-      new ContainerBuilder().build(),
-      new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
-      Seq.empty)
-    val secretNamesToMountPaths = Map(
-      SECRET_FOO -> SECRET_MOUNT_PATH,
-      SECRET_BAR -> SECRET_MOUNT_PATH)
-
-    val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
-    val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
-    val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
-      baseInitContainerSpec)
-    val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer
-
-    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
-      assert(SecretVolumeUtils.containerHasVolume(
-        initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index a3c615b..7755b93 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -19,15 +19,13 @@ package org.apache.spark.scheduler.cluster.k8s
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model._
-import org.mockito.{AdditionalAnswers, MockitoAnnotations}
-import org.mockito.Matchers.any
-import org.mockito.Mockito._
+import org.mockito.MockitoAnnotations
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer, SecretVolumeUtils}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.MountSecretsBootstrap
 
class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
 
@@ -55,10 +53,11 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
       .set(CONTAINER_IMAGE, executorImage)
+      .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
   }
 
   test("basic executor pod has reasonable defaults") {
-    val factory = new ExecutorPodFactory(baseConf, None, None, None)
+    val factory = new ExecutorPodFactory(baseConf, None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
 
@@ -89,7 +88,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
       "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
 
-    val factory = new ExecutorPodFactory(conf, None, None, None)
+    val factory = new ExecutorPodFactory(conf, None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
 
@@ -101,7 +100,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
     conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
 
-    val factory = new ExecutorPodFactory(conf, None, None, None)
+    val factory = new ExecutorPodFactory(conf, None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), 
driverPod, Map[String, Int]())
 
@@ -116,11 +115,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     val conf = baseConf.clone()
 
    val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
-    val factory = new ExecutorPodFactory(
-      conf,
-      Some(secretsBootstrap),
-      None,
-      None)
+    val factory = new ExecutorPodFactory(conf, Some(secretsBootstrap))
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
 
@@ -138,50 +133,6 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     checkOwnerReferences(executor, driverPodUid)
   }
 
-  test("init-container bootstrap step adds an init container") {
-    val conf = baseConf.clone()
-    val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
-    when(initContainerBootstrap.bootstrapInitContainer(
-      any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-
-    val factory = new ExecutorPodFactory(
-      conf,
-      None,
-      Some(initContainerBootstrap),
-      None)
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
-
-    assert(executor.getSpec.getInitContainers.size() === 1)
-    checkOwnerReferences(executor, driverPodUid)
-  }
-
-  test("init-container with secrets mount bootstrap") {
-    val conf = baseConf.clone()
-    val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
-    when(initContainerBootstrap.bootstrapInitContainer(
-      any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
-
-    val factory = new ExecutorPodFactory(
-      conf,
-      Some(secretsBootstrap),
-      Some(initContainerBootstrap),
-      Some(secretsBootstrap))
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
-
-    assert(executor.getSpec.getVolumes.size() === 1)
-    assert(SecretVolumeUtils.podHasVolume(executor, "secret1-volume"))
-    assert(SecretVolumeUtils.containerHasVolume(
-      executor.getSpec.getContainers.get(0), "secret1-volume", "/var/secret1"))
-    assert(executor.getSpec.getInitContainers.size() === 1)
-    assert(SecretVolumeUtils.containerHasVolume(
-      executor.getSpec.getInitContainers.get(0), "secret1-volume", "/var/secret1"))
-
-    checkOwnerReferences(executor, driverPodUid)
-  }
-
  // There is always exactly one controller reference, and it points to the driver pod.
  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
@@ -197,8 +148,8 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       ENV_EXECUTOR_CORES -> "1",
       ENV_EXECUTOR_MEMORY -> "1g",
       ENV_APPLICATION_ID -> "dummy",
-      ENV_EXECUTOR_POD_IP -> null,
-      ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*") ++ additionalEnvVars
+      ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
+      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
 
     assert(executor.getSpec.getContainers.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getEnv.size() === defaultEnvs.size)

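Note: the assertions in this suite collect a fabric8 container's env list into a
Scala Map before comparing. A self-contained sketch of that idiom (example
names only, not from the patch):

    import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
    import scala.collection.JavaConverters._

    // Build a pod with one container carrying a single env var, then read the
    // env back into a Map for assertion-style comparison.
    val pod: Pod = new PodBuilder()
      .withNewMetadata().withName("example-executor").endMetadata()
      .withNewSpec()
        .addToContainers(new ContainerBuilder()
          .withName("executor")
          .addNewEnv().withName("SPARK_CONF_DIR").withValue("/opt/spark/conf").endEnv()
          .build())
      .endSpec()
      .build()
    val envs = pod.getSpec.getContainers.get(0).getEnv.asScala
      .map(e => (e.getName, e.getValue)).toMap
    assert(envs("SPARK_CONF_DIR") == "/opt/spark/conf")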
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
index 491b7cf..9badf85 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
@@ -40,7 +40,6 @@ RUN set -ex && \
 COPY ${spark_jars} /opt/spark/jars
 COPY bin /opt/spark/bin
 COPY sbin /opt/spark/sbin
-COPY conf /opt/spark/conf
 COPY ${img_path}/spark/entrypoint.sh /opt/
 COPY examples /opt/spark/examples
 COPY data /opt/spark/data

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index d0cf284..3e16611 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -56,14 +56,10 @@ fi
 case "$SPARK_K8S_CMD" in
   driver)
     CMD=(
-      ${JAVA_HOME}/bin/java
-      "${SPARK_JAVA_OPTS[@]}"
-      -cp "$SPARK_CLASSPATH"
-      -Xms$SPARK_DRIVER_MEMORY
-      -Xmx$SPARK_DRIVER_MEMORY
-      -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS
-      $SPARK_DRIVER_CLASS
-      $SPARK_DRIVER_ARGS
+      "$SPARK_HOME/bin/spark-submit"
+      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+      --deploy-mode client
+      "$@"
     )
     ;;
 
@@ -83,14 +79,6 @@ case "$SPARK_K8S_CMD" in
     )
     ;;
 
-  init)
-    CMD=(
-      "$SPARK_HOME/bin/spark-class"
-      "org.apache.spark.deploy.k8s.SparkPodInitContainer"
-      "$@"
-    )
-    ;;
-
   *)
     echo "Unknown command: $SPARK_K8S_CMD" 1>&2
     exit 1

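Note: taken together with the Dockerfile change above, the driver container no
longer bakes in /opt/spark/conf or launches java directly; entrypoint.sh now
delegates to spark-submit in client mode, so driver memory, main class, and
arguments come from the ConfigMap-mounted conf directory rather than the removed
SPARK_DRIVER_MEMORY/SPARK_DRIVER_CLASS/SPARK_DRIVER_ARGS variables. A sketch of
how a process could read that mounted configuration (the directory fallback and
the "spark.properties" file name are assumptions based on the tests above):

    import java.io.FileReader
    import java.util.Properties
    import scala.collection.JavaConverters._

    // SPARK_CONF_DIR points at the ConfigMap mount; "spark.properties" is the
    // assumed SPARK_CONF_FILE_NAME referenced in ClientSuite.
    val confDir = sys.env.getOrElse("SPARK_CONF_DIR", "/opt/spark/conf")
    val props = new Properties()
    val reader = new FileReader(s"$confDir/spark.properties")
    try props.load(reader) finally reader.close()
    props.asScala.foreach { case (k, v) => println(s"$k=$v") }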
