This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd41a9f2c4f [SPARK-47003][K8S] Detect and fail on invalid volume 
sizes (< 1KiB) in K8s
cdd41a9f2c4f is described below

commit cdd41a9f2c4f278c5da7e1826c5e0ca0db7ec548
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Wed Feb 7 14:58:20 2024 -0800

    [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s
    
    ### What changes were proposed in this pull request?
    
    This PR aims to detect and fail on invalid volume sizes.
    
    ### Why are the changes needed?
    
    This happens when the user forgets the unit of the volume size. For example, 
`100` instead of `100Gi`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    For K8s volumes, the system tries to use the system default minimum 
volume size. However, it totally depends on the underlying system, and this 
misconfiguration misleads the users in many cases because the job is started 
and running in an unhealthy status.
    - First, the executor pods will be killed by the K8s control plane due to 
an out-of-disk situation.
    - Second, Spark keeps trying to create new executors (still with small 
disks), retrying multiple times.
    
    We had better detect the missing-unit situation and make those jobs fail as 
early as possible.
    
    ### How was this patch tested?
    
    Pass the CIs with newly added test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45061 from dongjoon-hyun/SPARK-47003.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/deploy/k8s/KubernetesVolumeUtils.scala   | 13 +++++++++++
 .../deploy/k8s/KubernetesVolumeUtilsSuite.scala    | 26 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index 18fda708d9bb..baa519658c2e 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s
 
+import java.lang.Long.parseLong
+
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 
@@ -76,6 +78,7 @@ private[spark] object KubernetesVolumeUtils {
           
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY"
         val sizeLimitKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
         verifyOptionKey(options, claimNameKey, KUBERNETES_VOLUMES_PVC_TYPE)
+        verifySize(options.get(sizeLimitKey))
         KubernetesPVCVolumeConf(
           options(claimNameKey),
           options.get(storageClassKey),
@@ -84,6 +87,7 @@ private[spark] object KubernetesVolumeUtils {
       case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
         val mediumKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
         val sizeLimitKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
+        verifySize(options.get(sizeLimitKey))
         KubernetesEmptyDirVolumeConf(options.get(mediumKey), 
options.get(sizeLimitKey))
 
       case KUBERNETES_VOLUMES_NFS_TYPE =>
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
       throw new NoSuchElementException(key + s" is required for $msg")
     }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+    size.foreach { v =>
+      if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+        throw new IllegalArgumentException(
+          s"Volume size `$v` is smaller than 1KiB. Missing units?")
+      }
+    }
+  }
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index 156740d7c8ae..fdc1aae0d410 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -182,4 +182,30 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     }
     assert(e.getMessage.contains("nfs.volumeName.options.server"))
   }
+
+  test("SPARK-47003: Check emptyDir volume size") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+    sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
+    sparkConf.set("test.emptyDir.volumeName.options.medium", "medium")
+    sparkConf.set("test.emptyDir.volumeName.options.sizeLimit", "5")
+
+    val m = intercept[IllegalArgumentException] {
+      KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
+    }.getMessage
+    assert(m.contains("smaller than 1KiB. Missing units?"))
+  }
+
+  test("SPARK-47003: Check persistentVolumeClaim volume size") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", 
"false")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", 
"claimName")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.options.sizeLimit", 
"1000")
+
+    val m = intercept[IllegalArgumentException] {
+      KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
+    }.getMessage
+    assert(m.contains("smaller than 1KiB. Missing units?"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to