Repository: spark
Updated Branches:
  refs/heads/master a35523653 -> cccaaa14a


[SPARK-23668][K8S] Add config option for passing through k8s 
Pod.spec.imagePullSecrets

## What changes were proposed in this pull request?

Pass through the `imagePullSecrets` option to the k8s pod in order to allow 
users to access private image registries.

See 
https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/

## How was this patch tested?

Unit tests + manual testing.

Manual testing procedure:
1. Have private image registry.
2. Spark-submit application with no `spark.kubernetes.container.image.pullSecrets` 
set. Do `kubectl describe pod ...`. See the error message:
```
Error syncing pod, skipping: failed to "StartContainer" for 
"spark-kubernetes-driver" with ErrImagePull: "rpc error: code = 2 desc = Error: 
Status 400 trying to pull repository ...: \"{\\n  \\\"errors\\\" : [ {\\n    
\\\"status\\\" : 400,\\n    \\\"message\\\" : \\\"Unsupported docker v1 
repository request for '...'\\\"\\n  } ]\\n}\""
```
3. Create secret `kubectl create secret docker-registry ...`
4. Spark-submit with `spark.kubernetes.container.image.pullSecrets` set to the 
new secret. See that deployment was successful.

Author: Andrew Korzhuev <andrew.korzh...@klarna.com>
Author: Andrew Korzhuev <korzh...@andrusha.me>

Closes #20811 from andrusha/spark-23668-image-pull-secrets.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cccaaa14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cccaaa14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cccaaa14

