Repository: spark
Updated Branches:
  refs/heads/master c235b5f97 -> 3f4060c34


http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
new file mode 100644
index 0000000..ccc1890
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+    submissionSparkConf: SparkConf,
+    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+    val oauthTokenBase64 = submissionSparkConf
+      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+      .map { token =>
+        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+      }
+    val caCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+      "Driver CA cert file")
+    val clientKeyDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+      "Driver client key file")
+    val clientCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+      "Driver client cert file")
+
+    val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+      driverSparkConf,
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val kubernetesCredentialsSecret = createCredentialsSecret(
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+      new PodBuilder(driverSpec.driverPod)
+        .editOrNewSpec()
+          .addNewVolume()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+            .endVolume()
+          .endSpec()
+        .build()
+    }.getOrElse(
+      driverServiceAccount.map { account =>
+        new PodBuilder(driverSpec.driverPod)
+          .editOrNewSpec()
+          .withServiceAccount(account)
+          .withServiceAccountName(account)
+          .endSpec()
+          .build()
+      }.getOrElse(driverSpec.driverPod)
+    )
+
+    val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { secret =>
+      new ContainerBuilder(driverSpec.driverContainer)
+        .addNewVolumeMount()
+          .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+          .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+          .endVolumeMount()
+        .build()
+    }.getOrElse(driverSpec.driverContainer)
+
+    driverSpec.copy(
+      driverPod = driverPodWithMountedKubernetesCredentials,
+      otherKubernetesResources =
+        driverSpec.otherKubernetesResources ++ kubernetesCredentialsSecret.toSeq,
+      driverSparkConf = driverSparkConfWithCredentialsLocations,
+      driverContainer = driverContainerWithMountedSecretVolume)
+  }
+
+  private def createCredentialsSecret(
+      driverOAuthTokenBase64: Option[String],
+      driverCaCertDataBase64: Option[String],
+      driverClientKeyDataBase64: Option[String],
+      driverClientCertDataBase64: Option[String]): Option[Secret] = {
+    val allSecretData =
+      resolveSecretData(
+        driverClientKeyDataBase64,
+        DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
+      resolveSecretData(
+        driverClientCertDataBase64,
+        DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
+      resolveSecretData(
+        driverCaCertDataBase64,
+        DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
+      resolveSecretData(
+        driverOAuthTokenBase64,
+        DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
+
+    if (allSecretData.isEmpty) {
+      None
+    } else {
+      Some(new SecretBuilder()
+        .withNewMetadata()
+          .withName(s"$kubernetesResourceNamePrefix-kubernetes-credentials")
+          .endMetadata()
+        .withData(allSecretData.asJava)
+        .build())
+    }
+  }
+
+  private def setDriverPodKubernetesCredentialLocations(
+      driverSparkConf: SparkConf,
+      driverOauthTokenBase64: Option[String],
+      driverCaCertDataBase64: Option[String],
+      driverClientKeyDataBase64: Option[String],
+      driverClientCertDataBase64: Option[String]): SparkConf = {
+    val resolvedMountedOAuthTokenFile = resolveSecretLocation(
+      maybeMountedOAuthTokenFile,
+      driverOauthTokenBase64,
+      DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
+    val resolvedMountedClientKeyFile = resolveSecretLocation(
+      maybeMountedClientKeyFile,
+      driverClientKeyDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
+    val resolvedMountedClientCertFile = resolveSecretLocation(
+      maybeMountedClientCertFile,
+      driverClientCertDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
+    val resolvedMountedCaCertFile = resolveSecretLocation(
+      maybeMountedCaCertFile,
+      driverCaCertDataBase64,
+      DRIVER_CREDENTIALS_CA_CERT_PATH)
+
+    val sparkConfWithCredentialLocations = driverSparkConf
+      .setOption(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+        resolvedMountedCaCertFile)
+      .setOption(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+        resolvedMountedClientKeyFile)
+      .setOption(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+        resolvedMountedClientCertFile)
+      .setOption(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
+        resolvedMountedOAuthTokenFile)
+        resolvedMountedOAuthTokenFile)
+
+    // Redact all OAuth token values
+    sparkConfWithCredentialLocations
+      .getAll
+      .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX)).map(_._1)
+      .foreach {
+        sparkConfWithCredentialLocations.set(_, "<present_but_redacted>")
+      }
+    sparkConfWithCredentialLocations
+  }
+
+  private def safeFileConfToBase64(conf: String, fileType: String): Option[String] = {
+    submissionSparkConf.getOption(conf)
+      .map(new File(_))
+      .map { file =>
+        require(file.isFile, String.format("%s provided at %s does not exist or is not a file.",
+          fileType, file.getAbsolutePath))
+        BaseEncoding.base64().encode(Files.toByteArray(file))
+      }
+  }
+
+  private def resolveSecretLocation(
+      mountedUserSpecified: Option[String],
+      valueMountedFromSubmitter: Option[String],
+      mountedCanonicalLocation: String): Option[String] = {
+    mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ =>
+      mountedCanonicalLocation
+    })
+  }
+
+  /**
+   * Resolve a Kubernetes secret data entry from an optional client credential used by the
+   * driver to talk to the Kubernetes API server.
+   *
+   * @param userSpecifiedCredential the optional user-specified client credential.
+   * @param secretName name of the Kubernetes secret storing the client credential.
+   * @return a secret data entry in the form of a map from the secret name to the secret data,
+   *         which may be empty if the user-specified credential is empty.
+   */
+  private def resolveSecretData(
+      userSpecifiedCredential: Option[String],
+      secretName: String): Map[String, String] = {
+    userSpecifiedCredential.map { valueBase64 =>
+      Map(secretName -> valueBase64)
+    }.getOrElse(Map.empty[String, String])
+  }
+
+  private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = {
+    new OptionSettableSparkConf(sparkConf)
+  }
+}
+
+private class OptionSettableSparkConf(sparkConf: SparkConf) {
+  def setOption(configEntry: String, option: Option[String]): SparkConf = {
+    option.foreach { opt =>
+      sparkConf.set(configEntry, opt)
+    }
+    sparkConf
+  }
+}
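
Illustrative sketch (not part of the commit above): each DriverConfigurationStep, including the
credentials step in this file, receives a KubernetesDriverSpec and returns an augmented copy, so
a submission client can simply fold the steps over an initial spec. Assuming only the trait and
spec shapes shown in this diff:

    import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
    import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep

    def runSteps(
        steps: Seq[DriverConfigurationStep],
        initialSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
      // Each step returns a copy of the spec; this credentials step contributes the
      // credentials secret, its volume mount and the resolved locations in the SparkConf.
      steps.foldLeft(initialSpec) { (spec, step) => step.configureDriver(spec) }
    }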

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
new file mode 100644
index 0000000..696d11f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless service. The service's
+ * ports should correspond to the ports that the executor will reach the pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+    kubernetesResourceNamePrefix: String,
+    driverLabels: Map[String, String],
+    submissionSparkConf: SparkConf,
+    clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+      s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
+      "address is managed and set to the driver pod's IP address.")
+    require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+      s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
+      "managed via a Kubernetes service.")
+
+    val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+    val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
+      preferredServiceName
+    } else {
+      val randomServiceId = clock.getTimeMillis()
+      val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+      logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
+        s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
+        s"$shorterServiceName as the driver service's name.")
+      shorterServiceName
+    }
+
+    val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
+    val driverBlockManagerPort = submissionSparkConf.getInt(
+        org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
+    val driverService = new ServiceBuilder()
+      .withNewMetadata()
+        .withName(resolvedServiceName)
+        .endMetadata()
+      .withNewSpec()
+        .withClusterIP("None")
+        .withSelector(driverLabels.asJava)
+        .addNewPort()
+          .withName(DRIVER_PORT_NAME)
+          .withPort(driverPort)
+          .withNewTargetPort(driverPort)
+          .endPort()
+        .addNewPort()
+          .withName(BLOCK_MANAGER_PORT_NAME)
+          .withPort(driverBlockManagerPort)
+          .withNewTargetPort(driverBlockManagerPort)
+          .endPort()
+        .endSpec()
+      .build()
+
+    val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+    val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+      .set(DRIVER_HOST_KEY, driverHostname)
+      .set("spark.driver.port", driverPort.toString)
+      .set(
+        org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
+
+    driverSpec.copy(
+      driverSparkConf = resolvedSparkConf,
+      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
+  }
+}
+
+private[spark] object DriverServiceBootstrapStep {
+  val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
+  val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
+  val DRIVER_SVC_POSTFIX = "-driver-svc"
+  val MAX_SERVICE_NAME_LENGTH = 63
+}
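
Illustrative sketch (not part of the commit above) of the service-name fallback implemented in
configureDriver, assuming the MAX_SERVICE_NAME_LENGTH (63) and DRIVER_SVC_POSTFIX ("-driver-svc")
constants defined in the companion object:

    def resolveServiceName(kubernetesResourceNamePrefix: String, nowMillis: Long): String = {
      val preferred = s"$kubernetesResourceNamePrefix-driver-svc"
      // Kubernetes service names are capped at 63 characters, so fall back to a
      // timestamp-based name when the preferred name is too long.
      if (preferred.length <= 63) preferred else s"spark-$nowMillis-driver-svc"
    }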

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index f79155b..9d8f3b9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.ConfigurationUtils
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
 import org.apache.spark.util.Utils
 
 /**
@@ -46,8 +47,7 @@ private[spark] trait ExecutorPodFactory {
 private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
   extends ExecutorPodFactory {
 
-  private val executorExtraClasspath =
-    sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
 
   private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
     sparkConf,
@@ -81,13 +81,12 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
 
   private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
 
-  private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
   private val executorMemoryString = sparkConf.get(
-    org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
-    org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
 
   private val memoryOverheadMiB = sparkConf
-    .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+    .get(EXECUTOR_MEMORY_OVERHEAD)
     .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
@@ -129,7 +128,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
         .build()
     }
     val executorExtraJavaOptionsEnv = sparkConf
-      .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+      .get(EXECUTOR_JAVA_OPTIONS)
       .map { opts =>
         val delimitedOpts = Utils.splitCommandString(opts)
         delimitedOpts.zipWithIndex.map {
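
Illustrative sketch (not part of the commit above): executorMemoryWithOverhead above adds a
non-heap overhead to the executor memory, computed as a fraction of it with a floor. The factor
and minimum are assumed here to be 0.10 and 384 MiB; the actual MEMORY_OVERHEAD_FACTOR and
MEMORY_OVERHEAD_MIN_MIB constants are defined outside this diff.

    // e.g. a 4096 MiB executor requests 4096 + max(409, 384) = 4505 MiB in total
    def executorMemoryWithOverheadMiB(executorMemoryMiB: Long): Long = {
      val overheadMiB = math.max((0.10 * executorMemoryMiB).toInt, 384)
      executorMemoryMiB + overheadMiB
    }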

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 68ca6a7..b8bb152 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -20,7 +20,7 @@ import java.io.File
 
 import io.fabric8.kubernetes.client.Config
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
@@ -33,6 +33,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
   override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
 
   override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+    if (masterURL.startsWith("k8s") && sc.deployMode == "client") {
+      throw new SparkException("Client mode is currently not supported for Kubernetes.")
+    }
+
     new TaskSchedulerImpl(sc)
   }
 
@@ -45,7 +49,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
       KUBERNETES_MASTER_INTERNAL_URL,
       Some(sparkConf.get(KUBERNETES_NAMESPACE)),
-      APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+      KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
       sparkConf,
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
new file mode 100644
index 0000000..bf4ec04
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.Iterables
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
+import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Mockito.{doReturn, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mockito.MockitoSugar._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+
+class ClientSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val DRIVER_POD_UID = "pod-id"
+  private val DRIVER_POD_API_VERSION = "v1"
+  private val DRIVER_POD_KIND = "pod"
+
+  private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
+      HasMetadata, Boolean]
+  private type Pods = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: Pods = _
+
+  @Mock
+  private var namedPods: PodResource[Pod, DoneablePod] = _
+
+  @Mock
+  private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
+
+  @Mock
+  private var resourceList: ResourceList = _
+
+  private val submissionSteps = Seq(FirstTestConfigurationStep, SecondTestConfigurationStep)
+  private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
+  private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withName(FirstTestConfigurationStep.podName)).thenReturn(namedPods)
+
+    createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod])
+    createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
+    when(podOperations.create(createdPodArgumentCaptor.capture())).thenAnswer(new Answer[Pod] {
+      override def answer(invocation: InvocationOnMock): Pod = {
+        new PodBuilder(invocation.getArgumentAt(0, classOf[Pod]))
+          .editMetadata()
+            .withUid(DRIVER_POD_UID)
+            .endMetadata()
+          .withApiVersion(DRIVER_POD_API_VERSION)
+          .withKind(DRIVER_POD_KIND)
+          .build()
+      }
+    })
+    when(podOperations.withName(FirstTestConfigurationStep.podName)).thenReturn(namedPods)
+    when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
+    doReturn(resourceList)
+      .when(kubernetesClient)
+      .resourceList(createdResourcesArgumentCaptor.capture())
+  }
+
+  test("The client should configure the pod with the submission steps.") {
+    val submissionClient = new Client(
+      submissionSteps,
+      new SparkConf(false),
+      kubernetesClient,
+      false,
+      "spark",
+      loggingPodStatusWatcher)
+    submissionClient.run()
+    val createdPod = createdPodArgumentCaptor.getValue
+    assert(createdPod.getMetadata.getName === FirstTestConfigurationStep.podName)
+    assert(createdPod.getMetadata.getLabels.asScala ===
+      Map(FirstTestConfigurationStep.labelKey -> FirstTestConfigurationStep.labelValue))
+    assert(createdPod.getMetadata.getAnnotations.asScala ===
+      Map(SecondTestConfigurationStep.annotationKey ->
+        SecondTestConfigurationStep.annotationValue))
+    assert(createdPod.getSpec.getContainers.size() === 1)
+    assert(createdPod.getSpec.getContainers.get(0).getName ===
+      SecondTestConfigurationStep.containerName)
+  }
+
+  test("The client should create the secondary Kubernetes resources.") {
+    val submissionClient = new Client(
+      submissionSteps,
+      new SparkConf(false),
+      kubernetesClient,
+      false,
+      "spark",
+      loggingPodStatusWatcher)
+    submissionClient.run()
+    val createdPod = createdPodArgumentCaptor.getValue
+    val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
+    assert(otherCreatedResources.size === 1)
+    val createdResource = Iterables.getOnlyElement(otherCreatedResources).asInstanceOf[Secret]
+    assert(createdResource.getMetadata.getName === FirstTestConfigurationStep.secretName)
+    assert(createdResource.getData.asScala ===
+      Map(FirstTestConfigurationStep.secretKey -> FirstTestConfigurationStep.secretData))
+    val ownerReference = Iterables.getOnlyElement(createdResource.getMetadata.getOwnerReferences)
+    assert(ownerReference.getName === createdPod.getMetadata.getName)
+    assert(ownerReference.getKind === DRIVER_POD_KIND)
+    assert(ownerReference.getUid === DRIVER_POD_UID)
+    assert(ownerReference.getApiVersion === DRIVER_POD_API_VERSION)
+  }
+
+  test("The client should attach the driver container with the appropriate JVM options.") {
+    val sparkConf = new SparkConf(false)
+      .set("spark.logConf", "true")
+      .set(
+        org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS,
+          "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails")
+    val submissionClient = new Client(
+      submissionSteps,
+      sparkConf,
+      kubernetesClient,
+      false,
+      "spark",
+      loggingPodStatusWatcher)
+    submissionClient.run()
+    val createdPod = createdPodArgumentCaptor.getValue
+    val driverContainer = Iterables.getOnlyElement(createdPod.getSpec.getContainers)
+    assert(driverContainer.getName === SecondTestConfigurationStep.containerName)
+    val driverJvmOptsEnvs = driverContainer.getEnv.asScala.filter { env =>
+      env.getName.startsWith(ENV_JAVA_OPT_PREFIX)
+    }.sortBy(_.getName)
+    assert(driverJvmOptsEnvs.size === 4)
+
+    val expectedJvmOptsValues = Seq(
+      "-Dspark.logConf=true",
+      s"-D${SecondTestConfigurationStep.sparkConfKey}=" +
+        s"${SecondTestConfigurationStep.sparkConfValue}",
+      "-XX:+HeapDumpOnOutOfMemoryError",
+      "-XX:+PrintGCDetails")
+    driverJvmOptsEnvs.zip(expectedJvmOptsValues).zipWithIndex.foreach {
+      case ((resolvedEnv, expectedJvmOpt), index) =>
+        assert(resolvedEnv.getName === s"$ENV_JAVA_OPT_PREFIX$index")
+        assert(resolvedEnv.getValue === expectedJvmOpt)
+    }
+  }
+
+  test("Waiting for app completion should stall on the watcher") {
+    val submissionClient = new Client(
+      submissionSteps,
+      new SparkConf(false),
+      kubernetesClient,
+      true,
+      "spark",
+      loggingPodStatusWatcher)
+    submissionClient.run()
+    verify(loggingPodStatusWatcher).awaitCompletion()
+  }
+
+}
+
+private object FirstTestConfigurationStep extends DriverConfigurationStep {
+
+  val podName = "test-pod"
+  val secretName = "test-secret"
+  val labelKey = "first-submit"
+  val labelValue = "true"
+  val secretKey = "secretKey"
+  val secretData = "secretData"
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val modifiedPod = new PodBuilder(driverSpec.driverPod)
+      .editMetadata()
+      .withName(podName)
+      .addToLabels(labelKey, labelValue)
+      .endMetadata()
+      .build()
+    val additionalResource = new SecretBuilder()
+      .withNewMetadata()
+      .withName(secretName)
+      .endMetadata()
+      .addToData(secretKey, secretData)
+      .build()
+    driverSpec.copy(
+      driverPod = modifiedPod,
+      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(additionalResource))
+  }
+}
+
+private object SecondTestConfigurationStep extends DriverConfigurationStep {
+
+  val annotationKey = "second-submit"
+  val annotationValue = "submitted"
+  val sparkConfKey = "spark.custom-conf"
+  val sparkConfValue = "custom-conf-value"
+  val containerName = "driverContainer"
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val modifiedPod = new PodBuilder(driverSpec.driverPod)
+      .editMetadata()
+        .addToAnnotations(annotationKey, annotationValue)
+        .endMetadata()
+      .build()
+    val resolvedSparkConf = driverSpec.driverSparkConf.clone().set(sparkConfKey, sparkConfValue)
+    val modifiedContainer = new ContainerBuilder(driverSpec.driverContainer)
+      .withName(containerName)
+      .build()
+    driverSpec.copy(
+      driverPod = modifiedPod,
+      driverSparkConf = resolvedSparkConf,
+      driverContainer = modifiedContainer)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
new file mode 100644
index 0000000..c7291d4
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config.DRIVER_DOCKER_IMAGE
+import org.apache.spark.deploy.k8s.submit.steps._
+
+class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
+
+  private val NAMESPACE = "default"
+  private val DRIVER_IMAGE = "driver-image"
+  private val APP_ID = "spark-app-id"
+  private val LAUNCH_TIME = 975256L
+  private val APP_NAME = "spark"
+  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+  private val APP_ARGS = Array("arg1", "arg2")
+
+  test("Base submission steps with a main app resource.") {
+    val sparkConf = new SparkConf(false)
+      .set(DRIVER_DOCKER_IMAGE, DRIVER_IMAGE)
+    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
+    val orchestrator = new DriverConfigurationStepsOrchestrator(
+      NAMESPACE,
+      APP_ID,
+      LAUNCH_TIME,
+      Some(mainAppResource),
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    validateStepTypes(
+      orchestrator,
+      classOf[BaseDriverConfigurationStep],
+      classOf[DriverServiceBootstrapStep],
+      classOf[DriverKubernetesCredentialsStep],
+      classOf[DependencyResolutionStep]
+    )
+  }
+
+  test("Base submission steps without a main app resource.") {
+    val sparkConf = new SparkConf(false)
+      .set(DRIVER_DOCKER_IMAGE, DRIVER_IMAGE)
+    val orchestrator = new DriverConfigurationStepsOrchestrator(
+      NAMESPACE,
+      APP_ID,
+      LAUNCH_TIME,
+      Option.empty,
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    validateStepTypes(
+      orchestrator,
+      classOf[BaseDriverConfigurationStep],
+      classOf[DriverServiceBootstrapStep],
+      classOf[DriverKubernetesCredentialsStep]
+    )
+  }
+
+  private def validateStepTypes(
+      orchestrator: DriverConfigurationStepsOrchestrator,
+      types: Class[_ <: DriverConfigurationStep]*): Unit = {
+    val steps = orchestrator.getAllConfigurationSteps()
+    assert(steps.size === types.size)
+    assert(steps.map(_.getClass) === types)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala
new file mode 100644
index 0000000..83c5f98
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+class BaseDriverConfigurationStepSuite extends SparkFunSuite {
+
+  private val APP_ID = "spark-app-id"
+  private val RESOURCE_NAME_PREFIX = "spark"
+  private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
+  private val DOCKER_IMAGE_PULL_POLICY = "IfNotPresent"
+  private val APP_NAME = "spark-test"
+  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+  private val APP_ARGS = Array("arg1", "arg2", "arg 3")
+  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
+  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
+  private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
+  private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
+
+  test("Set all possible configurations from the user.") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar")
+      .set("spark.driver.cores", "2")
+      .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
+      .set(DRIVER_DOCKER_IMAGE, "spark-driver:latest")
+      .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
+      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
+      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
+
+    val submissionStep = new BaseDriverConfigurationStep(
+      APP_ID,
+      RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      DOCKER_IMAGE_PULL_POLICY,
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
+    val baseDriverSpec = KubernetesDriverSpec(
+      driverPod = basePod,
+      driverContainer = new ContainerBuilder().build(),
+      driverSparkConf = new SparkConf(false),
+      otherKubernetesResources = Seq.empty[HasMetadata])
+    val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)
+
+    assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
+    assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
+    assert(preparedDriverSpec.driverContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY)
+
+    assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
+    val envs = preparedDriverSpec.driverContainer
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
+    assert(envs(ENV_DRIVER_MEMORY) === "256M")
+    assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
+    assert(envs(ENV_DRIVER_ARGS) === "\"arg1\" \"arg2\" \"arg 3\"")
+    assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
+    assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
+
+    assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
+      envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
+        envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
+        envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
+
+    val resourceRequirements = preparedDriverSpec.driverContainer.getResources
+    val requests = resourceRequirements.getRequests.asScala
+    assert(requests("cpu").getAmount === "2")
+    assert(requests("memory").getAmount === "256Mi")
+    val limits = resourceRequirements.getLimits.asScala
+    assert(limits("memory").getAmount === "456Mi")
+    assert(limits("cpu").getAmount === "4")
+
+    val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata
+    assert(driverPodMetadata.getName === "spark-driver-pod")
+    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
+    val expectedAnnotations = Map(
+      CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
+      SPARK_APP_NAME_ANNOTATION -> APP_NAME)
+    assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
+    assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
+
+    val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
+    val expectedSparkConf = Map(
+      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
+      "spark.app.id" -> APP_ID,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX)
+    assert(resolvedSparkConf === expectedSparkConf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
new file mode 100644
index 0000000..991b03c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+class DependencyResolutionStepSuite extends SparkFunSuite {
+
+  private val SPARK_JARS = Seq(
+    "hdfs://localhost:9000/apps/jars/jar1.jar",
+    "file:///home/user/apps/jars/jar2.jar",
+    "local:///var/apps/jars/jar3.jar")
+
+  private val SPARK_FILES = Seq(
+    "file:///home/user/apps/files/file1.txt",
+    "hdfs://localhost:9000/apps/files/file2.txt",
+    "local:///var/apps/files/file3.txt")
+
+  private val JARS_DOWNLOAD_PATH = "/mnt/spark-data/jars"
+  private val FILES_DOWNLOAD_PATH = "/mnt/spark-data/files"
+
+  test("Added dependencies should be resolved in Spark configuration and environment") {
+    val dependencyResolutionStep = new DependencyResolutionStep(
+      SPARK_JARS,
+      SPARK_FILES,
+      JARS_DOWNLOAD_PATH,
+      FILES_DOWNLOAD_PATH)
+    val driverPod = new PodBuilder().build()
+    val baseDriverSpec = KubernetesDriverSpec(
+      driverPod = driverPod,
+      driverContainer = new ContainerBuilder().build(),
+      driverSparkConf = new SparkConf(false),
+      otherKubernetesResources = Seq.empty[HasMetadata])
+    val preparedDriverSpec = dependencyResolutionStep.configureDriver(baseDriverSpec)
+    assert(preparedDriverSpec.driverPod === driverPod)
+    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
+    val resolvedSparkJars = preparedDriverSpec.driverSparkConf.get("spark.jars").split(",").toSet
+    val expectedResolvedSparkJars = Set(
+      "hdfs://localhost:9000/apps/jars/jar1.jar",
+      s"$JARS_DOWNLOAD_PATH/jar2.jar",
+      "/var/apps/jars/jar3.jar")
+    assert(resolvedSparkJars === expectedResolvedSparkJars)
+    val resolvedSparkFiles = preparedDriverSpec.driverSparkConf.get("spark.files").split(",").toSet
+    val expectedResolvedSparkFiles = Set(
+      s"$FILES_DOWNLOAD_PATH/file1.txt",
+      s"hdfs://localhost:9000/apps/files/file2.txt",
+      s"/var/apps/files/file3.txt")
+    assert(resolvedSparkFiles === expectedResolvedSparkFiles)
+    val driverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
+    assert(driverEnv.size === 1)
+    assert(driverEnv.head.getName === ENV_MOUNTED_CLASSPATH)
+    val resolvedDriverClasspath = driverEnv.head.getValue.split(File.pathSeparator).toSet
+    val expectedResolvedDriverClasspath = Set(
+      s"$JARS_DOWNLOAD_PATH/jar1.jar",
+      s"$JARS_DOWNLOAD_PATH/jar2.jar",
+      "/var/apps/jars/jar3.jar")
+    assert(resolvedDriverClasspath === expectedResolvedDriverClasspath)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
new file mode 100644
index 0000000..64553d2
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Charsets
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.util.Utils
+
+class DriverKubernetesCredentialsStepSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark"
+  private var credentialsTempDirectory: File = _
+  private val BASE_DRIVER_SPEC = new KubernetesDriverSpec(
+    driverPod = new PodBuilder().build(),
+    driverContainer = new ContainerBuilder().build(),
+    driverSparkConf = new SparkConf(false),
+    otherKubernetesResources = Seq.empty[HasMetadata])
+
+  before {
+    credentialsTempDirectory = Utils.createTempDir()
+  }
+
+  after {
+    credentialsTempDirectory.delete()
+  }
+
+  test("Don't set any credentials") {
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
+        new SparkConf(false), KUBERNETES_RESOURCE_NAME_PREFIX)
+    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(BASE_DRIVER_SPEC)
+    assert(preparedDriverSpec.driverPod === BASE_DRIVER_SPEC.driverPod)
+    assert(preparedDriverSpec.driverContainer === BASE_DRIVER_SPEC.driverContainer)
+    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
+    assert(preparedDriverSpec.driverSparkConf.getAll.isEmpty)
+  }
+
+  test("Only set credentials that are manually mounted.") {
+    val submissionSparkConf = new SparkConf(false)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-token.txt")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-key.pem")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-cert.pem")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-ca.pem")
+
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
+      submissionSparkConf, KUBERNETES_RESOURCE_NAME_PREFIX)
+    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(BASE_DRIVER_SPEC)
+    assert(preparedDriverSpec.driverPod === BASE_DRIVER_SPEC.driverPod)
+    assert(preparedDriverSpec.driverContainer === BASE_DRIVER_SPEC.driverContainer)
+    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
+    assert(preparedDriverSpec.driverSparkConf.getAll.toMap === submissionSparkConf.getAll.toMap)
+  }
+
+  test("Mount credentials from the submission client as a secret.") {
+    val caCertFile = writeCredentials("ca.pem", "ca-cert")
+    val clientKeyFile = writeCredentials("key.pem", "key")
+    val clientCertFile = writeCredentials("cert.pem", "cert")
+    val submissionSparkConf = new SparkConf(false)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX",
+        "token")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+        clientKeyFile.getAbsolutePath)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+        clientCertFile.getAbsolutePath)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+        caCertFile.getAbsolutePath)
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
+      submissionSparkConf, KUBERNETES_RESOURCE_NAME_PREFIX)
+    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(
+      BASE_DRIVER_SPEC.copy(driverSparkConf = submissionSparkConf))
+    val expectedSparkConf = Map(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX" -> "<present_but_redacted>",
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CLIENT_KEY_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CLIENT_CERT_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CA_CERT_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
+        clientKeyFile.getAbsolutePath,
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
+        clientCertFile.getAbsolutePath,
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
+        caCertFile.getAbsolutePath)
+    assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf)
+    assert(preparedDriverSpec.otherKubernetesResources.size === 1)
+    val credentialsSecret = preparedDriverSpec.otherKubernetesResources.head.asInstanceOf[Secret]
+    assert(credentialsSecret.getMetadata.getName ===
+      s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials")
+    val decodedSecretData = credentialsSecret.getData.asScala.map { data =>
+      (data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8))
+    }
+    val expectedSecretData = Map(
+      DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME -> "ca-cert",
+      DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME -> "token",
+      DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME -> "key",
+      DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> "cert")
+    assert(decodedSecretData === expectedSecretData)
+    val driverPodVolumes = preparedDriverSpec.driverPod.getSpec.getVolumes.asScala
+    assert(driverPodVolumes.size === 1)
+    assert(driverPodVolumes.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+    assert(driverPodVolumes.head.getSecret != null)
+    assert(driverPodVolumes.head.getSecret.getSecretName === credentialsSecret.getMetadata.getName)
+    val driverContainerVolumeMount = preparedDriverSpec.driverContainer.getVolumeMounts.asScala
+    assert(driverContainerVolumeMount.size === 1)
+    assert(driverContainerVolumeMount.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+    assert(driverContainerVolumeMount.head.getMountPath === DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+  }
+
+  private def writeCredentials(credentialsFileName: String, credentialsContents: String): File = {
+    val credentialsFile = new File(credentialsTempDirectory, credentialsFileName)
+    Files.write(credentialsContents, credentialsFile, Charsets.UTF_8)
+    credentialsFile
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
new file mode 100644
index 0000000..006ce26
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Service
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.util.Clock
+
+class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val SHORT_RESOURCE_NAME_PREFIX =
+    "a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH -
+      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length)
+
+  private val LONG_RESOURCE_NAME_PREFIX =
+    "a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH -
+      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length + 1)
+  private val DRIVER_LABELS = Map(
+    "label1key" -> "label1value",
+    "label2key" -> "label2value")
+
+  @Mock
+  private var clock: Clock = _
+
+  private var sparkConf: SparkConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    sparkConf = new SparkConf(false)
+  }
+
+  test("Headless service has a port for the driver RPC and the block 
manager.") {
+    val configurationStep = new DriverServiceBootstrapStep(
+      SHORT_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf
+        .set("spark.driver.port", "9000")
+        .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080),
+      clock)
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
+    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+    assert(resolvedDriverSpec.otherKubernetesResources.size === 1)
+    
assert(resolvedDriverSpec.otherKubernetesResources.head.isInstanceOf[Service])
+    val driverService = 
resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
+    verifyService(
+      9000,
+      8080,
+      
s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}",
+      driverService)
+  }
+
+  test("Hostname and ports are set according to the service name.") {
+    val configurationStep = new DriverServiceBootstrapStep(
+      SHORT_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf
+        .set("spark.driver.port", "9000")
+        .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
+        .set(KUBERNETES_NAMESPACE, "my-namespace"),
+      clock)
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
+    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+    val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
+      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX
+    val expectedHostName = 
s"$expectedServiceName.my-namespace.svc.cluster.local"
+    verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, 
expectedHostName)
+  }
+
+  test("Ports should resolve to defaults in SparkConf and in the service.") {
+    val configurationStep = new DriverServiceBootstrapStep(
+      SHORT_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf,
+      clock)
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
+    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+    verifyService(
+      DEFAULT_DRIVER_PORT,
+      DEFAULT_BLOCKMANAGER_PORT,
+      
s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}",
+      resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service])
+    assert(resolvedDriverSpec.driverSparkConf.get("spark.driver.port") ===
+      DEFAULT_DRIVER_PORT.toString)
+    assert(resolvedDriverSpec.driverSparkConf.get(
+      org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === 
DEFAULT_BLOCKMANAGER_PORT)
+  }
+
+  test("Long prefixes should switch to using a generated name.") {
+    val configurationStep = new DriverServiceBootstrapStep(
+      LONG_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
+      clock)
+    when(clock.getTimeMillis()).thenReturn(10000)
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
+    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+    val driverService = 
resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
+    val expectedServiceName = 
s"spark-10000${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}"
+    assert(driverService.getMetadata.getName === expectedServiceName)
+    val expectedHostName = 
s"$expectedServiceName.my-namespace.svc.cluster.local"
+    verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, 
expectedHostName)
+  }
+
+  test("Disallow bind address and driver host to be set explicitly.") {
+    val configurationStep = new DriverServiceBootstrapStep(
+      LONG_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, 
"host"),
+      clock)
+    try {
+      
configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
+      fail("The driver bind address should not be allowed.")
+    } catch {
+      case e: Throwable =>
+        assert(e.getMessage ===
+          s"requirement failed: 
${DriverServiceBootstrapStep.DRIVER_BIND_ADDRESS_KEY} is" +
+          " not supported in Kubernetes mode, as the driver's bind address is 
managed" +
+          " and set to the driver pod's IP address.")
+    }
+    sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
+    sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
+    try {
+      
configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
+      fail("The driver host address should not be allowed.")
+    } catch {
+      case e: Throwable =>
+        assert(e.getMessage ===
+          s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_HOST_KEY} 
is" +
+          " not supported in Kubernetes mode, as the driver's hostname will be 
managed via" +
+          " a Kubernetes service.")
+    }
+  }
+
+  private def verifyService(
+      driverPort: Int,
+      blockManagerPort: Int,
+      expectedServiceName: String,
+      service: Service): Unit = {
+    assert(service.getMetadata.getName === expectedServiceName)
+    assert(service.getSpec.getClusterIP === "None")
+    assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
+    assert(service.getSpec.getPorts.size() === 2)
+    val driverServicePorts = service.getSpec.getPorts.asScala
+    assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
+    assert(driverServicePorts.head.getPort.intValue() === driverPort)
+    assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
+    assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
+    assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
+    assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
+  }
+
+  private def verifySparkConfHostNames(
+      driverSparkConf: SparkConf, expectedHostName: String): Unit = {
+    assert(driverSparkConf.get(
+      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === 
expectedHostName)
+  }
+}
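
The suite above exercises the headless-service naming and the derived driver hostname of the form <service-name>.<namespace>.svc.cluster.local. A minimal, purely illustrative check against a live cluster follows; the namespace and service name are placeholders, not values produced by this patch:

    # List services created in the target namespace (names are placeholders).
    kubectl get services --namespace my-namespace
    # From any pod in the cluster, the service name that ends up in spark.driver.host
    # should resolve to the driver pod's IP, since the service is headless (clusterIP: None).
    nslookup <driver-service-name>.my-namespace.svc.cluster.local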

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile
new file mode 100644
index 0000000..d163495
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# Before building the docker image, first build and make a Spark distribution following
+# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
+# If this Dockerfile is being used in the context of building your images from a Spark
+# distribution, the docker build command should be invoked from the top level directory
+# of the Spark distribution. E.g.:
+# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
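
The CMD above collects driver JVM options from environment variables named SPARK_JAVA_OPT_*, presumably set on the driver container by the submission client. A small sketch of that pipeline in isolation, with made-up option values:

    # Hypothetical options; real values would be injected by the submission client.
    export SPARK_JAVA_OPT_0='-Dspark.app.name=example'
    export SPARK_JAVA_OPT_1='-Dspark.kubernetes.namespace=default'
    # Strip the variable names, keeping only the values, one per line.
    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt
    # Read them into a bash array, exactly as the image's CMD does.
    readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt
    printf '%s\n' "${SPARK_DRIVER_JAVA_OPTS[@]}"   # prints the two -D options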

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile
new file mode 100644
index 0000000..0e38169
--- /dev/null
+++ 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# Before building the docker image, first build and make a Spark distribution following
+# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
+# If this Dockerfile is being used in the context of building your images from a Spark
+# distribution, the docker build command should be invoked from the top level directory
+# of the Spark distribution. E.g.:
+# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile .
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
+    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile
 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile
new file mode 100644
index 0000000..20316c9
--- /dev/null
+++ 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+# Before building the docker image, first build and make a Spark distribution following
+# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
+# If this Dockerfile is being used in the context of building your images from a Spark
+# distribution, the docker build command should be invoked from the top level directory
+# of the Spark distribution. E.g.:
+# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
+
+RUN set -ex && \
+    apk upgrade --no-cache && \
+    apk add --no-cache bash tini libc6-compat && \
+    mkdir -p /opt/spark && \
+    mkdir -p /opt/spark/work-dir && \
+    touch /opt/spark/RELEASE && \
+    rm /bin/sh && \
+    ln -sv /bin/bash /bin/sh && \
+    chgrp root /etc/passwd && chmod ug+rw /etc/passwd
+
+COPY jars /opt/spark/jars
+COPY bin /opt/spark/bin
+COPY sbin /opt/spark/sbin
+COPY conf /opt/spark/conf
+COPY dockerfiles/spark-base/entrypoint.sh /opt/
+
+ENV SPARK_HOME /opt/spark
+
+WORKDIR /opt/spark/work-dir
+
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
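
Taken together with the driver and executor Dockerfiles above, a possible build sequence from the top-level directory of a Spark distribution would be the following; the image tags and the dockerfiles/ layout are those assumed in the comments of these files:

    docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
    docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
    docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile .

The base image must be built first, since the driver and executor images start FROM spark-base.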

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
new file mode 100644
index 0000000..8255988
--- /dev/null
+++ 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# echo commands to the terminal output
+set -ex
+
+# Check whether there is a passwd entry for the container UID
+myuid=$(id -u)
+mygid=$(id -g)
+uidentry=$(getent passwd $myuid)
+
+# If there is no passwd entry for the container UID, attempt to create one
+if [ -z "$uidentry" ] ; then
+    if [ -w /etc/passwd ] ; then
+        echo "$myuid:x:$myuid:$mygid:anonymous uid:$SPARK_HOME:/bin/false" >> 
/etc/passwd
+    else
+        echo "Container ENTRYPOINT failed to add passwd entry for anonymous 
UID"
+    fi
+fi
+
+# Execute the container CMD under tini for better hygiene
+exec /sbin/tini -s -- "$@"
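
The passwd handling above matters when the container is started with an arbitrary non-root UID, as some cluster security policies require. A hypothetical way to exercise that path; the UID and image tag are illustrative only:

    # Run with a random UID and the root group; the group grants write access to
    # /etc/passwd, which the base image prepared with chgrp root and chmod ug+rw.
    docker run --rm --user 1000320999:0 spark-base:latest id
    # entrypoint.sh should append an entry like
    #   1000320999:x:1000320999:0:anonymous uid:/opt/spark:/bin/false
    # to /etc/passwd before handing the command off to tini.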

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4060c3/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index e1af8ba..3ba3ae5 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -217,20 +217,12 @@ package object config {
     .intConf
     .createWithDefault(1)
 
-  private[spark] val DRIVER_MEMORY_OVERHEAD = 
ConfigBuilder("spark.yarn.driver.memoryOverhead")
-    .bytesConf(ByteUnit.MiB)
-    .createOptional
-
   /* Executor configuration. */
 
   private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
     .intConf
     .createWithDefault(1)
 
-  private[spark] val EXECUTOR_MEMORY_OVERHEAD = 
ConfigBuilder("spark.yarn.executor.memoryOverhead")
-    .bytesConf(ByteUnit.MiB)
-    .createOptional
-
   private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
     ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
       .doc("Node label expression for executors.")

