Repository: spark
Updated Branches:
  refs/heads/master 2512a1d42 -> 3df307aa5


[SPARK-25960][K8S] Support subpath mounting with Kubernetes

## What changes were proposed in this pull request?

This PR adds configurations to use subpaths with Spark on k8s. Subpaths 
(https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath) allow the 
user to specify a path within a volume to use instead of the volume's root.

## How was this patch tested?

Added unit tests. Ran SparkPi on a cluster with event logging pointed at a 
subpath mount and verified that the driver host created and used the subpath.

Closes #23026 from NiharS/k8s_subpath.

Authored-by: Nihar Sheth <niharrsh...@gmail.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3df307aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3df307aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3df307aa

Branch: refs/heads/master
Commit: 3df307aa515b3564686e75d1b71754bbcaaf2dec
Parents: 2512a1d
Author: Nihar Sheth <niharrsh...@gmail.com>
Authored: Mon Nov 26 11:06:02 2018 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Nov 26 11:06:02 2018 -0800

----------------------------------------------------------------------
 docs/running-on-kubernetes.md                   | 17 +++++
 .../org/apache/spark/deploy/k8s/Config.scala    |  1 +
 .../spark/deploy/k8s/KubernetesVolumeSpec.scala |  1 +
 .../deploy/k8s/KubernetesVolumeUtils.scala      |  2 +
 .../k8s/features/MountVolumesFeatureStep.scala  |  1 +
 .../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 12 +++
 .../features/MountVolumesFeatureStepSuite.scala | 79 ++++++++++++++++++++
 .../submit/KubernetesDriverBuilderSuite.scala   | 34 +++++++++
 .../k8s/KubernetesExecutorBuilderSuite.scala    |  1 +
 9 files changed, 148 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index e940d9a..2c01e1e 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -245,6 +245,7 @@ To mount a volume of any of the types above into the driver 
pod, use the followi
 ```
 --conf 
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path=<mount 
path>
 --conf 
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly=<true|false>
+--conf 
spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath=<mount 
subPath>
 ``` 
 
 Specifically, `VolumeType` can be one of the following values: `hostPath`, 
`emptyDir`, and `persistentVolumeClaim`. `VolumeName` is the name you want to 
use for the volume under the `volumes` field in the pod specification.
@@ -807,6 +808,14 @@ specific to Spark on Kubernetes.
   </td>
 </tr>
 <tr>
+  
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath</code></td>
+  <td>(none)</td>
+  <td>
+   Specifies a <a href="https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath">subpath</a> to be mounted from the volume into the driver pod.
+   For example, <code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint</code>.
+  </td>
+</tr>
+<tr>
   
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
   <td>(none)</td>
   <td>
@@ -831,6 +840,14 @@ specific to Spark on Kubernetes.
   </td>
 </tr>
 <tr>
+  
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPath</code></td>
+  <td>(none)</td>
+  <td>
+   Specifies a <a href="https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath">subpath</a> to be mounted from the volume into the executor pod.
+   For example, <code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint</code>.
+  </td>
+</tr>
+<tr>
   
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
   <td>false</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index a32bd93..724acd2 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -297,6 +297,7 @@ private[spark] object Config extends Logging {
   val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
   val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
   val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
+  val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
   val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
   val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
   val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"

http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
index b1762d1..1a214fa 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
@@ -34,5 +34,6 @@ private[spark] case class KubernetesEmptyDirVolumeConf(
 private[spark] case class KubernetesVolumeSpec[T <: 
KubernetesVolumeSpecificConf](
     volumeName: String,
     mountPath: String,
+    mountSubPath: String, // "" when mount.subPath is unset; copied to the VolumeMount's subPath
     mountReadOnly: Boolean,
     volumeConf: T)

http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 713df5f..1553264 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -39,6 +39,7 @@ private[spark] object KubernetesVolumeUtils {
     getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
       val pathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
       val readOnlyKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
+      val subPathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
 
       for {
         path <- properties.getTry(pathKey)
@@ -46,6 +47,7 @@ private[spark] object KubernetesVolumeUtils {
       } yield KubernetesVolumeSpec(
         volumeName = volumeName,
         mountPath = path,
+        mountSubPath = properties.get(subPathKey).getOrElse(""),
         mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
         volumeConf = volumeConf
       )

http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index e60259c..1473a7d 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -51,6 +51,7 @@ private[spark] class MountVolumesFeatureStep(
       val volumeMount = new VolumeMountBuilder()
         .withMountPath(spec.mountPath)
         .withReadOnly(spec.mountReadOnly)
+        .withSubPath(spec.mountSubPath)
         .withName(spec.volumeName)
         .build()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index d795d15..de79a58 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -33,6 +33,18 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
       KubernetesHostPathVolumeConf("/hostPath"))
   }
 
+  test("Parses subPath correctly") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+    sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
+    sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath")
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.mountReadOnly === true)
+    assert(volumeSpec.mountSubPath === "subPath")
+  }
+
   test("Parses persistentVolumeClaim volumes correctly") {
     val sparkConf = new SparkConf(false)
     sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")

http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 2a95746..aadbf16 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -43,6 +43,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",
       "/tmp",
+      "",
       false,
       KubernetesHostPathVolumeConf("/hostPath/tmp")
     )
@@ -62,6 +63,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",
       "/tmp",
+      "",
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
@@ -83,6 +85,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",
       "/tmp",
+      "",
       false,
       KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
     )