Branch: refs/heads/master
Commit: cccaaa14ad775fb981e501452ba2cc06ff5c0f0a
Parents: a355236
Author: Andrew Korzhuev <andrew.korzh...@klarna.com>
Authored: Wed Apr 4 12:30:52 2018 -0700
Committer: Anirudh Ramanathan <ramanath...@google.com>
Committed: Wed Apr 4 12:30:52 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/k8s/Config.scala    |  7 ++++
 .../spark/deploy/k8s/KubernetesUtils.scala      | 13 +++++++
 .../steps/BasicDriverConfigurationStep.scala    |  7 +++-
 .../cluster/k8s/ExecutorPodFactory.scala        |  4 +++
 .../spark/deploy/k8s/KubernetesUtilsTest.scala  | 36 ++++++++++++++++++++
 .../BasicDriverConfigurationStepSuite.scala     |  8 ++++-
 .../cluster/k8s/ExecutorPodFactorySuite.scala   |  5 +++
 7 files changed, 78 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cccaaa14/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 405ea47..82f6c71 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -54,6 +54,13 @@ private[spark] object Config extends Logging {
       .checkValues(Set("Always", "Never", "IfNotPresent"))
       .createWithDefault("IfNotPresent")
 
+  val IMAGE_PULL_SECRETS =
+    ConfigBuilder("spark.kubernetes.container.image.pullSecrets")
+      .doc("Comma separated list of the Kubernetes secrets used " +
+        "to access private image registries.")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
       "spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =

http://git-wip-us.apache.org/repos/asf/spark/blob/cccaaa14/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 5bc0701..5b2bb81 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s
 
+import io.fabric8.kubernetes.api.model.LocalObjectReference
+
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
 
@@ -35,6 +37,17 @@ private[spark] object KubernetesUtils {
     sparkConf.getAllWithPrefix(prefix).toMap
   }
 
+  /**
+   * Parses comma-separated list of imagePullSecrets into K8s-understandable 
format
+   */
+  def parseImagePullSecrets(imagePullSecrets: Option[String]): 
List[LocalObjectReference] = {
+    imagePullSecrets match {
+      case Some(secretsCommaSeparated) =>
+        secretsCommaSeparated.split(',').map(_.trim).map(new 
LocalObjectReference(_)).toList
+      case None => Nil
+    }
+  }
+
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: 
String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cccaaa14/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
index b811db3..fcb1db8 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.steps
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
@@ -51,6 +51,8 @@ private[spark] class BasicDriverConfigurationStep(
     .get(DRIVER_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the driver container 
image"))
 
+  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
+
   // CPU settings
   private val driverCpuCores = 
sparkConf.getOption("spark.driver.cores").getOrElse("1")
   private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
@@ -129,6 +131,8 @@ private[spark] class BasicDriverConfigurationStep(
       case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
     }
 
+    val parsedImagePullSecrets = 
KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
+
     val baseDriverPod = new PodBuilder(driverSpec.driverPod)
       .editOrNewMetadata()
         .withName(driverPodName)
@@ -138,6 +142,7 @@ private[spark] class BasicDriverConfigurationStep(
       .withNewSpec()
         .withRestartPolicy("Never")
         .withNodeSelector(nodeSelector.asJava)
+        .withImagePullSecrets(parsedImagePullSecrets.asJava)
         .endSpec()
       .build()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cccaaa14/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 7143f7a..8607d6f 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -68,6 +68,7 @@ private[spark] class ExecutorPodFactory(
     .get(EXECUTOR_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the executor container 
image"))
   private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
   private val blockManagerPort = sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
@@ -103,6 +104,8 @@ private[spark] class ExecutorPodFactory(
       nodeToLocalTaskCount: Map[String, Int]): Pod = {
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
+    val parsedImagePullSecrets = 
KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
+
     // hostname must be no longer than 63 characters, so take the last 63 
characters of the pod
     // name as the hostname.  This preserves uniqueness since the end of name 
contains
     // executorId
@@ -194,6 +197,7 @@ private[spark] class ExecutorPodFactory(
         .withHostname(hostname)
         .withRestartPolicy("Never")
         .withNodeSelector(nodeSelector.asJava)
+        .withImagePullSecrets(parsedImagePullSecrets.asJava)
         .endSpec()
       .build()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cccaaa14/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
new file mode 100644
index 0000000..cf41b22
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.LocalObjectReference
+
+import org.apache.spark.SparkFunSuite
+
+class KubernetesUtilsTest extends SparkFunSuite {
+
+  test("testParseImagePullSecrets") {
+    val noSecrets = KubernetesUtils.parseImagePullSecrets(None)
+    assert(noSecrets === Nil)
+
+    val oneSecret = 
KubernetesUtils.parseImagePullSecrets(Some("imagePullSecret"))
+    assert(oneSecret === new LocalObjectReference("imagePullSecret") :: Nil)
+
+    val commaSeparatedSecrets = 
KubernetesUtils.parseImagePullSecrets(Some("s1, s2  , s3,s4"))
+    assert(commaSeparatedSecrets.map(_.getName) === "s1" :: "s2" :: "s3" :: 
"s4" :: Nil)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cccaaa14/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
index e59c6d2..ee450ff 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
@@ -51,6 +51,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite 
{
       .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", 
CUSTOM_ANNOTATION_VALUE)
       .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", 
"customDriverEnv1")
       .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", 
"customDriverEnv2")
+      .set(IMAGE_PULL_SECRETS, "imagePullSecret1, imagePullSecret2")
 
     val submissionStep = new BasicDriverConfigurationStep(
       APP_ID,
@@ -103,7 +104,12 @@ class BasicDriverConfigurationStepSuite extends 
SparkFunSuite {
       CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
       SPARK_APP_NAME_ANNOTATION -> APP_NAME)
     assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
-    assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
+
+    val driverPodSpec = preparedDriverSpec.driverPod.getSpec
+    assert(driverPodSpec.getRestartPolicy === "Never")
+    assert(driverPodSpec.getImagePullSecrets.size() === 2)
+    assert(driverPodSpec.getImagePullSecrets.get(0).getName === 
"imagePullSecret1")
+    assert(driverPodSpec.getImagePullSecrets.get(1).getName === 
"imagePullSecret2")
 
     val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
     val expectedSparkConf = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/cccaaa14/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index a71a2a1..d73df20 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -33,6 +33,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with 
BeforeAndAfter with Bef
   private val driverPodUid: String = "driver-uid"
   private val executorPrefix: String = "base"
   private val executorImage: String = "executor-image"
+  private val imagePullSecrets: String = "imagePullSecret1, imagePullSecret2"
   private val driverPod = new PodBuilder()
     .withNewMetadata()
     .withName(driverPodName)
@@ -54,6 +55,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with 
BeforeAndAfter with Bef
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
       .set(CONTAINER_IMAGE, executorImage)
       .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
+      .set(IMAGE_PULL_SECRETS, imagePullSecrets)
   }
 
   test("basic executor pod has reasonable defaults") {
@@ -76,6 +78,9 @@ class ExecutorPodFactorySuite extends SparkFunSuite with 
BeforeAndAfter with Bef
       .getRequests.get("memory").getAmount === "1408Mi")
     assert(executor.getSpec.getContainers.get(0).getResources
       .getLimits.get("memory").getAmount === "1408Mi")
+    assert(executor.getSpec.getImagePullSecrets.size() === 2)
+    assert(executor.getSpec.getImagePullSecrets.get(0).getName === 
"imagePullSecret1")
+    assert(executor.getSpec.getImagePullSecrets.get(1).getName === 
"imagePullSecret2")
 
     // The pod has no node selector, volumes.
     assert(executor.getSpec.getNodeSelector.isEmpty)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to