Repository: spark
Updated Branches:
  refs/heads/master 32ec269d0 -> 171f6ddad


http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
new file mode 100644
index 0000000..c0e7bb2
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps.initcontainer
+
+import org.apache.spark.deploy.k8s.MountSecretsBootstrap
+
+/**
+ * An init-container configuration step for mounting user-specified secrets onto user-specified
+ * paths.
+ *
+ * @param bootstrap a utility actually handling mounting of the secrets
+ */
+private[spark] class InitContainerMountSecretsStep(
+    bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
+
+  override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
+    val (driverPod, initContainer) = bootstrap.mountSecrets(
+      spec.driverPod,
+      spec.initContainer)
+    spec.copy(
+      driverPod = driverPod,
+      initContainer = initContainer
+    )
+  }
+}
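
A minimal sketch (not part of the commit) of how a submission orchestrator wires this step up; the secret name and mount path are assumed values:

    import org.apache.spark.deploy.k8s.MountSecretsBootstrap

    // Hypothetical: map of Kubernetes secret names to container mount paths.
    val bootstrap = new MountSecretsBootstrap(Map("db-creds" -> "/etc/secrets/driver"))
    val step = new InitContainerMountSecretsStep(bootstrap)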

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
new file mode 100644
index 0000000..b52c343
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps.initcontainer
+
+import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
+
+/**
+ * Represents a specification of the init-container for the driver pod.
+ *
+ * @param properties properties that should be set on the init-container
+ * @param driverSparkConf Spark configuration properties that will be carried back to the driver
+ * @param initContainer the init-container object
+ * @param driverContainer the driver container object
+ * @param driverPod the driver pod object
+ * @param dependentResources resources the init-container depends on to work
+ */
+private[spark] case class InitContainerSpec(
+    properties: Map[String, String],
+    driverSparkConf: Map[String, String],
+    initContainer: Container,
+    driverContainer: Container,
+    driverPod: Pod,
+    dependentResources: Seq[HasMetadata])
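
A minimal sketch (not part of the commit) of threading a spec through a configuration step such as the one above; the empty pod and containers are placeholders:

    import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
    import org.apache.spark.deploy.k8s.MountSecretsBootstrap

    // Steps never mutate the spec; each returns a copy with some fields rewritten.
    val initialSpec = InitContainerSpec(
      properties = Map.empty,
      driverSparkConf = Map.empty,
      initContainer = new ContainerBuilder().build(),
      driverContainer = new ContainerBuilder().build(),
      driverPod = new PodBuilder().withNewMetadata().endMetadata().build(),
      dependentResources = Seq.empty)
    val configured = new InitContainerMountSecretsStep(
        new MountSecretsBootstrap(Map("db-creds" -> "/etc/secrets/driver")))
      .configureInitContainer(initialSpec)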

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
new file mode 100644
index 0000000..4a4b628
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.k8s
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Process that fetches files from a resource staging server and/or arbitrary remote locations.
+ *
+ * The init-container can handle fetching files from any of those sources, but not all of the
+ * sources need to be specified. This allows for composing multiple instances of this container
+ * with different configurations for different download sources, or using the same container to
+ * download everything at once.
+ */
+private[spark] class SparkPodInitContainer(
+    sparkConf: SparkConf,
+    fileFetcher: FileFetcher) extends Logging {
+
+  private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
+  private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
+
+  private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
+  private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
+
+  private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
+  private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
+
+  private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
+
+  def run(): Unit = {
+    logInfo(s"Downloading remote jars: $remoteJars")
+    downloadFiles(
+      remoteJars,
+      jarsDownloadDir,
+      s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
+        "or is not a directory.")
+
+    logInfo(s"Downloading remote files: $remoteFiles")
+    downloadFiles(
+      remoteFiles,
+      filesDownloadDir,
+      s"Remote files download directory specified at $filesDownloadDir does not exist " +
+        "or is not a directory.")
+
+    downloadExecutor.shutdown()
+    downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
+  }
+
+  private def downloadFiles(
+      filesCommaSeparated: Option[String],
+      downloadDir: File,
+      errMessage: String): Unit = {
+    filesCommaSeparated.foreach { files =>
+      require(downloadDir.isDirectory, errMessage)
+      Utils.stringToSeq(files).foreach { file =>
+        Future[Unit] {
+          fileFetcher.fetchFile(file, downloadDir)
+        }
+      }
+    }
+  }
+}
+
+private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
+
+  def fetchFile(uri: String, targetDir: File): Unit = {
+    Utils.fetchFile(
+      url = uri,
+      targetDir = targetDir,
+      conf = sparkConf,
+      securityMgr = securityManager,
+      hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
+      timestamp = System.currentTimeMillis(),
+      useCache = false)
+  }
+}
+
+object SparkPodInitContainer extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    logInfo("Starting init-container to download Spark application dependencies.")
+    val sparkConf = new SparkConf(true)
+    if (args.nonEmpty) {
+      Utils.loadDefaultSparkProperties(sparkConf, args(0))
+    }
+
+    val securityManager = new SparkSecurityManager(sparkConf)
+    val fileFetcher = new FileFetcher(sparkConf, securityManager)
+    new SparkPodInitContainer(sparkConf, fileFetcher).run()
+    logInfo("Finished downloading application dependencies.")
+  }
+}
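
A sketch of the intended invocation (the path is an assumed example): the container's command runs main with an optional properties file, which args(0) feeds to Utils.loadDefaultSparkProperties; run() then downloads all jars and files in parallel on the daemon pool and blocks for at most INIT_CONTAINER_MOUNT_TIMEOUT minutes:

    // Hypothetical container entry point; the properties file path is illustrative.
    SparkPodInitContainer.main(Array("/etc/spark-init/spark-init.properties"))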

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 7022615..ba5d891 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -21,35 +21,35 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer}
 import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.ConfigurationUtils
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
 import org.apache.spark.util.Utils
 
 /**
- * A factory class for configuring and creating executor pods.
+ * A factory class for bootstrapping and creating executor pods with the given bootstrapping
+ * components.
+ *
+ * @param sparkConf Spark configuration
+ * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
+ *                              user-specified paths into the executor container
+ * @param initContainerBootstrap an optional component for bootstrapping the executor init-container
+ *                               if one is needed, i.e., when there are remote dependencies to
+ *                               localize
+ * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
+ *                                           secrets onto user-specified paths into the executor
+ *                                           init-container
  */
-private[spark] trait ExecutorPodFactory {
-
-  /**
-   * Configure and construct an executor pod with the given parameters.
-   */
-  def createExecutorPod(
-      executorId: String,
-      applicationId: String,
-      driverUrl: String,
-      executorEnvs: Seq[(String, String)],
-      driverPod: Pod,
-      nodeToLocalTaskCount: Map[String, Int]): Pod
-}
-
-private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
-  extends ExecutorPodFactory {
+private[spark] class ExecutorPodFactory(
+    sparkConf: SparkConf,
+    mountSecretsBootstrap: Option[MountSecretsBootstrap],
+    initContainerBootstrap: Option[InitContainerBootstrap],
+    initContainerMountSecretsBootstrap: Option[MountSecretsBootstrap]) {
 
   private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
 
-  private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
+  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
     sparkConf,
     KUBERNETES_EXECUTOR_LABEL_PREFIX)
   require(
@@ -64,11 +64,11 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
     s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
 
   private val executorAnnotations =
-    ConfigurationUtils.parsePrefixedKeyValuePairs(
+    KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf,
       KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
   private val nodeSelector =
-    ConfigurationUtils.parsePrefixedKeyValuePairs(
+    KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf,
       KUBERNETES_NODE_SELECTOR_PREFIX)
 
@@ -94,7 +94,10 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
   private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
   private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
-  override def createExecutorPod(
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   */
+  def createExecutorPod(
       executorId: String,
       applicationId: String,
       driverUrl: String,
@@ -198,7 +201,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
         .endSpec()
       .build()
 
-    val containerWithExecutorLimitCores = executorLimitCores.map { limitCores =>
+    val containerWithLimitCores = executorLimitCores.map { limitCores =>
       val executorCpuLimitQuantity = new QuantityBuilder(false)
         .withAmount(limitCores)
         .build()
@@ -209,9 +212,33 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
         .build()
     }.getOrElse(executorContainer)
 
-    new PodBuilder(executorPod)
+    val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
+      mountSecretsBootstrap.map { bootstrap =>
+        bootstrap.mountSecrets(executorPod, containerWithLimitCores)
+      }.getOrElse((executorPod, containerWithLimitCores))
+
+    val (bootstrappedPod, bootstrappedContainer) =
+      initContainerBootstrap.map { bootstrap =>
+        val podWithInitContainer = bootstrap.bootstrapInitContainer(
+          PodWithDetachedInitContainer(
+            maybeSecretsMountedPod,
+            new ContainerBuilder().build(),
+            maybeSecretsMountedContainer))
+
+        val (pod, mayBeSecretsMountedInitContainer) =
+          initContainerMountSecretsBootstrap.map { bootstrap =>
+            bootstrap.mountSecrets(podWithInitContainer.pod, podWithInitContainer.initContainer)
+          }.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
+
+        val bootstrappedPod = KubernetesUtils.appendInitContainer(
+          pod, mayBeSecretsMountedInitContainer)
+
+        (bootstrappedPod, podWithInitContainer.mainContainer)
+      }.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
+
+    new PodBuilder(bootstrappedPod)
       .editSpec()
-        .addToContainers(containerWithExecutorLimitCores)
+        .addToContainers(bootstrappedContainer)
         .endSpec()
       .build()
   }
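
A sketch (not from the commit) of constructing the refactored factory; with all three bootstrap components absent, createExecutorPod builds the same pod the old ExecutorPodFactoryImpl did. The sparkConf, driver URL, and driverPod here are assumed placeholders:

    val factory = new ExecutorPodFactory(sparkConf, None, None, None)
    val executorPod = factory.createExecutorPod(
      executorId = "1",
      applicationId = "spark-app-id",
      driverUrl = "spark://CoarseGrainedScheduler@driver-host:7078",
      executorEnvs = Seq.empty,
      driverPod = driverPod,
      nodeToLocalTaskCount = Map.empty)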

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index b8bb152..a942db6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,9 +21,9 @@ import java.io.File
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.util.ThreadUtils
@@ -45,6 +45,59 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       masterURL: String,
       scheduler: TaskScheduler): SchedulerBackend = {
     val sparkConf = sc.getConf
+    val initContainerConfigMap = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_NAME)
+    val initContainerConfigMapKey = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_KEY_CONF)
+
+    if (initContainerConfigMap.isEmpty) {
+      logWarning("The executor's init-container config map is not specified. Executors will " +
+        "therefore not attempt to fetch remote or submitted dependencies.")
+    }
+
+    if (initContainerConfigMapKey.isEmpty) {
+      logWarning("The executor's init-container config map key is not specified. Executors will " +
+        "therefore not attempt to fetch remote or submitted dependencies.")
+    }
+
+    // Only set up the bootstrap if they've provided both the config map key and the config map
+    // name. The config map might not be provided if init-containers aren't being used to
+    // bootstrap dependencies.
+    val initContainerBootstrap = for {
+      configMap <- initContainerConfigMap
+      configMapKey <- initContainerConfigMapKey
+    } yield {
+      val initContainerImage = sparkConf
+        .get(INIT_CONTAINER_IMAGE)
+        .getOrElse(throw new SparkException(
+          "Must specify the init-container image when there are remote dependencies"))
+      new InitContainerBootstrap(
+        initContainerImage,
+        sparkConf.get(CONTAINER_IMAGE_PULL_POLICY),
+        sparkConf.get(JARS_DOWNLOAD_LOCATION),
+        sparkConf.get(FILES_DOWNLOAD_LOCATION),
+        configMap,
+        configMapKey,
+        SPARK_POD_EXECUTOR_ROLE,
+        sparkConf)
+    }
+
+    val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+    val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
+      Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
+    } else {
+      None
+    }
+    // Mount user-specified executor secrets also into the executor's init-container. The
+    // init-container may need credentials in the secrets to be able to download remote
+    // dependencies. The executor's main container and its init-container share the secrets
+    // because the init-container is sort of an implementation detail, and this sharing
+    // avoids introducing a dedicated configuration property just for the init-container.
+    val initContainerMountSecretsBootstrap = if (initContainerBootstrap.nonEmpty &&
+      executorSecretNamesToMountPaths.nonEmpty) {
+      Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
+    } else {
+      None
+    }
 
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
       KUBERNETES_MASTER_INTERNAL_URL,
@@ -54,7 +107,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf)
+    val executorPodFactory = new ExecutorPodFactory(
+      sparkConf,
+      mountSecretBootstrap,
+      initContainerBootstrap,
+      initContainerMountSecretsBootstrap)
+
     val allocatorExecutor = ThreadUtils
       .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
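
The for-comprehension over the two Options above is the standard Scala idiom for requiring both settings at once: it yields Some(bootstrap) only when the config map name and key are both defined. A self-contained sketch of the same pattern:

    def bothDefined(name: Option[String], key: Option[String]): Option[(String, String)] =
      for {
        n <- name
        k <- key
      } yield (n, k)

    bothDefined(Some("cm"), Some("cm-key"))  // Some(("cm", "cm-key"))
    bothDefined(Some("cm"), None)            // None, so no bootstrap is created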

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
new file mode 100644
index 0000000..f193b1f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.submit.steps._
+
+class DriverConfigOrchestratorSuite extends SparkFunSuite {
+
+  private val DRIVER_IMAGE = "driver-image"
+  private val IC_IMAGE = "init-container-image"
+  private val APP_ID = "spark-app-id"
+  private val LAUNCH_TIME = 975256L
+  private val APP_NAME = "spark"
+  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+  private val APP_ARGS = Array("arg1", "arg2")
+  private val SECRET_FOO = "foo"
+  private val SECRET_BAR = "bar"
+  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
+
+  test("Base submission steps with a main app resource.") {
+    val sparkConf = new SparkConf(false)
+      .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
+    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
+    val orchestrator = new DriverConfigOrchestrator(
+      APP_ID,
+      LAUNCH_TIME,
+      Some(mainAppResource),
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    validateStepTypes(
+      orchestrator,
+      classOf[BasicDriverConfigurationStep],
+      classOf[DriverServiceBootstrapStep],
+      classOf[DriverKubernetesCredentialsStep],
+      classOf[DependencyResolutionStep]
+    )
+  }
+
+  test("Base submission steps without a main app resource.") {
+    val sparkConf = new SparkConf(false)
+      .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
+    val orchestrator = new DriverConfigOrchestrator(
+      APP_ID,
+      LAUNCH_TIME,
+      Option.empty,
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    validateStepTypes(
+      orchestrator,
+      classOf[BasicDriverConfigurationStep],
+      classOf[DriverServiceBootstrapStep],
+      classOf[DriverKubernetesCredentialsStep]
+    )
+  }
+
+  test("Submission steps with an init-container.") {
+    val sparkConf = new SparkConf(false)
+      .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
+      .set(INIT_CONTAINER_IMAGE, IC_IMAGE)
+      .set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar")
+    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
+    val orchestrator = new DriverConfigOrchestrator(
+      APP_ID,
+      LAUNCH_TIME,
+      Some(mainAppResource),
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    validateStepTypes(
+      orchestrator,
+      classOf[BasicDriverConfigurationStep],
+      classOf[DriverServiceBootstrapStep],
+      classOf[DriverKubernetesCredentialsStep],
+      classOf[DependencyResolutionStep],
+      classOf[DriverInitContainerBootstrapStep])
+  }
+
+  test("Submission steps with driver secrets to mount") {
+    val sparkConf = new SparkConf(false)
+      .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
+      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
+      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
+    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
+    val orchestrator = new DriverConfigOrchestrator(
+      APP_ID,
+      LAUNCH_TIME,
+      Some(mainAppResource),
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    validateStepTypes(
+      orchestrator,
+      classOf[BasicDriverConfigurationStep],
+      classOf[DriverServiceBootstrapStep],
+      classOf[DriverKubernetesCredentialsStep],
+      classOf[DependencyResolutionStep],
+      classOf[DriverMountSecretsStep])
+  }
+
+  private def validateStepTypes(
+      orchestrator: DriverConfigOrchestrator,
+      types: Class[_ <: DriverConfigurationStep]*): Unit = {
+    val steps = orchestrator.getAllConfigurationSteps
+    assert(steps.size === types.size)
+    assert(steps.map(_.getClass) === types)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
deleted file mode 100644
index 98f9f27..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config.DRIVER_CONTAINER_IMAGE
-import org.apache.spark.deploy.k8s.submit.steps._
-
-class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
-
-  private val NAMESPACE = "default"
-  private val DRIVER_IMAGE = "driver-image"
-  private val APP_ID = "spark-app-id"
-  private val LAUNCH_TIME = 975256L
-  private val APP_NAME = "spark"
-  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
-  private val APP_ARGS = Array("arg1", "arg2")
-
-  test("Base submission steps with a main app resource.") {
-    val sparkConf = new SparkConf(false)
-      .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
-    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
-    val orchestrator = new DriverConfigurationStepsOrchestrator(
-      NAMESPACE,
-      APP_ID,
-      LAUNCH_TIME,
-      Some(mainAppResource),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BaseDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep],
-      classOf[DependencyResolutionStep]
-    )
-  }
-
-  test("Base submission steps without a main app resource.") {
-    val sparkConf = new SparkConf(false)
-      .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
-    val orchestrator = new DriverConfigurationStepsOrchestrator(
-      NAMESPACE,
-      APP_ID,
-      LAUNCH_TIME,
-      Option.empty,
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BaseDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep]
-    )
-  }
-
-  private def validateStepTypes(
-      orchestrator: DriverConfigurationStepsOrchestrator,
-      types: Class[_ <: DriverConfigurationStep]*): Unit = {
-    val steps = orchestrator.getAllConfigurationSteps()
-    assert(steps.size === types.size)
-    assert(steps.map(_.getClass) === types)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/SecretVolumeUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/SecretVolumeUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/SecretVolumeUtils.scala
new file mode 100644
index 0000000..8388c16
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/SecretVolumeUtils.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{Container, Pod}
+
+private[spark] object SecretVolumeUtils {
+
+  def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
+    driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
+  }
+
+  def containerHasVolume(
+      driverContainer: Container,
+      volumeName: String,
+      mountPath: String): Boolean = {
+    driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
+      volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
+  }
+}
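
These helpers are exercised by DriverMountSecretsStepSuite below; a brief sketch of the intended assertion style, with the volume name and mount path assumed:

    assert(SecretVolumeUtils.podHasVolume(driverPod, "foo-volume"))
    assert(SecretVolumeUtils.containerHasVolume(
      driverContainer, "foo-volume", "/etc/secrets/driver"))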

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala
deleted file mode 100644
index f7c1b31..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-class BaseDriverConfigurationStepSuite extends SparkFunSuite {
-
-  private val APP_ID = "spark-app-id"
-  private val RESOURCE_NAME_PREFIX = "spark"
-  private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
-  private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
-  private val APP_NAME = "spark-test"
-  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
-  private val APP_ARGS = Array("arg1", "arg2", "arg 3")
-  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
-  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
-  private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
-  private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
-
-  test("Set all possible configurations from the user.") {
-    val sparkConf = new SparkConf()
-      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
-      .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar")
-      .set("spark.driver.cores", "2")
-      .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
-      .set(DRIVER_CONTAINER_IMAGE, "spark-driver:latest")
-      .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
-      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
-      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
-
-    val submissionStep = new BaseDriverConfigurationStep(
-      APP_ID,
-      RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      CONTAINER_IMAGE_PULL_POLICY,
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
-    val baseDriverSpec = KubernetesDriverSpec(
-      driverPod = basePod,
-      driverContainer = new ContainerBuilder().build(),
-      driverSparkConf = new SparkConf(false),
-      otherKubernetesResources = Seq.empty[HasMetadata])
-    val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)
-
-    assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
-    assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
-    assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
-
-    assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
-    val envs = preparedDriverSpec.driverContainer
-      .getEnv
-      .asScala
-      .map(env => (env.getName, env.getValue))
-      .toMap
-    assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
-    assert(envs(ENV_DRIVER_MEMORY) === "256M")
-    assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
-    assert(envs(ENV_DRIVER_ARGS) === "\"arg1\" \"arg2\" \"arg 3\"")
-    assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
-    assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
-
-    assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
-      envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
-        envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
-        envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
-
-    val resourceRequirements = preparedDriverSpec.driverContainer.getResources
-    val requests = resourceRequirements.getRequests.asScala
-    assert(requests("cpu").getAmount === "2")
-    assert(requests("memory").getAmount === "256Mi")
-    val limits = resourceRequirements.getLimits.asScala
-    assert(limits("memory").getAmount === "456Mi")
-    assert(limits("cpu").getAmount === "4")
-
-    val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata
-    assert(driverPodMetadata.getName === "spark-driver-pod")
-    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
-    val expectedAnnotations = Map(
-      CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
-      SPARK_APP_NAME_ANNOTATION -> APP_NAME)
-    assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
-    assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
-
-    val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
-    val expectedSparkConf = Map(
-      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
-      "spark.app.id" -> APP_ID,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX)
-    assert(resolvedSparkConf === expectedSparkConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
new file mode 100644
index 0000000..e864c6a
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+class BasicDriverConfigurationStepSuite extends SparkFunSuite {
+
+  private val APP_ID = "spark-app-id"
+  private val RESOURCE_NAME_PREFIX = "spark"
+  private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
+  private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
+  private val APP_NAME = "spark-test"
+  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+  private val APP_ARGS = Array("arg1", "arg2", "arg 3")
+  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
+  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
+  private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
+  private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
+
+  test("Set all possible configurations from the user.") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar")
+      .set("spark.driver.cores", "2")
+      .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
+      .set(DRIVER_CONTAINER_IMAGE, "spark-driver:latest")
+      .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
+      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
+      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
+
+    val submissionStep = new BasicDriverConfigurationStep(
+      APP_ID,
+      RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      CONTAINER_IMAGE_PULL_POLICY,
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
+    val baseDriverSpec = KubernetesDriverSpec(
+      driverPod = basePod,
+      driverContainer = new ContainerBuilder().build(),
+      driverSparkConf = new SparkConf(false),
+      otherKubernetesResources = Seq.empty[HasMetadata])
+    val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)
+
+    assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
+    assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
+    assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
+
+    assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
+    val envs = preparedDriverSpec.driverContainer
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
+    assert(envs(ENV_DRIVER_MEMORY) === "256M")
+    assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
+    assert(envs(ENV_DRIVER_ARGS) === "\"arg1\" \"arg2\" \"arg 3\"")
+    assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
+    assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
+
+    assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
+      envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
+        envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
+        envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
+
+    val resourceRequirements = preparedDriverSpec.driverContainer.getResources
+    val requests = resourceRequirements.getRequests.asScala
+    assert(requests("cpu").getAmount === "2")
+    assert(requests("memory").getAmount === "256Mi")
+    val limits = resourceRequirements.getLimits.asScala
+    assert(limits("memory").getAmount === "456Mi")
+    assert(limits("cpu").getAmount === "4")
+
+    val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata
+    assert(driverPodMetadata.getName === "spark-driver-pod")
+    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
+    val expectedAnnotations = Map(
+      CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
+      SPARK_APP_NAME_ANNOTATION -> APP_NAME)
+    assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
+    assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
+
+    val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
+    val expectedSparkConf = Map(
+      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
+      "spark.app.id" -> APP_ID,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX)
+    assert(resolvedSparkConf === expectedSparkConf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
new file mode 100644
index 0000000..758871e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.StringReader
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.Maps
+import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
+import org.apache.spark.util.Utils
+
+class DriverInitContainerBootstrapStepSuite extends SparkFunSuite {
+
+  private val CONFIG_MAP_NAME = "spark-init-config-map"
+  private val CONFIG_MAP_KEY = "spark-init-config-map-key"
+
+  test("The init container bootstrap step should use all of the init container steps") {
+    val baseDriverSpec = KubernetesDriverSpec(
+      driverPod = new PodBuilder().build(),
+      driverContainer = new ContainerBuilder().build(),
+      driverSparkConf = new SparkConf(false),
+      otherKubernetesResources = Seq.empty[HasMetadata])
+    val initContainerSteps = Seq(
+      FirstTestInitContainerConfigurationStep,
+      SecondTestInitContainerConfigurationStep)
+    val bootstrapStep = new DriverInitContainerBootstrapStep(
+      initContainerSteps,
+      CONFIG_MAP_NAME,
+      CONFIG_MAP_KEY)
+
+    val preparedDriverSpec = bootstrapStep.configureDriver(baseDriverSpec)
+
+    assert(preparedDriverSpec.driverPod.getMetadata.getLabels.asScala ===
+      FirstTestInitContainerConfigurationStep.additionalLabels)
+    val additionalDriverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
+    assert(additionalDriverEnv.size === 1)
+    assert(additionalDriverEnv.head.getName ===
+      FirstTestInitContainerConfigurationStep.additionalMainContainerEnvKey)
+    assert(additionalDriverEnv.head.getValue ===
+      FirstTestInitContainerConfigurationStep.additionalMainContainerEnvValue)
+
+    assert(preparedDriverSpec.otherKubernetesResources.size === 2)
+    assert(preparedDriverSpec.otherKubernetesResources.contains(
+      FirstTestInitContainerConfigurationStep.additionalKubernetesResource))
+    assert(preparedDriverSpec.otherKubernetesResources.exists {
+      case configMap: ConfigMap =>
+        val hasMatchingName = configMap.getMetadata.getName == CONFIG_MAP_NAME
+        val configMapData = configMap.getData.asScala
+        val hasCorrectNumberOfEntries = configMapData.size == 1
+        val initContainerPropertiesRaw = configMapData(CONFIG_MAP_KEY)
+        val initContainerProperties = new Properties()
+        Utils.tryWithResource(new StringReader(initContainerPropertiesRaw)) {
+          initContainerProperties.load(_)
+        }
+        val initContainerPropertiesMap = Maps.fromProperties(initContainerProperties).asScala
+        val expectedInitContainerProperties = Map(
+          SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyKey ->
+            SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyValue)
+        val hasMatchingProperties = initContainerPropertiesMap == expectedInitContainerProperties
+        hasMatchingName && hasCorrectNumberOfEntries && hasMatchingProperties
+
+      case _ => false
+    })
+
+    val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers
+    assert(initContainers.size() === 1)
+    val initContainerEnv = initContainers.get(0).getEnv.asScala
+    assert(initContainerEnv.size === 1)
+    assert(initContainerEnv.head.getName ===
+      SecondTestInitContainerConfigurationStep.additionalInitContainerEnvKey)
+    assert(initContainerEnv.head.getValue ===
+      SecondTestInitContainerConfigurationStep.additionalInitContainerEnvValue)
+
+    val expectedSparkConf = Map(
+      INIT_CONTAINER_CONFIG_MAP_NAME.key -> CONFIG_MAP_NAME,
+      INIT_CONTAINER_CONFIG_MAP_KEY_CONF.key -> CONFIG_MAP_KEY,
+      SecondTestInitContainerConfigurationStep.additionalDriverSparkConfKey ->
+        SecondTestInitContainerConfigurationStep.additionalDriverSparkConfValue)
+    assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf)
+  }
+}
+
+private object FirstTestInitContainerConfigurationStep extends InitContainerConfigurationStep {
+
+  val additionalLabels = Map("additionalLabelkey" -> "additionalLabelValue")
+  val additionalMainContainerEnvKey = "TEST_ENV_MAIN_KEY"
+  val additionalMainContainerEnvValue = "TEST_ENV_MAIN_VALUE"
+  val additionalKubernetesResource = new SecretBuilder()
+    .withNewMetadata()
+    .withName("test-secret")
+    .endMetadata()
+    .addToData("secret-key", "secret-value")
+    .build()
+
+  override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
+    val driverPod = new PodBuilder(initContainerSpec.driverPod)
+      .editOrNewMetadata()
+      .addToLabels(additionalLabels.asJava)
+      .endMetadata()
+      .build()
+    val mainContainer = new ContainerBuilder(initContainerSpec.driverContainer)
+      .addNewEnv()
+      .withName(additionalMainContainerEnvKey)
+      .withValue(additionalMainContainerEnvValue)
+      .endEnv()
+      .build()
+    initContainerSpec.copy(
+      driverPod = driverPod,
+      driverContainer = mainContainer,
+      dependentResources = initContainerSpec.dependentResources ++
+        Seq(additionalKubernetesResource))
+  }
+}
+
+private object SecondTestInitContainerConfigurationStep extends InitContainerConfigurationStep {
+  val additionalInitContainerEnvKey = "TEST_ENV_INIT_KEY"
+  val additionalInitContainerEnvValue = "TEST_ENV_INIT_VALUE"
+  val additionalInitContainerPropertyKey = "spark.initcontainer.testkey"
+  val additionalInitContainerPropertyValue = "testvalue"
+  val additionalDriverSparkConfKey = "spark.driver.testkey"
+  val additionalDriverSparkConfValue = "spark.driver.testvalue"
+
+  override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
+    val initContainer = new ContainerBuilder(initContainerSpec.initContainer)
+      .addNewEnv()
+      .withName(additionalInitContainerEnvKey)
+      .withValue(additionalInitContainerEnvValue)
+      .endEnv()
+      .build()
+    val initContainerProperties = initContainerSpec.properties ++
+      Map(additionalInitContainerPropertyKey -> additionalInitContainerPropertyValue)
+    val driverSparkConf = initContainerSpec.driverSparkConf ++
+      Map(additionalDriverSparkConfKey -> additionalDriverSparkConfValue)
+    initContainerSpec.copy(
+      initContainer = initContainer,
+      properties = initContainerProperties,
+      driverSparkConf = driverSparkConf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
new file mode 100644
index 0000000..9ec0cb5
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.MountSecretsBootstrap
+import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, SecretVolumeUtils}
+
+class DriverMountSecretsStepSuite extends SparkFunSuite {
+
+  private val SECRET_FOO = "foo"
+  private val SECRET_BAR = "bar"
+  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
+
+  test("mounts all given secrets") {
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(new SparkConf(false))
+    val secretNamesToMountPaths = Map(
+      SECRET_FOO -> SECRET_MOUNT_PATH,
+      SECRET_BAR -> SECRET_MOUNT_PATH)
+
+    val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
+    val mountSecretsStep = new DriverMountSecretsStep(mountSecretsBootstrap)
+    val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec)
+    val driverPodWithSecretsMounted = configuredDriverSpec.driverPod
+    val driverContainerWithSecretsMounted = configuredDriverSpec.driverContainer
+
+    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
+      assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, 
volumeName))
+    }
+    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
+      assert(SecretVolumeUtils.containerHasVolume(
+        driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))
+    }
+  }
+}
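
The suite above pins down the user-facing contract: every configured secret becomes a "<secret-name>-volume" volume mounted at the requested path in the driver container. As a rough sketch of the submission-side configuration that drives this step, assuming the driver-secret prefix exercised in the orchestrator suite below resolves to "spark.kubernetes.driver.secrets." and reusing the illustrative secret names from the test:

    import org.apache.spark.SparkConf

    // Hypothetical user configuration: mount secrets "foo" and "bar"
    // into the driver container at /etc/secrets/driver.
    val sparkConf = new SparkConf(false)
      .set("spark.kubernetes.driver.secrets.foo", "/etc/secrets/driver")
      .set("spark.kubernetes.driver.secrets.bar", "/etc/secrets/driver")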

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
new file mode 100644
index 0000000..4553f9f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps.initcontainer
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.when
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
+import org.apache.spark.deploy.k8s.Config._
+
+class BasicInitContainerConfigurationStepSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val SPARK_JARS = Seq(
+    "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
+  private val SPARK_FILES = Seq(
+    "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
+  private val JARS_DOWNLOAD_PATH = "/var/data/jars"
+  private val FILES_DOWNLOAD_PATH = "/var/data/files"
+  private val POD_LABEL = Map("bootstrap" -> "true")
+  private val INIT_CONTAINER_NAME = "init-container"
+  private val DRIVER_CONTAINER_NAME = "driver-container"
+
+  @Mock
+  private var podAndInitContainerBootstrap: InitContainerBootstrap = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    when(podAndInitContainerBootstrap.bootstrapInitContainer(
+      any[PodWithDetachedInitContainer])).thenAnswer(new Answer[PodWithDetachedInitContainer] {
+      override def answer(invocation: InvocationOnMock): PodWithDetachedInitContainer = {
+        val pod = invocation.getArgumentAt(0, classOf[PodWithDetachedInitContainer])
+        pod.copy(
+          pod = new PodBuilder(pod.pod)
+            .withNewMetadata()
+            .addToLabels("bootstrap", "true")
+            .endMetadata()
+            .withNewSpec().endSpec()
+            .build(),
+          initContainer = new ContainerBuilder()
+            .withName(INIT_CONTAINER_NAME)
+            .build(),
+          mainContainer = new ContainerBuilder()
+            .withName(DRIVER_CONTAINER_NAME)
+            .build())
+      }
+    })
+  }
+
+  test("additionalDriverSparkConf with mix of remote files and jars") {
+    val baseInitStep = new BasicInitContainerConfigurationStep(
+      SPARK_JARS,
+      SPARK_FILES,
+      JARS_DOWNLOAD_PATH,
+      FILES_DOWNLOAD_PATH,
+      podAndInitContainerBootstrap)
+    val expectedDriverSparkConf = Map(
+      JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH,
+      FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH,
+      INIT_CONTAINER_REMOTE_JARS.key -> "hdfs://localhost:9000/app/jars/jar1.jar",
+      INIT_CONTAINER_REMOTE_FILES.key -> "hdfs://localhost:9000/app/files/file1.txt")
+    val initContainerSpec = InitContainerSpec(
+      Map.empty[String, String],
+      Map.empty[String, String],
+      new Container(),
+      new Container(),
+      new Pod,
+      Seq.empty[HasMetadata])
+    val returnContainerSpec = baseInitStep.configureInitContainer(initContainerSpec)
+    assert(expectedDriverSparkConf === returnContainerSpec.properties)
+    assert(returnContainerSpec.initContainer.getName === INIT_CONTAINER_NAME)
+    assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
+    assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
+  }
+}
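
The positional InitContainerSpec constructor call above is easier to follow against the shape of the case class, reconstructed here from the accessors these suites use; treat the field names and ordering as an inferred sketch rather than the authoritative definition:

    import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}

    // Inferred shape of the spec threaded through the init-container steps.
    private[spark] case class InitContainerSpec(
        properties: Map[String, String],        // properties for the init-container config map
        driverSparkConf: Map[String, String],   // extra entries for the driver's SparkConf
        initContainer: Container,               // the init-container being configured
        driverContainer: Container,             // the main driver container
        driverPod: Pod,                         // the pod hosting both containers
        dependentResources: Seq[HasMetadata])   // additional Kubernetes resources to create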

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
new file mode 100644
index 0000000..20f2e5b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps.initcontainer
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+class InitContainerConfigOrchestratorSuite extends SparkFunSuite {
+
+  private val DOCKER_IMAGE = "init-container"
+  private val SPARK_JARS = Seq(
+    "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
+  private val SPARK_FILES = Seq(
+    "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
+  private val JARS_DOWNLOAD_PATH = "/var/data/jars"
+  private val FILES_DOWNLOAD_PATH = "/var/data/files"
+  private val DOCKER_IMAGE_PULL_POLICY: String = "IfNotPresent"
+  private val CUSTOM_LABEL_KEY = "customLabel"
+  private val CUSTOM_LABEL_VALUE = "customLabelValue"
+  private val INIT_CONTAINER_CONFIG_MAP_NAME = "spark-init-config-map"
+  private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key"
+  private val SECRET_FOO = "foo"
+  private val SECRET_BAR = "bar"
+  private val SECRET_MOUNT_PATH = "/etc/secrets/init-container"
+
+  test("including basic configuration step") {
+    val sparkConf = new SparkConf(true)
+      .set(INIT_CONTAINER_IMAGE, DOCKER_IMAGE)
+      .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", 
CUSTOM_LABEL_VALUE)
+
+    val orchestrator = new InitContainerConfigOrchestrator(
+      SPARK_JARS.take(1),
+      SPARK_FILES,
+      JARS_DOWNLOAD_PATH,
+      FILES_DOWNLOAD_PATH,
+      DOCKER_IMAGE_PULL_POLICY,
+      INIT_CONTAINER_CONFIG_MAP_NAME,
+      INIT_CONTAINER_CONFIG_MAP_KEY,
+      sparkConf)
+    val initSteps = orchestrator.getAllConfigurationSteps
+    assert(initSteps.lengthCompare(1) == 0)
+    assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep])
+  }
+
+  test("including step to mount user-specified secrets") {
+    val sparkConf = new SparkConf(false)
+      .set(INIT_CONTAINER_IMAGE, DOCKER_IMAGE)
+      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
+      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
+
+    val orchestrator = new InitContainerConfigOrchestrator(
+      SPARK_JARS.take(1),
+      SPARK_FILES,
+      JARS_DOWNLOAD_PATH,
+      FILES_DOWNLOAD_PATH,
+      DOCKER_IMAGE_PULL_POLICY,
+      INIT_CONTAINER_CONFIG_MAP_NAME,
+      INIT_CONTAINER_CONFIG_MAP_KEY,
+      sparkConf)
+    val initSteps = orchestrator.getAllConfigurationSteps
+    assert(initSteps.length === 2)
+    assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep])
+    assert(initSteps(1).isInstanceOf[InitContainerMountSecretsStep])
+  }
+}
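
Taken together, the two tests fix the orchestrator's selection logic: a BasicInitContainerConfigurationStep always comes first, and an InitContainerMountSecretsStep is appended only when driver-secret entries are present. A sketch of that decision with the orchestrator's constructor arguments in scope, assuming SparkConf.getAllWithPrefix is how the secret name-to-path pairs are collected (the orchestrator itself may use a different helper):

    // Hypothetical core of getAllConfigurationSteps.
    val secretNamesToMountPaths =
      sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_SECRETS_PREFIX).toMap
    val basicStep = new BasicInitContainerConfigurationStep(
      sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath, bootstrap)
    val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
      Seq(new InitContainerMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
    } else {
      Nil
    }
    Seq(basicStep) ++ mountSecretsStep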

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
new file mode 100644
index 0000000..eab4e17
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps.initcontainer
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.MountSecretsBootstrap
+import org.apache.spark.deploy.k8s.submit.SecretVolumeUtils
+
+class InitContainerMountSecretsStepSuite extends SparkFunSuite {
+
+  private val SECRET_FOO = "foo"
+  private val SECRET_BAR = "bar"
+  private val SECRET_MOUNT_PATH = "/etc/secrets/init-container"
+
+  test("mounts all given secrets") {
+    val baseInitContainerSpec = InitContainerSpec(
+      Map.empty,
+      Map.empty,
+      new ContainerBuilder().build(),
+      new ContainerBuilder().build(),
+      new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
+      Seq.empty)
+    val secretNamesToMountPaths = Map(
+      SECRET_FOO -> SECRET_MOUNT_PATH,
+      SECRET_BAR -> SECRET_MOUNT_PATH)
+
+    val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
+    val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
+    val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
+      baseInitContainerSpec)
+
+    val podWithSecretsMounted = configuredInitContainerSpec.driverPod
+    val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer
+
+    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
+      assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName)))
+    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
+      assert(SecretVolumeUtils.containerHasVolume(
+        initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
+  }
+}
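
Both mount-secrets suites delegate their assertions to SecretVolumeUtils, whose source is not part of this excerpt; a plausible reconstruction of the two predicates from the call sites (signatures inferred, so a sketch only):

    import scala.collection.JavaConverters._

    import io.fabric8.kubernetes.api.model.{Container, Pod}

    private[spark] object SecretVolumeUtils {

      // True if the pod spec declares a volume with the given name.
      def podHasVolume(pod: Pod, volumeName: String): Boolean =
        pod.getSpec.getVolumes.asScala.exists(_.getName == volumeName)

      // True if the container mounts the named volume at the given path.
      def containerHasVolume(
          container: Container,
          volumeName: String,
          mountPath: String): Boolean =
        container.getVolumeMounts.asScala.exists { mount =>
          mount.getName == volumeName && mount.getMountPath == mountPath
        }
    }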

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
new file mode 100644
index 0000000..6c557ec
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.k8s
+
+import java.io.File
+import java.util.UUID
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.mockito.Mockito
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mockito.MockitoSugar._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.util.Utils
+
+class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
+  private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
+
+  private var downloadJarsDir: File = _
+  private var downloadFilesDir: File = _
+  private var downloadJarsSecretValue: String = _
+  private var downloadFilesSecretValue: String = _
+  private var fileFetcher: FileFetcher = _
+
+  override def beforeAll(): Unit = {
+    downloadJarsSecretValue = Files.toString(
+      new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
+    downloadFilesSecretValue = Files.toString(
+      new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
+  }
+
+  before {
+    downloadJarsDir = Utils.createTempDir()
+    downloadFilesDir = Utils.createTempDir()
+    fileFetcher = mock[FileFetcher]
+  }
+
+  after {
+    downloadJarsDir.delete()
+    downloadFilesDir.delete()
+  }
+
+  test("Downloads from remote server should invoke the file fetcher") {
+    val sparkConf = getSparkConfForRemoteFileDownloads
+    val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
+    initContainerUnderTest.run()
+    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
+    Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
+    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
+  }
+
+  private def getSparkConfForRemoteFileDownloads: SparkConf = {
+    new SparkConf(true)
+      .set(INIT_CONTAINER_REMOTE_JARS,
+        "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar";)
+      .set(INIT_CONTAINER_REMOTE_FILES,
+        "http://localhost:9000/file.txt";)
+      .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
+      .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
+  }
+
+  private def createTempFile(extension: String): String = {
+    val dir = Utils.createTempDir()
+    val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
+    Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
+    file.getAbsolutePath
+  }
+}
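
The suite stays hermetic because FileFetcher is mocked and only the dispatched URIs are verified. From the verify calls, the seam is a single method from a URI string to a target directory; an inferred sketch of that abstraction (modeled as a trait here, while the real type may be a concrete class wrapping Spark's download utilities):

    import java.io.File

    // Inferred download seam used by SparkPodInitContainer.
    private[spark] trait FileFetcher {
      def fetchFile(uri: String, targetDir: File): Unit
    }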

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index 3a55d7c..7121a80 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -18,15 +18,19 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Pod, _}
-import org.mockito.MockitoAnnotations
+import io.fabric8.kubernetes.api.model._
+import org.mockito.{AdditionalAnswers, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 
 class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
+
   private val driverPodName: String = "driver-pod"
   private val driverPodUid: String = "driver-uid"
   private val executorPrefix: String = "base"
@@ -54,7 +58,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
   }
 
   test("basic executor pod has reasonable defaults") {
-    val factory = new ExecutorPodFactoryImpl(baseConf)
+    val factory = new ExecutorPodFactory(baseConf, None, None, None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
 
@@ -85,7 +89,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
       "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
 
-    val factory = new ExecutorPodFactoryImpl(conf)
+    val factory = new ExecutorPodFactory(conf, None, None, None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
 
@@ -97,7 +101,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
     conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
 
-    val factory = new ExecutorPodFactoryImpl(conf)
+    val factory = new ExecutorPodFactory(conf, None, None, None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), 
driverPod, Map[String, Int]())
 
@@ -108,6 +112,74 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     checkOwnerReferences(executor, driverPodUid)
   }
 
+  test("executor secrets get mounted") {
+    val conf = baseConf.clone()
+
+    val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
+    val factory = new ExecutorPodFactory(
+      conf,
+      Some(secretsBootstrap),
+      None,
+      None)
+    val executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
+
+    assert(executor.getSpec.getContainers.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
+      === "secret1-volume")
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
+      .getMountPath === "/var/secret1")
+
+    // check volume mounted.
+    assert(executor.getSpec.getVolumes.size() === 1)
+    assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1")
+
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
+  test("init-container bootstrap step adds an init container") {
+    val conf = baseConf.clone()
+    val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
+    when(initContainerBootstrap.bootstrapInitContainer(
+      any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
+
+    val factory = new ExecutorPodFactory(
+      conf,
+      None,
+      Some(initContainerBootstrap),
+      None)
+    val executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
+
+    assert(executor.getSpec.getInitContainers.size() === 1)
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
+  test("init-container with secrets mount bootstrap") {
+    val conf = baseConf.clone()
+    val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
+    when(initContainerBootstrap.bootstrapInitContainer(
+      any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
+    val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
+
+    val factory = new ExecutorPodFactory(
+      conf,
+      None,
+      Some(initContainerBootstrap),
+      Some(secretsBootstrap))
+    val executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
+
+    assert(executor.getSpec.getInitContainers.size() === 1)
+    assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0).getName
+      === "secret1-volume")
+    assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0)
+      .getMountPath === "/var/secret1")
+
+    checkOwnerReferences(executor, driverPodUid)
+  }
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
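
Across these tests, the renamed ExecutorPodFactory takes its collaborators as three Options following the conf, in the order the call sites imply: executor-secrets bootstrap, init-container bootstrap, init-container-secrets bootstrap, with None disabling the corresponding feature. Combining the fixtures above into a factory that exercises all three (a usage sketch built from the test values, not new API):

    // Secrets in the main container, an init-container, and secrets
    // mounted into the init-container as well.
    val factory = new ExecutorPodFactory(
      baseConf,
      Some(new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))),
      Some(initContainerBootstrap),
      Some(secretsBootstrap))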

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile
index 9b682f8..45fbcd9 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile
@@ -22,7 +22,7 @@ FROM spark-base
 # If this docker file is being used in the context of building your images from a Spark
 # distribution, the docker build command should be invoked from the top level directory
 # of the Spark distribution. E.g.:
-# docker build -t spark-driver:latest -f kubernetes/dockerfiles/spark-base/Dockerfile .
+# docker build -t spark-driver:latest -f kubernetes/dockerfiles/driver/Dockerfile .
 
 COPY examples /opt/spark/examples
 
@@ -31,4 +31,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile
index 168cd4c..0f806cf 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile
@@ -22,7 +22,7 @@ FROM spark-base
 # If this docker file is being used in the context of building your images from a Spark
 # distribution, the docker build command should be invoked from the top level directory
 # of the Spark distribution. E.g.:
-# docker build -t spark-executor:latest -f kubernetes/dockerfiles/spark-base/Dockerfile .
+# docker build -t spark-executor:latest -f kubernetes/dockerfiles/executor/Dockerfile .
 
 COPY examples /opt/spark/examples
 
@@ -31,4 +31,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
new file mode 100644
index 0000000..0554931
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark distribution. E.g.:
+# docker build -t spark-init:latest -f kubernetes/dockerfiles/init-container/Dockerfile .
+
+ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.k8s.SparkPodInitContainer" ]

