Repository: spark Updated Branches: refs/heads/master 2512a1d42 -> 3df307aa5
[SPARK-25960][K8S] Support subpath mounting with Kubernetes ## What changes were proposed in this pull request? This PR adds configurations to use subpaths with Spark on k8s. Subpaths (https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath) allow the user to specify a path within a volume to use instead of the volume's root. ## How was this patch tested? Added unit tests. Ran SparkPi on a cluster with event logging pointed at a subpath-mount and verified the driver host created and used the subpath. Closes #23026 from NiharS/k8s_subpath. Authored-by: Nihar Sheth <niharrsh...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3df307aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3df307aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3df307aa Branch: refs/heads/master Commit: 3df307aa515b3564686e75d1b71754bbcaaf2dec Parents: 2512a1d Author: Nihar Sheth <niharrsh...@gmail.com> Authored: Mon Nov 26 11:06:02 2018 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Mon Nov 26 11:06:02 2018 -0800 ---------------------------------------------------------------------- docs/running-on-kubernetes.md | 17 +++++ .../org/apache/spark/deploy/k8s/Config.scala | 1 + .../spark/deploy/k8s/KubernetesVolumeSpec.scala | 1 + .../deploy/k8s/KubernetesVolumeUtils.scala | 2 + .../k8s/features/MountVolumesFeatureStep.scala | 1 + .../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 12 +++ .../features/MountVolumesFeatureStepSuite.scala | 79 ++++++++++++++++++++ .../submit/KubernetesDriverBuilderSuite.scala | 34 +++++++++ .../k8s/KubernetesExecutorBuilderSuite.scala | 1 + 9 files changed, 148 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/docs/running-on-kubernetes.md ---------------------------------------------------------------------- diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e940d9a..2c01e1e 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -245,6 +245,7 @@ To mount a volume of any of the types above into the driver pod, use the followi ``` --conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path=<mount path> --conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly=<true|false> +--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath=<mount subPath> ``` Specifically, `VolumeType` can be one of the following values: `hostPath`, `emptyDir`, and `persistentVolumeClaim`. `VolumeName` is the name you want to use for the volume under the `volumes` field in the pod specification. @@ -807,6 +808,14 @@ specific to Spark on Kubernetes. </td> </tr> <tr> + <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath</code></td> + <td>(none)</td> + <td> + Specifies a <a href="https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath">subpath</a> to be mounted from the volume into the driver pod. + <code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint</code>. + </td> +</tr> +<tr> <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td> <td>(none)</td> <td> @@ -831,6 +840,14 @@ specific to Spark on Kubernetes. </td> </tr> <tr> + <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPath</code></td> + <td>(none)</td> + <td> + Specifies a <a href="https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath">subpath</a> to be mounted from the volume into the executor pod. + <code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint</code>. + </td> +</tr> +<tr> <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td> <td>false</td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index a32bd93..724acd2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -297,6 +297,7 @@ private[spark] object Config extends Logging { val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim" val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir" val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path" + val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath" val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly" val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path" val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName" http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index b1762d1..1a214fa 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -34,5 +34,6 @@ private[spark] case class KubernetesEmptyDirVolumeConf( private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf]( volumeName: String, mountPath: String, + mountSubPath: String, mountReadOnly: Boolean, volumeConf: T) http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index 713df5f..1553264 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -39,6 +39,7 @@ private[spark] object KubernetesVolumeUtils { getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) => val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY" val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY" + val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY" for { path <- properties.getTry(pathKey) @@ -46,6 +47,7 @@ private[spark] object KubernetesVolumeUtils { } yield KubernetesVolumeSpec( volumeName = volumeName, mountPath = path, + mountSubPath = properties.get(subPathKey).getOrElse(""), mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean), volumeConf = volumeConf ) http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index e60259c..1473a7d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -51,6 +51,7 @@ private[spark] class MountVolumesFeatureStep( val volumeMount = new VolumeMountBuilder() .withMountPath(spec.mountPath) .withReadOnly(spec.mountReadOnly) + .withSubPath(spec.mountSubPath) .withName(spec.volumeName) .build() http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala index d795d15..de79a58 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -33,6 +33,18 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { KubernetesHostPathVolumeConf("/hostPath")) } + test("Parses subPath correctly") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.emptyDir.volumeName.mount.path", "/path") + sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true") + sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountSubPath === "subPath") + } + test("Parses persistentVolumeClaim volumes correctly") { val sparkConf = new SparkConf(false) sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path") http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 2a95746..aadbf16 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -43,6 +43,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { val volumeConf = KubernetesVolumeSpec( "testVolume", "/tmp", + "", false, KubernetesHostPathVolumeConf("/hostPath/tmp") ) @@ -62,6 +63,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { val volumeConf = KubernetesVolumeSpec( "testVolume", "/tmp", + "", true, KubernetesPVCVolumeConf("pvcClaim") ) @@ -83,6 +85,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { val volumeConf = KubernetesVolumeSpec( "testVolume", "/tmp", + "", false, KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G")) ) @@ -104,6 +107,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { val volumeConf = KubernetesVolumeSpec( "testVolume", "/tmp", + "", false, KubernetesEmptyDirVolumeConf(None, None) ) @@ -125,12 +129,14 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { val hpVolumeConf = KubernetesVolumeSpec( "hpVolume", "/tmp", + "", false, KubernetesHostPathVolumeConf("/hostPath/tmp") ) val pvcVolumeConf = KubernetesVolumeSpec( "checkpointVolume", "/checkpoints", + "", true, KubernetesPVCVolumeConf("pvcClaim") ) @@ -142,4 +148,77 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(configuredPod.pod.getSpec.getVolumes.size() === 2) assert(configuredPod.container.getVolumeMounts.size() === 2) } + + test("Mounts subpath on emptyDir") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "foo", + false, + KubernetesEmptyDirVolumeConf(None, None) + ) + val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val emptyDirMount = configuredPod.container.getVolumeMounts.get(0) + assert(emptyDirMount.getMountPath === "/tmp") + assert(emptyDirMount.getName === "testVolume") + assert(emptyDirMount.getSubPath === "foo") + } + + test("Mounts subpath on persistentVolumeClaims") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "bar", + true, + KubernetesPVCVolumeConf("pvcClaim") + ) + val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(pvcClaim.getClaimName === "pvcClaim") + assert(configuredPod.container.getVolumeMounts.size() === 1) + val pvcMount = configuredPod.container.getVolumeMounts.get(0) + assert(pvcMount.getMountPath === "/tmp") + assert(pvcMount.getName === "testVolume") + assert(pvcMount.getSubPath === "bar") + } + + test("Mounts multiple subpaths") { + val volumeConf = KubernetesEmptyDirVolumeConf(None, None) + val emptyDirSpec = KubernetesVolumeSpec( + "testEmptyDir", + "/tmp/foo", + "foo", + true, + KubernetesEmptyDirVolumeConf(None, None) + ) + val pvcSpec = KubernetesVolumeSpec( + "testPVC", + "/tmp/bar", + "bar", + true, + KubernetesEmptyDirVolumeConf(None, None) + ) + val kubernetesConf = emptyKubernetesConf.copy( + roleVolumes = emptyDirSpec :: pvcSpec :: Nil) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 2) + val mounts = configuredPod.container.getVolumeMounts + assert(mounts.size() === 2) + assert(mounts.get(0).getName === "testEmptyDir") + assert(mounts.get(0).getMountPath === "/tmp/foo") + assert(mounts.get(0).getSubPath === "foo") + assert(mounts.get(1).getName === "testPVC") + assert(mounts.get(1).getMountPath === "/tmp/bar") + assert(mounts.get(1).getSubPath === "bar") + } } http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index fe900fd..3708864 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -140,6 +140,40 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val volumeSpec = KubernetesVolumeSpec( "volume", "/tmp", + "", + false, + KubernetesHostPathVolumeConf("/path")) + val conf = KubernetesConf( + new SparkConf(false), + KubernetesDriverSpecificConf( + JavaMainAppResource(None), + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + volumeSpec :: Nil, + hadoopConfSpec = None) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + MOUNT_VOLUMES_STEP_TYPE, + DRIVER_CMD_STEP_TYPE) + } + + test("Apply volumes step if a mount subpath is present.") { + val volumeSpec = KubernetesVolumeSpec( + "volume", + "/tmp", + "foo", false, KubernetesHostPathVolumeConf("/path")) val conf = KubernetesConf( http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 1fea08c..a59f6d0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -107,6 +107,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { val volumeSpec = KubernetesVolumeSpec( "volume", "/tmp", + "", false, KubernetesHostPathVolumeConf("/checkpoint")) val conf = KubernetesConf( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org