This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b065c945fe2 [SPARK-39006][K8S] Show a directional error message for executor PVC dynamic allocation failure
b065c945fe2 is described below

commit b065c945fe27dd5869b39bfeaad8e2b23a8835b5
Author: Qian.Sun <qian.sun2...@gmail.com>
AuthorDate: Sat May 7 17:58:20 2022 -0700

    [SPARK-39006][K8S] Show a directional error message for executor PVC dynamic allocation failure
    
    ### What changes were proposed in this pull request?
    
    This PR aims to show a directional error message for executor PVC dynamic allocation failure.
    
    ### Why are the changes needed?
    
    #29846 supports dynamic PVC creation/deletion for K8s executors.
    #29557 supports the execId placeholder in executor PVC conf.
    If `spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName` is not set with `onDemand` or `SPARK_EXECUTOR_ID`, Spark keeps trying to create the executor pod.
    After this PR, Spark shows a directional error message for this situation:
    ```plain
    ERROR ExecutorPodsSnapshotsStoreImpl: Going to stop due to IllegalArgumentException
    java.lang.IllegalArgumentException: PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID when multiple executors are required
    ```
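
    As an illustrative sketch (not taken from this patch), the two configuration shapes involved might look as follows; `my-static-pvc` is a made-up claim name, while the `claimName` key, `OnDemand`, and `SPARK_EXECUTOR_ID` are the values quoted above:

    ```scala
    import org.apache.spark.SparkConf

    object ClaimNameExamples {
      private val ClaimNameKey =
        "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName"

      // Rejected after this PR: a fixed claim name ("my-static-pvc" is a
      // hypothetical example) cannot serve multiple executors, so the new
      // check fails fast with the IllegalArgumentException shown above.
      val failing: SparkConf = new SparkConf()
        .set("spark.executor.instances", "2")
        .set(ClaimNameKey, "my-static-pvc")

      // Accepted: "OnDemand" (or a name containing SPARK_EXECUTOR_ID) lets
      // each executor get its own dynamically created PVC.
      val working: SparkConf = new SparkConf()
        .set("spark.executor.instances", "2")
        .set(ClaimNameKey, "OnDemand")
    }
    ```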
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Add unit test.
    
    Closes #36374 from dcoliversun/SPARK-39006.
    
    Authored-by: Qian.Sun <qian.sun2...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../k8s/features/MountVolumesFeatureStep.scala     | 16 +++++++++
 .../features/MountVolumesFeatureStepSuite.scala    | 39 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index 78dd6ec21ed..d47024ca9fe 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID, SPARK_APP_ID_LABEL}
+import org.apache.spark.internal.config.EXECUTOR_INSTANCES
 
 private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
@@ -71,6 +72,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
         case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
           val claimName = conf match {
             case c: KubernetesExecutorConf =>
+              checkPVCClaimName(claimNameTemplate)
               claimNameTemplate
                 .replaceAll(PVC_ON_DEMAND,
                   s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
@@ -120,6 +122,20 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
     additionalResources.toSeq
   }
+
+  private def checkPVCClaimName(claimName: String): Unit = {
+    val executorInstances = conf.get(EXECUTOR_INSTANCES)
+    if (executorInstances.isDefined && executorInstances.get > 1) {
+      // PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID
+      // when requiring multiple executors.
+      // Else, spark continues to try to create the executor pod.
+      if (!claimName.contains(PVC_ON_DEMAND) && !claimName.contains(ENV_EXECUTOR_ID)) {
+        throw new IllegalArgumentException(s"PVC ClaimName: $claimName " +
+          s"should contain $PVC_ON_DEMAND or $ENV_EXECUTOR_ID " +
+          "when requiring multiple executors")
+      }
+    }
+  }
 }
 
 private[spark] object MountVolumesFeatureStep {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 468d1dde9fb..e428e54d661 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -16,10 +16,13 @@
  */
 package org.apache.spark.deploy.k8s.features
 
+import java.util.UUID
+
 import scala.collection.JavaConverters._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.internal.config.EXECUTOR_INSTANCES
 
 class MountVolumesFeatureStepSuite extends SparkFunSuite {
   test("Mounts hostPath volumes") {
@@ -148,6 +151,40 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
   }
 
+  test("SPARK-39006: Check PVC ClaimName") {
+    val claimName = s"pvc-${UUID.randomUUID().toString}"
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "",
+      mountReadOnly = true,
+      KubernetesPVCVolumeConf(claimName)
+    )
+    // Create pvc without specified claimName unsuccessfully when requiring multiple executors
+    val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2)
+    var executorConf =
      KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
+    var executorStep = new MountVolumesFeatureStep(executorConf)
+    assertThrows[IllegalArgumentException] {
+      executorStep.configurePod(SparkPod.initialPod())
+    }
+    assert(intercept[IllegalArgumentException] {
+      executorStep.configurePod(SparkPod.initialPod())
+    }.getMessage.equals(s"PVC ClaimName: $claimName " +
+      "should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple 
executors"))
+
+    // Create and mount pvc with any claimName successfully when requiring one executor
+    conf.set(EXECUTOR_INSTANCES, 1)
+    executorConf =
+      KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
+    executorStep = new MountVolumesFeatureStep(executorConf)
+    val executorPod = executorStep.configurePod(SparkPod.initialPod())
+
+    assert(executorPod.pod.getSpec.getVolumes.size() === 1)
+    val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(executorPVC.getClaimName.equals(claimName))
+  }
+
   test("Mounts emptyDir") {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
