This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5885cc1  [SPARK-33261][K8S] Add a developer API for custom feature 
steps
5885cc1 is described below

commit 5885cc15cae9c9780530e235d2bd4bd6beda5dbb
Author: Holden Karau <hka...@apple.com>
AuthorDate: Mon Dec 14 12:05:28 2020 -0800

    [SPARK-33261][K8S] Add a developer API for custom feature steps
    
    ### What changes were proposed in this pull request?
    
    Add a developer API for custom driver & executor feature steps.
    
    ### Why are the changes needed?
    
    While we allow templates for the basis of pod creation, some deployments 
need more flexibility in how the pods are configured. This adds a developer API 
for custom deployments.
    
    ### Does this PR introduce _any_ user-facing change?
    
    New developer API.
    
    ### How was this patch tested?
    
    Extended tests to verify custom step is applied when configured.
    
    Closes #30206 from 
holdenk/SPARK-33261-allow-people-to-extend-pod-feature-steps.
    
    Authored-by: Holden Karau <hka...@apple.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 20 ++++++
 .../org/apache/spark/deploy/k8s/SparkPod.scala     | 11 +++-
 .../k8s/features/KubernetesFeatureConfigStep.scala |  7 +-
 .../k8s/submit/KubernetesDriverBuilder.scala       |  8 ++-
 .../cluster/k8s/KubernetesExecutorBuilder.scala    |  8 ++-
 .../apache/spark/deploy/k8s/PodBuilderSuite.scala  | 76 ++++++++++++++++++++++
 .../k8s/submit/KubernetesDriverBuilderSuite.scala  |  5 +-
 .../k8s/KubernetesExecutorBuilderSuite.scala       |  4 ++
 8 files changed, 134 insertions(+), 5 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index c28d6fd..40609ae 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -219,6 +219,26 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_DRIVER_POD_FEATURE_STEPS =
+    ConfigBuilder("spark.kubernetes.driver.pod.featureSteps")
+      .doc("Class names of an extra driver pod feature step implementing " +
+        "KubernetesFeatureConfigStep. This is a developer API. Comma 
separated. " +
+        "Runs after all of Spark internal feature steps.")
+      .version("3.2.0")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
+  val KUBERNETES_EXECUTOR_POD_FEATURE_STEPS =
+    ConfigBuilder("spark.kubernetes.executor.pod.featureSteps")
+      .doc("Class name of an extra executor pod feature step implementing " +
+        "KubernetesFeatureConfigStep. This is a developer API. Comma 
separated. " +
+        "Runs after all of Spark internal feature steps.")
+      .version("3.2.0")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor 
allocation.")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
index fd11963..c2298e7 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
@@ -18,7 +18,16 @@ package org.apache.spark.deploy.k8s
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, 
PodBuilder}
 