@@ -104,6 +107,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",
       "/tmp",
+      "",
       false,
       KubernetesEmptyDirVolumeConf(None, None)
     )
@@ -125,12 +129,14 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     val hpVolumeConf = KubernetesVolumeSpec(
       "hpVolume",
       "/tmp",
+      "",
       false,
       KubernetesHostPathVolumeConf("/hostPath/tmp")
     )
     val pvcVolumeConf = KubernetesVolumeSpec(
       "checkpointVolume",
       "/checkpoints",
+      "",
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
@@ -142,4 +148,77 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
     assert(configuredPod.container.getVolumeMounts.size() === 2)
   }
+
+  test("Mounts subpath on emptyDir") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "foo",
+      false,
+      KubernetesEmptyDirVolumeConf(None, None)
+    )
+    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val emptyDirMount = configuredPod.container.getVolumeMounts.get(0)
+    assert(emptyDirMount.getMountPath === "/tmp")
+    assert(emptyDirMount.getName === "testVolume")
+    assert(emptyDirMount.getSubPath === "foo")
+  }
+
+  test("Mounts subpath on persistentVolumeClaims") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "bar",
+      true,
+      KubernetesPVCVolumeConf("pvcClaim")
+    )
+    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(pvcClaim.getClaimName === "pvcClaim")
+    assert(configuredPod.container.getVolumeMounts.size() === 1)
+    val pvcMount = configuredPod.container.getVolumeMounts.get(0)
+    assert(pvcMount.getMountPath === "/tmp")
+    assert(pvcMount.getName === "testVolume")
+    assert(pvcMount.getSubPath === "bar")
+  }
+
+  test("Mounts multiple subpaths") {
+    val emptyDirSpec = KubernetesVolumeSpec(
+      "testEmptyDir",
+      "/tmp/foo",
+      "foo",
+      true,
+      KubernetesEmptyDirVolumeConf(None, None)
+    )
+    val pvcSpec = KubernetesVolumeSpec(
+      "testPVC",
+      "/tmp/bar",
+      "bar",
+      true,
+      KubernetesPVCVolumeConf("pvcClaim")
+    )
+    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = emptyDirSpec :: pvcSpec :: Nil)
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    val volumes = configuredPod.pod.getSpec.getVolumes
+    assert(volumes.size() === 2)
+    assert(volumes.get(1).getPersistentVolumeClaim.getClaimName === "pvcClaim")
+    val mounts = configuredPod.container.getVolumeMounts
+    assert(mounts.size() === 2)
+    assert(mounts.get(0).getName === "testEmptyDir")
+    assert(mounts.get(0).getMountPath === "/tmp/foo")
+    assert(mounts.get(0).getSubPath === "foo")
+    assert(mounts.get(1).getName === "testPVC")
+    assert(mounts.get(1).getMountPath === "/tmp/bar")
+    assert(mounts.get(1).getSubPath === "bar")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index fe900fd..3708864 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -140,6 +140,40 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     val volumeSpec = KubernetesVolumeSpec(
       "volume",
       "/tmp",
+      "",
+      false,
+      KubernetesHostPathVolumeConf("/path"))
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        JavaMainAppResource(None),
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      volumeSpec :: Nil,
+      hadoopConfSpec = None)
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      LOCAL_DIRS_STEP_TYPE,
+      MOUNT_VOLUMES_STEP_TYPE,
+      DRIVER_CMD_STEP_TYPE)
+  }
+
+  test("Apply volumes step if a mount subpath is present.") {
+    val volumeSpec = KubernetesVolumeSpec(
+      "volume",
+      "/tmp",
+      "foo",
       false,
       KubernetesHostPathVolumeConf("/path"))
     val conf = KubernetesConf(

http://git-wip-us.apache.org/repos/asf/spark/blob/3df307aa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index 1fea08c..a59f6d0 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -107,6 +107,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
     val volumeSpec = KubernetesVolumeSpec(
       "volume",
       "/tmp",
+      "",
       false,
       KubernetesHostPathVolumeConf("/checkpoint"))
     val conf = KubernetesConf(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to