This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f36a5fb  [SPARK-38194] Followup: Fix k8s memory overhead passing to executor pods
f36a5fb is described below

commit f36a5fb2b88620c1c490d087b0293c4e58d29979
Author: Adam Binford - Customer Site (Virginia) - CW 121796 <abinf...@pu00cenvdi797.vdi.ux.dg.local>
AuthorDate: Thu Mar 17 18:32:29 2022 -0500

    [SPARK-38194] Followup: Fix k8s memory overhead passing to executor pods
    
    ### What changes were proposed in this pull request?
    
    Follow up to https://github.com/apache/spark/pull/35504 to fix k8s memory overhead handling.
    
    ### Why are the changes needed?
    
    https://github.com/apache/spark/pull/35504 introduced a regression in how the memory overhead factor is passed to executor pods, which was only caught by the K8s integration tests.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No; this restores the memory overhead handling to the previous behavior.
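    
    For reference, the restored behavior computes the driver pod memory request the same way as before; a small illustrative sketch (the 4g figure is made up, and 384 MiB is the usual MEMORY_OVERHEAD_MIN_MIB floor):
    
    ```scala
    // Example: a 4g driver with the default 0.1 JVM overhead factor.
    val driverMemoryMiB = 4096L
    val overheadFactor = 0.1
    // Overhead is the factor times the driver memory, floored at 384 MiB.
    val memoryOverheadMiB = math.max((overheadFactor * driverMemoryMiB).toInt, 384)
    // Total container memory request: 4096 + 409 = 4505 MiB.
    val podMemoryRequestMiB = driverMemoryMiB + memoryOverheadMiB
    ```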
    
    ### How was this patch tested?
    
    Verify that the K8s integration tests (IT) pass.
    
    Closes #35901 from Kimahriman/k8s-memory-overhead-executors.
    
    Authored-by: Adam Binford - Customer Site (Virginia) - CW 121796 <abinf...@pu00cenvdi797.vdi.ux.dg.local>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../k8s/features/BasicDriverFeatureStep.scala      | 30 +++++++-----
 .../k8s/features/BasicDriverFeatureStepSuite.scala | 57 +++++++++++++++++-----
 2 files changed, 63 insertions(+), 24 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 9715149..413f5bc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -53,28 +53,32 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
 
   // Memory settings
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
-  private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
-    conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
-  } else {
-    conf.get(MEMORY_OVERHEAD_FACTOR)
-  }
 
-  // The memory overhead factor to use. If the user has not set it, then use a different
-  // value for non-JVM apps. This value is propagated to executors.
-  private val overheadFactor =
+  // The default memory overhead factor to use, derived from the deprecated
+  // `spark.kubernetes.memoryOverheadFactor` config or the default overhead values.
+  // If the user has not set it, then use a different default for non-JVM apps. This value is
+  // propagated to executors and used if the executor overhead factor is not set explicitly.
+  private val defaultOverheadFactor =
     if (conf.mainAppResource.isInstanceOf[NonJVMResource]) {
-      if (conf.contains(MEMORY_OVERHEAD_FACTOR) || conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
-        memoryOverheadFactor
+      if (conf.contains(MEMORY_OVERHEAD_FACTOR)) {
+        conf.get(MEMORY_OVERHEAD_FACTOR)
       } else {
         NON_JVM_MEMORY_OVERHEAD_FACTOR
       }
     } else {
-      memoryOverheadFactor
+      conf.get(MEMORY_OVERHEAD_FACTOR)
     }
 
+  // Prefer the driver memory overhead factor if set explicitly
+  private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
+    conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
+  } else {
+    defaultOverheadFactor
+  }
+
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt,
+    .getOrElse(math.max((memoryOverheadFactor * driverMemoryMiB).toInt,
       ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
@@ -169,7 +173,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
       "spark.app.id" -> conf.appId,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
-      DRIVER_MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+      MEMORY_OVERHEAD_FACTOR.key -> defaultOverheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
     Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
       val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index d45f5f9..9a3b06a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -134,7 +134,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
       "spark.app.id" -> KubernetesTestConf.APP_ID,
       "spark.kubernetes.submitInDriver" -> "true",
-      DRIVER_MEMORY_OVERHEAD_FACTOR.key -> DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
+      MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
   }
 
@@ -192,13 +192,16 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
   // Memory overhead tests. Tuples are:
   //   test name, main resource, overhead factor, expected factor
+  val driverDefault = DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get
+  val oldConfigDefault = MEMORY_OVERHEAD_FACTOR.defaultValue.get
+  val nonJvm = NON_JVM_MEMORY_OVERHEAD_FACTOR
   Seq(
-    ("java", JavaMainAppResource(None), None, 
DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get),
-    ("python default", PythonMainAppResource(null), None, 
NON_JVM_MEMORY_OVERHEAD_FACTOR),
-    ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
-    ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
-  ).foreach { case (name, resource, factor, expectedFactor) =>
-    test(s"memory overhead factor: $name") {
+    ("java", JavaMainAppResource(None), None, driverDefault, oldConfigDefault),
+    ("python default", PythonMainAppResource(null), None, nonJvm, nonJvm),
+    ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d, 
nonJvm),
+    ("r default", RMainAppResource(null), None, nonJvm, nonJvm)
+  ).foreach { case (name, resource, factor, expectedFactor, expectedPropFactor) =>
+    test(s"memory overhead factor new config: $name") {
       // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
       val driverMem =
         ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
@@ -218,7 +221,37 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       assert(mem === s"${expected}Mi")
 
       val systemProperties = step.getAdditionalPodSystemProperties()
-      assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+      assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedPropFactor.toString)
+    }
+  }
+
+  Seq(
+    ("java", JavaMainAppResource(None), None, driverDefault),
+    ("python default", PythonMainAppResource(null), None, nonJvm),
+    ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
+    ("r default", RMainAppResource(null), None, nonJvm)
+  ).foreach { case (name, resource, factor, expectedFactor) =>
+    test(s"memory overhead factor old config: $name") {
+      // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
+      val driverMem =
+        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+
+      // main app resource, overhead factor
+      val sparkConf = new SparkConf(false)
+        .set(CONTAINER_IMAGE, "spark-driver:latest")
+        .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
+      factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) }
+      val conf = KubernetesTestConf.createDriverConf(
+        sparkConf = sparkConf,
+        mainAppResource = resource)
+      val step = new BasicDriverFeatureStep(conf)
+      val pod = step.configurePod(SparkPod.initialPod())
+      val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
+      val expected = (driverMem + driverMem * expectedFactor).toInt
+      assert(mem === s"${expected}Mi")
+
+      val systemProperties = step.getAdditionalPodSystemProperties()
+      assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
     }
   }
 
@@ -234,8 +267,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
     // New config should take precedence
     val expectedFactor = 0.2
+    val oldFactor = 0.3
     sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, expectedFactor)
-    sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)
+    sparkConf.set(MEMORY_OVERHEAD_FACTOR, oldFactor)
 
     val conf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf)
@@ -245,8 +279,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val expected = (driverMem + driverMem * expectedFactor).toInt
     assert(mem === s"${expected}Mi")
 
+    // The old config should be passed as a system property for use by the executor
     val systemProperties = step.getAdditionalPodSystemProperties()
-    assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+    assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === oldFactor.toString)
   }
 
   test(s"SPARK-38194: old memory factor settings is applied if new one isn't 
given") {
@@ -272,7 +307,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(mem === s"${expected}Mi")
 
     val systemProperties = step.getAdditionalPodSystemProperties()
-    assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+    assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
   }
 
   test("SPARK-35493: make spark.blockManager.port be able to be fallen back to 
in driver pod") {
