This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5885cc1 [SPARK-33261][K8S] Add a developer API for custom feature steps 5885cc1 is described below commit 5885cc15cae9c9780530e235d2bd4bd6beda5dbb Author: Holden Karau <hka...@apple.com> AuthorDate: Mon Dec 14 12:05:28 2020 -0800 [SPARK-33261][K8S] Add a developer API for custom feature steps ### What changes were proposed in this pull request? Add a developer API for custom driver & executor feature steps. ### Why are the changes needed? While we allow templates for the basis of pod creation, some deployments need more flexibility in how the pods are configured. This adds a developer API for custom deployments. ### Does this PR introduce _any_ user-facing change? New developer API. ### How was this patch tested? Extended tests to verify custom step is applied when configured. Closes #30206 from holdenk/SPARK-33261-allow-people-to-extend-pod-feature-steps. Authored-by: Holden Karau <hka...@apple.com> Signed-off-by: Holden Karau <hka...@apple.com> --- .../scala/org/apache/spark/deploy/k8s/Config.scala | 20 ++++++ .../org/apache/spark/deploy/k8s/SparkPod.scala | 11 +++- .../k8s/features/KubernetesFeatureConfigStep.scala | 7 +- .../k8s/submit/KubernetesDriverBuilder.scala | 8 ++- .../cluster/k8s/KubernetesExecutorBuilder.scala | 8 ++- .../apache/spark/deploy/k8s/PodBuilderSuite.scala | 76 ++++++++++++++++++++++ .../k8s/submit/KubernetesDriverBuilderSuite.scala | 5 +- .../k8s/KubernetesExecutorBuilderSuite.scala | 4 ++ 8 files changed, 134 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index c28d6fd..40609ae 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -219,6 +219,26 
@@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_DRIVER_POD_FEATURE_STEPS = + ConfigBuilder("spark.kubernetes.driver.pod.featureSteps") + .doc("Class names of an extra driver pod feature step implementing " + + "KubernetesFeatureConfigStep. This is a developer API. Comma separated. " + + "Runs after all of Spark internal feature steps.") + .version("3.2.0") + .stringConf + .toSequence + .createWithDefault(Nil) + + val KUBERNETES_EXECUTOR_POD_FEATURE_STEPS = + ConfigBuilder("spark.kubernetes.executor.pod.featureSteps") + .doc("Class name of an extra executor pod feature step implementing " + + "KubernetesFeatureConfigStep. This is a developer API. Comma separated. " + + "Runs after all of Spark internal feature steps.") + .version("3.2.0") + .stringConf + .toSequence + .createWithDefault(Nil) + val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala index fd11963..c2298e7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala @@ -18,7 +18,16 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder} -private[spark] case class SparkPod(pod: Pod, container: Container) { +import org.apache.spark.annotation.{DeveloperApi, Unstable} + +/** + * :: DeveloperApi :: + * + * Represents a SparkPod consisting of pod and the container within the pod. + */ +@Unstable +@DeveloperApi +case class SparkPod(pod: Pod, container: Container) { /** * Convenience method to apply a series of chained transformations to a pod. 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala index 58cdaa3..3fec926 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala @@ -18,13 +18,18 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.HasMetadata +import org.apache.spark.annotation.{DeveloperApi, Unstable} import org.apache.spark.deploy.k8s.SparkPod /** + * :: DeveloperApi :: + * * A collection of functions that together represent a "feature" in pods that are launched for * Spark drivers and executors. */ -private[spark] trait KubernetesFeatureConfigStep { +@Unstable +@DeveloperApi +trait KubernetesFeatureConfigStep { /** * Apply modifications on the given pod in accordance to this feature. 
This can include attaching diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 43639a3..3b38dd6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ +import org.apache.spark.util.Utils private[spark] class KubernetesDriverBuilder { @@ -37,6 +38,11 @@ private[spark] class KubernetesDriverBuilder { } .getOrElse(SparkPod.initialPod()) + val userFeatures = conf.get(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS) + .map { className => + Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep] + } + val features = Seq( new BasicDriverFeatureStep(conf), new DriverKubernetesCredentialsFeatureStep(conf), @@ -48,7 +54,7 @@ private[spark] class KubernetesDriverBuilder { new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), new PodTemplateConfigMapStep(conf), - new LocalDirsFeatureStep(conf)) + new LocalDirsFeatureStep(conf)) ++ userFeatures val spec = KubernetesDriverSpec( initialPod, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 5388d18..43328c7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -24,6 +24,7 @@ import org.apache.spark.SecurityManager import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ import org.apache.spark.resource.ResourceProfile +import org.apache.spark.util.Utils private[spark] class KubernetesExecutorBuilder { @@ -41,13 +42,18 @@ private[spark] class KubernetesExecutorBuilder { } .getOrElse(SparkPod.initialPod()) + val userFeatures = conf.get(Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS) + .map { className => + Utils.classForName(className).newInstance().asInstanceOf[KubernetesFeatureConfigStep] + } + val features = Seq( new BasicExecutorFeatureStep(conf, secMgr, resourceProfile), new ExecutorKubernetesCredentialsFeatureStep(conf), new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), new MountVolumesFeatureStep(conf), - new LocalDirsFeatureStep(conf)) + new LocalDirsFeatureStep(conf)) ++ userFeatures val spec = KubernetesExecutorSpec( initialPod, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala index 4d4c4ba..21a5b7a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala @@ -26,12 +26,15 @@ import org.mockito.Mockito.{mock, never, verify, when} import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep import org.apache.spark.internal.config.ConfigEntry abstract class PodBuilderSuite extends SparkFunSuite { protected def templateFileConf: ConfigEntry[_] + protected def userFeatureStepsConf: ConfigEntry[_] + protected def 
buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod private val baseConf = new SparkConf(false) @@ -50,6 +53,19 @@ abstract class PodBuilderSuite extends SparkFunSuite { verifyPod(pod) } + test("configure a custom test step") { + val client = mockKubernetesClient() + val sparkConf = baseConf.clone() + .set(userFeatureStepsConf.key, + "org.apache.spark.deploy.k8s.TestStepTwo," + + "org.apache.spark.deploy.k8s.TestStep") + .set(templateFileConf.key, "template-file.yaml") + val pod = buildPod(sparkConf, client) + verifyPod(pod) + assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long")) + assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "so_long_two")) + } + test("complain about misconfigured pod template") { val client = mockKubernetesClient( new PodBuilder() @@ -173,3 +189,63 @@ abstract class PodBuilderSuite extends SparkFunSuite { } } + +/** + * A test user feature step. + */ +class TestStep extends KubernetesFeatureConfigStep { + import io.fabric8.kubernetes.api.model._ + + override def configurePod(pod: SparkPod): SparkPod = { + val localDirVolumes = Seq(new VolumeBuilder().withName("so_long").build()) + val localDirVolumeMounts = Seq( + new VolumeMountBuilder().withName("so_long") + .withMountPath("and_thanks_for_all_the_fish") + .build() + ) + + val podWithLocalDirVolumes = new PodBuilder(pod.pod) + .editSpec() + .addToVolumes(localDirVolumes: _*) + .endSpec() + .build() + val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) + .addNewEnv() + .withName("CUSTOM_SPARK_LOCAL_DIRS") + .withValue("fishyfishyfishy") + .endEnv() + .addToVolumeMounts(localDirVolumeMounts: _*) + .build() + SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) + } +} + +/** + * A test user feature step. 
+ */ +class TestStepTwo extends KubernetesFeatureConfigStep { + import io.fabric8.kubernetes.api.model._ + + override def configurePod(pod: SparkPod): SparkPod = { + val localDirVolumes = Seq(new VolumeBuilder().withName("so_long_two").build()) + val localDirVolumeMounts = Seq( + new VolumeMountBuilder().withName("so_long_two") + .withMountPath("and_thanks_for_all_the_fish_eh") + .build() + ) + + val podWithLocalDirVolumes = new PodBuilder(pod.pod) + .editSpec() + .addToVolumes(localDirVolumes: _*) + .endSpec() + .build() + val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) + .addNewEnv() + .withName("CUSTOM_SPARK_LOCAL_DIRS_TWO") + .withValue("fishyfishyfishyTWO") + .endEnv() + .addToVolumeMounts(localDirVolumeMounts: _*) + .build() + SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 6518c91..f9802ff96 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -28,9 +28,12 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite { Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE } + override protected def userFeatureStepsConf: ConfigEntry[_] = { + Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS + } + override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = { val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod } - } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index c64b733..ec60c6f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -29,6 +29,10 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite { Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE } + override protected def userFeatureStepsConf: ConfigEntry[_] = { + Config.KUBERNETES_EXECUTOR_POD_FEATURE_STEPS + } + override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = { sparkConf.set("spark.driver.host", "https://driver.host.com") val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org