-private[spark] case class SparkPod(pod: Pod, container: Container) {
+import org.apache.spark.annotation.{DeveloperApi, Unstable}
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Represents a SparkPod consisting of pod and the container within the pod.
+ */
+@Unstable
+@DeveloperApi
+case class SparkPod(pod: Pod, container: Container) {
 
   /**
    * Convenience method to apply a series of chained transformations to a pod.
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
index 58cdaa3..3fec926 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
@@ -18,13 +18,18 @@ package org.apache.spark.deploy.k8s.features
 
 import io.fabric8.kubernetes.api.model.HasMetadata
 
+import org.apache.spark.annotation.{DeveloperApi, Unstable}
 import org.apache.spark.deploy.k8s.SparkPod
 
 /**
+ * :: DeveloperApi ::
+ *
  * A collection of functions that together represent a "feature" in pods that 
are launched for
  * Spark drivers and executors.
  */
-private[spark] trait KubernetesFeatureConfigStep {
+@Unstable
+@DeveloperApi
+trait KubernetesFeatureConfigStep {
 
   /**
    * Apply modifications on the given pod in accordance to this feature. This 
can include attaching
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 43639a3..3b38dd6 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -22,6 +22,7 @@ import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
+import org.apache.spark.util.Utils
 
 private[spark] class KubernetesDriverBuilder {
 
@@ -37,6 +38,11 @@ private[spark] class KubernetesDriverBuilder {
       }
       .getOrElse(SparkPod.initialPod())
 
+    val userFeatures = conf.get(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS)
+      .map { className =>
+        
Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep]
+      }
+
     val features = Seq(
       new BasicDriverFeatureStep(conf),
       new DriverKubernetesCredentialsFeatureStep(conf),
@@ -48,7 +54,7 @@ private[spark] class KubernetesDriverBuilder {
       new HadoopConfDriverFeatureStep(conf),
       new KerberosConfDriverFeatureStep(conf),
       new PodTemplateConfigMapStep(conf),
-      new LocalDirsFeatureStep(conf))
+      new LocalDirsFeatureStep(conf)) ++ userFeatures
 
     val spec = KubernetesDriverSpec(
       initialPod,
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 5388d18..43328c7 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SecurityManager
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.util.Utils
 
 private[spark] class KubernetesExecutorBuilder {
 
@@ -41,13 +42,18 @@ private[spark] class KubernetesExecutorBuilder {
       }
       .getOrElse(SparkPod.initialPod())
 
+    val userFeatures = conf.get(Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS)
+      .map { className =>
+        
Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep]
+      }
+
     val features = Seq(
       new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
       new ExecutorKubernetesCredentialsFeatureStep(conf),
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
       new MountVolumesFeatureStep(conf),
-      new LocalDirsFeatureStep(conf))
+      new LocalDirsFeatureStep(conf)) ++ userFeatures
 
     val spec = KubernetesExecutorSpec(
       initialPod,
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
index 4d4c4ba..21a5b7a 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
@@ -26,12 +26,15 @@ import org.mockito.Mockito.{mock, never, verify, when}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
 import org.apache.spark.internal.config.ConfigEntry
 
 abstract class PodBuilderSuite extends SparkFunSuite {
 
   protected def templateFileConf: ConfigEntry[_]
 
+  protected def userFeatureStepsConf: ConfigEntry[_]
+
   protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): 
SparkPod
 
   private val baseConf = new SparkConf(false)
@@ -50,6 +53,19 @@ abstract class PodBuilderSuite extends SparkFunSuite {
     verifyPod(pod)
   }
 
+  test("configure a custom test step") {
+    val client = mockKubernetesClient()
+    val sparkConf = baseConf.clone()
+      .set(userFeatureStepsConf.key,
+        "org.apache.spark.deploy.k8s.TestStepTwo," +
+        "org.apache.spark.deploy.k8s.TestStep")
+      .set(templateFileConf.key, "template-file.yaml")
+    val pod = buildPod(sparkConf, client)
+    verifyPod(pod)
+    assert(pod.container.getVolumeMounts.asScala.exists(_.getName == 
"so_long"))
+    assert(pod.container.getVolumeMounts.asScala.exists(_.getName == 
"so_long_two"))
+  }
+
   test("complain about misconfigured pod template") {
     val client = mockKubernetesClient(
       new PodBuilder()
@@ -173,3 +189,63 @@ abstract class PodBuilderSuite extends SparkFunSuite {
   }
 
 }
+
+/**
+ * A test user feature step.
+ */
+class TestStep extends KubernetesFeatureConfigStep {
+  import io.fabric8.kubernetes.api.model._
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val localDirVolumes = Seq(new VolumeBuilder().withName("so_long").build())
+    val localDirVolumeMounts = Seq(
+      new VolumeMountBuilder().withName("so_long")
+        .withMountPath("and_thanks_for_all_the_fish")
+        .build()
+    )
+
+    val podWithLocalDirVolumes = new PodBuilder(pod.pod)
+      .editSpec()
+        .addToVolumes(localDirVolumes: _*)
+        .endSpec()
+      .build()
+    val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
+      .addNewEnv()
+        .withName("CUSTOM_SPARK_LOCAL_DIRS")
+        .withValue("fishyfishyfishy")
+        .endEnv()
+      .addToVolumeMounts(localDirVolumeMounts: _*)
+      .build()
+    SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
+  }
+}
+
+/**
+ * A test user feature step.
+ */
+class TestStepTwo extends KubernetesFeatureConfigStep {
+  import io.fabric8.kubernetes.api.model._
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val localDirVolumes = Seq(new 
VolumeBuilder().withName("so_long_two").build())
+    val localDirVolumeMounts = Seq(
+      new VolumeMountBuilder().withName("so_long_two")
+        .withMountPath("and_thanks_for_all_the_fish_eh")
+        .build()
+    )
+
+    val podWithLocalDirVolumes = new PodBuilder(pod.pod)
+      .editSpec()
+        .addToVolumes(localDirVolumes: _*)
+        .endSpec()
+      .build()
+    val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
+      .addNewEnv()
+        .withName("CUSTOM_SPARK_LOCAL_DIRS_TWO")
+        .withValue("fishyfishyfishyTWO")
+        .endEnv()
+      .addToVolumeMounts(localDirVolumeMounts: _*)
+      .build()
+    SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
+  }
+}
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 6518c91..f9802ff96 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -28,9 +28,12 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite {
     Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE
   }
 
+  override protected def userFeatureStepsConf: ConfigEntry[_] = {
+    Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS
+  }
+
   override protected def buildPod(sparkConf: SparkConf, client: 
KubernetesClient): SparkPod = {
     val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
     new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod
   }
-
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index c64b733..ec60c6f 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -29,6 +29,10 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite 
{
     Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE
   }
 
+  override protected def userFeatureStepsConf: ConfigEntry[_] = {
+    Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS
+  }
+
   override protected def buildPod(sparkConf: SparkConf, client: 
KubernetesClient): SparkPod = {
     sparkConf.set("spark.driver.host", "https://driver.host.com";)
     val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to