This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b073fd607f1 [SPARK-47208][CORE] Allow overriding base overhead memory
4b073fd607f1 is described below

commit 4b073fd607f14153ca511da08dbc28c7340b287a
Author: jpcorreia99 <jpcorrei...@gmail.com>
AuthorDate: Thu Mar 14 08:23:23 2024 -0500

    [SPARK-47208][CORE] Allow overriding base overhead memory
    
    ### What changes were proposed in this pull request?
    We can already select the desired overhead memory directly via the
    `spark.driver/executor.memoryOverhead` flags; however, if that flag is
    not present, the overhead memory calculation goes as follows:
    
    ```
    overhead_memory = Max(384, 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor')

    [where the 'memoryOverheadFactor' flag defaults to 0.1]
    ```
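
    For example, with the defaults a small executor still receives the full floor, since the factor-based value falls below it (pure arithmetic from the formula above):

    ```
    'spark.executor.memory' = 1024 MiB  ->  Max(384, 1024 * 0.1) = Max(384, 102) = 384 MiB
    ```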
    
    This PR adds two new Spark configs, `spark.driver.minMemoryOverhead` and
    `spark.executor.minMemoryOverhead`, which can be used to override the
    384 MiB minimum value.
    
    The memory overhead calculation will now be:
    
    ```
    min_memory = sparkConf.get('spark.driver/executor.minMemoryOverhead').getOrElse(384)

    overhead_memory = Max(min_memory, 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor')
    ```
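
    As a usage illustration (a minimal sketch, not part of this patch; the `512m` value is an arbitrary example), the new floor can be set programmatically:

    ```scala
    import org.apache.spark.SparkConf

    // Illustrative only: raise the executor overhead floor above the 384 MiB default.
    val conf = new SparkConf()
      .set("spark.executor.memory", "2g")
      .set("spark.executor.minMemoryOverhead", "512m")
    ```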
    
    ### Why are the changes needed?
    There are certain times where being able to override the 384 MiB minimum
    directly can be beneficial. We may have a scenario where a lot of off-heap
    operations are performed (e.g. using package managers/native
    compression/decompression), where we don't need a large JVM heap but may
    still need a significant amount of memory on the Spark node.
    
    Using the `memoryOverheadFactor` config flag may not be appropriate here,
    since we may not want the overhead allocation to scale directly with JVM
    memory, whether for cost-saving or resource-limitation reasons.
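
    As an illustrative calculation (example numbers, not from this patch):

    ```
    'spark.executor.memory' = 2048 MiB, 'memoryOverheadFactor' = 0.1
    factor-based overhead = 2048 * 0.1 = 204 MiB
    with the fixed 384 floor:              Max(384, 204)  = 384 MiB
    with 'minMemoryOverhead' set to 1024m: Max(1024, 204) = 1024 MiB
    ```

    The overhead can thus be raised without also inflating the JVM heap.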
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, as described above, two new flags have been added to the Spark
    config. Existing behaviour is not changed.
    
    ### How was this patch tested?
    Added tests for 3 cases:
    - If `spark.driver/executor.memoryOverhead` is set, the new changes
      have no effect.
    - If `spark.driver/executor.minMemoryOverhead` is set and its value is
      higher than 'spark.driver/executor.memory' *
      'spark.driver/executor.memoryOverheadFactor', the total memory will be
      the allocated JVM memory + `spark.driver/executor.minMemoryOverhead`.
    - If `spark.driver/executor.minMemoryOverhead` is set but its value is
      lower than 'spark.driver/executor.memory' *
      'spark.driver/executor.memoryOverheadFactor', the total memory will be
      the allocated JVM memory + 'spark.driver/executor.memory' *
      'spark.driver/executor.memoryOverheadFactor'. (The resolution order is
      sketched below.)
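
    A simplified sketch of that resolution order (the real logic is `ResourceProfile.calculateOverHeadMemory` in the diff below; the parameter names here are illustrative):

    ```scala
    // Explicit overhead wins outright; otherwise take the max of the
    // factor-based value and the configurable minimum (default 384 MiB).
    def resolveOverheadMiB(
        explicitOverheadMiB: Option[Long], // spark.{driver,executor}.memoryOverhead
        minOverheadMiB: Long,              // spark.{driver,executor}.minMemoryOverhead
        memoryMiB: Long,
        overheadFactor: Double): Long =
      explicitOverheadMiB.getOrElse(
        math.max((overheadFactor * memoryMiB).toLong, minOverheadMiB))
    ```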
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45240 from jpcorreia99/jcorrreia/MinOverheadMemoryOverride.
    
    Authored-by: jpcorreia99 <jpcorrei...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../org/apache/spark/internal/config/package.scala | 17 +++++
 .../apache/spark/resource/ResourceProfile.scala    |  8 +-
 .../spark/resource/ResourceProfileSuite.scala      |  2 +-
 docs/configuration.md                              | 22 +++++-
 .../k8s/features/BasicDriverFeatureStep.scala      |  7 +-
 .../k8s/features/BasicExecutorFeatureStep.scala    |  2 +
 .../k8s/features/BasicDriverFeatureStepSuite.scala | 85 ++++++++++++++++++++--
 .../features/BasicExecutorFeatureStepSuite.scala   | 43 ++++++++++-
 .../org/apache/spark/deploy/yarn/Client.scala      | 16 +++-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  4 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 33 +++++++++
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 36 ++++++++-
 12 files changed, 252 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1fcf75b02503..aa240b5cc5b5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -117,6 +117,14 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
+  private[spark] val DRIVER_MIN_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.minMemoryOverhead")
+    .doc("The minimum amount of non-heap memory to be allocated per driver in cluster mode, " +
+      "in MiB unless otherwise specified. This value is ignored if " +
+      "spark.driver.memoryOverhead is set directly.")
+    .version("4.0.0")
+    .bytesConf(ByteUnit.MiB)
+    .createWithDefaultString("384m")
+
   private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR =
     ConfigBuilder("spark.driver.memoryOverheadFactor")
       .doc("Fraction of driver memory to be allocated as additional non-heap 
memory per driver " +
@@ -358,6 +366,15 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
+  private[spark] val EXECUTOR_MIN_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.executor.minMemoryOverhead")
+    .doc("The minimum amount of non-heap memory to be allocated per executor " +
+      "in MiB unless otherwise specified. This value is ignored if " +
+      "spark.executor.memoryOverhead is set directly.")
+    .version("4.0.0")
+    .bytesConf(ByteUnit.MiB)
+    .createWithDefaultString("384m")
+
   private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR =
     ConfigBuilder("spark.executor.memoryOverheadFactor")
       .doc("Fraction of executor memory to be allocated as additional non-heap 
memory per " +
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 4a55b2f619e6..e95dbe973691 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -352,8 +352,6 @@ object ResourceProfile extends Logging {
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
 
-  private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
-
   private lazy val nextProfileId = new AtomicInteger(0)
   private val DEFAULT_PROFILE_LOCK = new Object()
 
@@ -489,10 +487,11 @@ object ResourceProfile extends Logging {
 
   private[spark] def calculateOverHeadMemory(
       overHeadMemFromConf: Option[Long],
+      minimumOverHeadMemoryFromConf: Long,
       executorMemoryMiB: Long,
       overheadFactor: Double): Long = {
     overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt,
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
+      minimumOverHeadMemoryFromConf))
   }
 
   /**
@@ -504,6 +503,7 @@ object ResourceProfile extends Logging {
   private[spark] def getResourcesForClusterManager(
       rpId: Int,
       execResources: Map[String, ExecutorResourceRequest],
+      minimumOverheadMemory: Long,
       overheadFactor: Double,
       conf: SparkConf,
       isPythonApp: Boolean,
@@ -515,7 +515,7 @@ object ResourceProfile extends Logging {
     var memoryOffHeapMiB = defaultResources.memoryOffHeapMiB
     var pysparkMemoryMiB = defaultResources.pysparkMemoryMiB.getOrElse(0L)
     var memoryOverheadMiB = calculateOverHeadMemory(defaultResources.memoryOverheadMiB,
-      executorMemoryMiB, overheadFactor)
+      minimumOverheadMemory, executorMemoryMiB, overheadFactor)
 
     val finalCustomResources = if (rpId != DEFAULT_RESOURCE_PROFILE_ID) {
       val customResources = new mutable.HashMap[String, ExecutorResourceRequest]
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index 99bd5f520042..3464c3b3a0c5 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -98,7 +98,7 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
       new ExecutorResourceRequests().cores(4)
     val rp = rpBuilder.require(taskReq).require(execReq).build()
     val executorResourceForRp = ResourceProfile.getResourcesForClusterManager(
-      rp.id, rp.executorResources, 0.0, sparkConf, false, Map.empty)
+      rp.id, rp.executorResources, 500L, 0.0, sparkConf, false, Map.empty)
     // Standalone cluster only take cores and executor memory as built-in resources.
     assert(executorResourceForRp.cores.get === 4)
     assert(executorResourceForRp.executorMemoryMiB === 1024L)
diff --git a/docs/configuration.md b/docs/configuration.md
index f0d68c55e7b3..9a686bc514c5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -187,7 +187,7 @@ of the most common options to set are:
 </tr>
 <tr>
   <td><code>spark.driver.memoryOverhead</code></td>
-  <td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with 
minimum of 384 </td>
+  <td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with 
minimum of <code>spark.driver.minMemoryOverhead</code></td>
   <td>
     Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless
     otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
@@ -202,6 +202,15 @@ of the most common options to set are:
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.driver.minMemoryOverhead</code></td>
+  <td>384m</td>
+  <td>
+    The minimum amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless otherwise specified, if <code>spark.driver.memoryOverhead</code> is not defined.
+    This option is currently supported on YARN and Kubernetes.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.driver.memoryOverheadFactor</code></td>
   <td>0.10</td>
@@ -291,7 +300,7 @@ of the most common options to set are:
 </tr>
 <tr>
  <td><code>spark.executor.memoryOverhead</code></td>
-  <td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with 
minimum of 384 </td>
+  <td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with 
minimum of <code>spark.executor.minMemoryOverhead</code></td>
   <td>
     Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified.
     This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
@@ -306,6 +315,15 @@ of the most common options to set are:
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.driver.minMemoryOverhead</code></td>
+  <td>384m</td>
+  <td>
+    The minimum amount of non-heap memory to be allocated per executor process, in MiB unless otherwise specified, if <code>spark.executor.memoryOverhead</code> is not defined.
+    This option is currently supported on YARN and Kubernetes.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.executor.memoryOverheadFactor</code></td>
   <td>0.10</td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 51ee9ffbe405..bd198deed3d5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
-import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
@@ -67,6 +66,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       conf.get(MEMORY_OVERHEAD_FACTOR)
     }
 
+  private val driverMinimumMemoryOverhead = conf.get(DRIVER_MIN_MEMORY_OVERHEAD)
+
   // Prefer the driver memory overhead factor if set explicitly
   private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
     conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
@@ -74,10 +75,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     defaultOverheadFactor
   }
 
-  private val memoryOverheadMiB = conf
+  private val memoryOverheadMiB: Long = conf
     .get(DRIVER_MEMORY_OVERHEAD)
     .getOrElse(math.max((memoryOverheadFactor * driverMemoryMiB).toInt,
-      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
+      driverMinimumMemoryOverhead))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index febd8dcfa75f..fa4a6f43215c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -59,6 +59,7 @@ private[spark] class BasicExecutorFeatureStep(
   private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
   private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
   private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)
+  private val minimumMemoryOverhead = kubernetesConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)
   private val memoryOverheadFactor = if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) {
     kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
   } else {
@@ -68,6 +69,7 @@ private[spark] class BasicExecutorFeatureStep(
   val execResources = ResourceProfile.getResourcesForClusterManager(
     resourceProfile.id,
     resourceProfile.executorResources,
+    minimumMemoryOverhead,
     memoryOverheadFactor,
     kubernetesConf.sparkConf,
     isPythonApp,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index a9149b6cbf14..f102851e6c3b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
-import org.apache.spark.resource.{ResourceID, ResourceProfile}
+import org.apache.spark.resource.ResourceID
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.util.Utils
 
@@ -205,7 +205,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     test(s"memory overhead factor new config: $name") {
       // Choose a driver memory where the default memory overhead is > 
MEMORY_OVERHEAD_MIN_MIB
       val driverMem =
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / 
DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+        DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get /
+          DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
 
       // main app resource, overhead factor
       val sparkConf = new SparkConf(false)
@@ -235,7 +236,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     test(s"memory overhead factor old config: $name") {
       // Choose a driver memory where the default memory overhead is > 
MEMORY_OVERHEAD_MIN_MIB
       val driverMem =
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / 
MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+        DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get / 
MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
 
       // main app resource, overhead factor
       val sparkConf = new SparkConf(false)
@@ -259,7 +260,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   test(s"SPARK-38194: memory overhead factor precendence") {
     // Choose a driver memory where the default memory overhead is > 
MEMORY_OVERHEAD_MIN_MIB
     val driverMem =
-      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / 
DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+      DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get /
+        DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
 
     // main app resource, overhead factor
     val sparkConf = new SparkConf(false)
@@ -288,7 +290,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   test(s"SPARK-38194: old memory factor settings is applied if new one isn't 
given") {
     // Choose a driver memory where the default memory overhead is > 
MEMORY_OVERHEAD_MIN_MIB
     val driverMem =
-      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / 
DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+      DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get /
+        DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
 
     // main app resource, overhead factor
     val sparkConf = new SparkConf(false)
@@ -372,6 +375,78 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       path.startsWith(FILE_UPLOAD_PATH) && 
path.endsWith("some-local-jar.jar")))
   }
 
+  test("SPARK-47208: User can override the minimum memory overhead of the 
driver") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set(DRIVER_MEMORY.key, "256M")
+      .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    val kubernetesConf: KubernetesDriverConf = 
KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      labels = CUSTOM_DRIVER_LABELS,
+      environment = DRIVER_ENVS,
+      annotations = DRIVER_ANNOTATIONS)
+
+    val featureStep = new BasicDriverFeatureStep(kubernetesConf)
+    val basePod = SparkPod.initialPod()
+    val configuredPod = featureStep.configurePod(basePod)
+
+    val resourceRequirements = configuredPod.container.getResources
+    val requests = resourceRequirements.getRequests.asScala
+    assert(amountAndFormat(requests("memory")) === "756Mi")
+    val limits = resourceRequirements.getLimits.asScala
+    assert(amountAndFormat(limits("memory")) === "756Mi")
+  }
+
+  test("SPARK-47208: Explicit overhead takes precedence over minimum 
overhead") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set(DRIVER_MEMORY.key, "256M")
+      .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+      .set(DRIVER_MEMORY_OVERHEAD, 200L)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    val kubernetesConf: KubernetesDriverConf = 
KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      labels = CUSTOM_DRIVER_LABELS,
+      environment = DRIVER_ENVS,
+      annotations = DRIVER_ANNOTATIONS)
+
+    val featureStep = new BasicDriverFeatureStep(kubernetesConf)
+    val basePod = SparkPod.initialPod()
+    val configuredPod = featureStep.configurePod(basePod)
+
+    val resourceRequirements = configuredPod.container.getResources
+    val requests = resourceRequirements.getRequests.asScala
+    assert(amountAndFormat(requests("memory")) === "456Mi")
+    val limits = resourceRequirements.getLimits.asScala
+    assert(amountAndFormat(limits("memory")) === "456Mi")
+  }
+
+  test("SPARK-47208: Overhead is maximum between factor of memory and min base 
overhead") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set(DRIVER_MEMORY.key, "5000M")
+      .set(DRIVER_MIN_MEMORY_OVERHEAD, 200L)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    val kubernetesConf: KubernetesDriverConf = 
KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      labels = CUSTOM_DRIVER_LABELS,
+      environment = DRIVER_ENVS,
+      annotations = DRIVER_ANNOTATIONS)
+
+    val featureStep = new BasicDriverFeatureStep(kubernetesConf)
+    val basePod = SparkPod.initialPod()
+    val configuredPod = featureStep.configurePod(basePod)
+
+    val resourceRequirements = configuredPod.container.getResources
+    val requests = resourceRequirements.getRequests.asScala
+    // mem = 5000 + max(overhead_factor[0.1] * 5000, 200)
+    assert(amountAndFormat(requests("memory")) === "5500Mi")
+    val limits = resourceRequirements.getLimits.asScala
+    assert(amountAndFormat(limits("memory")) === "5500Mi")
+  }
+
+
   def containerPort(name: String, portNumber: Int): ContainerPort =
     new ContainerPortBuilder()
       .withName(name)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index b9d5681907a6..eaf39dd816dc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -460,7 +460,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   test(s"SPARK-38194: memory overhead factor precendence") {
     // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
     val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
-    val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2
+    val executorMem = EXECUTOR_MIN_MEMORY_OVERHEAD.defaultValue.get / defaultFactor * 2
 
     // main app resource, overhead factor
     val sparkConf = new SparkConf(false)
@@ -487,7 +487,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
     // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
     val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
-    val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2
+    val executorMem = EXECUTOR_MIN_MEMORY_OVERHEAD.defaultValue.get / defaultFactor * 2
 
     // main app resource, overhead factor
     val sparkConf = new SparkConf(false)
@@ -524,6 +524,45 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     assert(podConfigured1.container.getPorts.contains(ports))
   }
 
+  test("SPARK-47208: User can override the minimum memory overhead of the 
executor") {
+    // main app resource, overriding the minimum oberhead to 500Mb
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+
+    val conf = KubernetesTestConf.createExecutorConf(
+      sparkConf = sparkConf)
+    ResourceProfile.clearDefaultProfile()
+    val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
+      resourceProfile)
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    // memory = 1024 MiB (default) + 500 MiB (minimum overhead overridden from the 384 MiB default)
+    assert(amountAndFormat(executor.container.getResources
+      .getLimits.get("memory")) === "1524Mi")
+  }
+
+  test("SPARK-47208: Explicit overhead takes precedence over minimum 
overhead") {
+    // main app resource, explicit overhead of 150MiB
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(EXECUTOR_MEMORY_OVERHEAD, 150L)
+      .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+
+    val conf = KubernetesTestConf.createExecutorConf(
+      sparkConf = sparkConf)
+    ResourceProfile.clearDefaultProfile()
+    val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
+      resourceProfile)
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    // memory = 1024 MiB + 150 MiB (explicit overhead overrides any other overhead calculation)
+    assert(amountAndFormat(executor.container.getResources
+      .getLimits.get("memory")) === "1174Mi")
+  }
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4e70b57e4bb7..1c762b98c4d0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -59,7 +59,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
-import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper}
 import org.apache.spark.util.ArrayImplicits._
@@ -76,7 +75,6 @@ private[spark] class Client(
   private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
 
   private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
-
   private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && 
!isClusterMode
   private val statCachePreloadEnabled = 
sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED)
   private val statCachePreloadDirectoryCountThreshold: Int =
@@ -96,12 +94,21 @@ private[spark] class Client(
   } else {
     sparkConf.get(AM_MEMORY).toInt
   }
+
+  private val driverMinimumMemoryOverhead =
+    if (isClusterMode) {
+      sparkConf.get(DRIVER_MIN_MEMORY_OVERHEAD)
+    } else {
+      384L
+    }
+
   private val amMemoryOverhead = {
     val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
     sparkConf.get(amMemoryOverheadEntry).getOrElse(
       math.max((amMemoryOverheadFactor * amMemory).toLong,
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
+        driverMinimumMemoryOverhead)).toInt
   }
+
   private val amCores = if (isClusterMode) {
     sparkConf.get(DRIVER_CORES)
   } else {
@@ -114,9 +121,10 @@ private[spark] class Client(
   protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)
 
   private val executorMemoryOvereadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
+  private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((executorMemoryOvereadFactor * executorMemory).toLong,
-      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
+      minMemoryOverhead)).toInt
 
   private val isPython = sparkConf.get(IS_PYTHON_APP)
   private val pysparkWorkerMemory: Int = if (isPython) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 7f0469937fef..ea4dec4e6b73 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -169,6 +169,8 @@ private[yarn] class YarnAllocator(
 
   private val isPythonApp = sparkConf.get(IS_PYTHON_APP)
 
+  private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)
+
   private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
 
   private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
@@ -313,7 +315,7 @@ private[yarn] class YarnAllocator(
 
       val resourcesWithDefaults =
         ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
-          memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping)
+          minMemoryOverhead, memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping)
       val customSparkResources =
         resourcesWithDefaults.customResources.map { case (name, execReq) =>
           (name, execReq.amount.toString)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index ac946b514fa5..78e84690900e 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -706,6 +706,39 @@ class ClientSuite extends SparkFunSuite
     assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE), mockFsLookup).size === 2)
   }
 
+  Seq(
+      "client",
+      "cluster"
+    ).foreach { case (deployMode) =>
+      test(s"SPARK-47208: minimum memory overhead is correctly set in ($deployMode mode)") {
+        val sparkConf = new SparkConf()
+          .set("spark.app.name", "foo-test-app")
+          .set(SUBMIT_DEPLOY_MODE, deployMode)
+          .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L)
+        val args = new ClientArguments(Array())
+
+        val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+        val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+        val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+        val client = new Client(args, sparkConf, null)
+        client.createApplicationSubmissionContext(
+          new YarnClientApplication(getNewApplicationResponse, appContext),
+          containerLaunchContext)
+
+        appContext.getApplicationName should be ("foo-test-app")
+        // the flag only takes effect in cluster mode
+        if (deployMode == "cluster") {
+          // 1024 MiB driver default + 500 MiB overridden minimum overhead
+          appContext.getResource should be (Resource.newInstance(1524L, 1))
+        } else {
+          // 512 MiB driver default (non-cluster) + 384 MiB default overhead,
+          // which can't be changed in non-cluster mode.
+          appContext.getResource should be (Resource.newInstance(896L, 1))
+        }
+      }
+    }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index efd66a912174..09e35a308728 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -731,6 +731,7 @@ class YarnAllocatorSuite extends SparkFunSuite
     val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
     val offHeapMemoryInMB = 1024L
     val offHeapMemoryInByte = offHeapMemoryInMB * 1024 * 1024
+    val clientModeMinMemoryOverhead = 384L
     try {
       sparkConf.set(MEMORY_OFFHEAP_ENABLED, true)
       sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
@@ -739,7 +740,7 @@ class YarnAllocatorSuite extends SparkFunSuite
       val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
       val memory = defaultResource.getMemorySize
       assert(memory ==
-        executorMemory + offHeapMemoryInMB + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)
+        executorMemory + offHeapMemoryInMB + clientModeMinMemoryOverhead)
     } finally {
       sparkConf.set(MEMORY_OFFHEAP_ENABLED, originalOffHeapEnabled)
       sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
@@ -772,6 +773,39 @@ class YarnAllocatorSuite extends SparkFunSuite
       assert(memory == (executorMemory * 1.4).toLong)
     } finally {
       sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+      sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: User can override the minimum memory overhead of the 
executor") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 500))
+    } finally {
+      sparkConf.remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
+    }
+  }
+
+  test("SPARK-47208: Explicit overhead takes precedence over minimum 
overhead") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf
+        .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L)
+        .set(EXECUTOR_MEMORY_OVERHEAD, 100L)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> 
executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemorySize
+      assert(memory == (executorMemory + 100))
+    } finally {
+      sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD)
+      sparkConf.remove(EXECUTOR_MIN_MEMORY_OVERHEAD)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
