This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4b073fd607f1 [SPARK-47208][CORE] Allow overriding base overhead memory 4b073fd607f1 is described below commit 4b073fd607f14153ca511da08dbc28c7340b287a Author: jpcorreia99 <jpcorrei...@gmail.com> AuthorDate: Thu Mar 14 08:23:23 2024 -0500 [SPARK-47208][CORE] Allow overriding base overhead memory ### What changes were proposed in this pull request? We can already select the desired overhead memory directly via the `spark.driver/executor.memoryOverhead` flags; however, if that flag is not present, the overhead memory calculation goes as follows: ``` overhead_memory = Max(384, 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor') [where the 'memoryOverheadFactor' flag defaults to 0.1] ``` This PR adds two new Spark configs: `spark.driver.minMemoryOverhead` and `spark.executor.minMemoryOverhead`, which can be used to override the 384 MiB minimum value. The memory overhead calculation will now be: ``` min_memory = sparkConf.get('spark.driver/executor.minMemoryOverhead').getOrElse(384) overhead_memory = Max(min_memory, 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor') ``` ### Why are the changes needed? There are certain times where being able to override the 384 MiB minimum directly can be beneficial. We may have a scenario where a lot of off-heap operations are performed (e.g., using package managers/native compression/decompression) where we don't have a need for a large JVM heap but we may still need a significant amount of memory in the Spark node. Using the `memoryOverheadFactor` config flag may not prove appropriate, since we may not want the overhead allocation to scale directly with JVM memory, for cost-saving or resource-limitation reasons. ### Does this PR introduce _any_ user-facing change? Yes, as described above, two new flags have been added to the Spark config. Existing behaviour is unchanged. ### How was this patch tested? 
Added tests for 3 cases: - If `spark.driver/executor.memoryOverhead` is set, then the new changes have no effect. - If `spark.driver/executor.minMemoryOverhead` is set and its value is higher than 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor', the total memory will be the allocated JVM memory + `spark.driver/executor.minMemoryOverhead`. - If `spark.driver/executor.minMemoryOverhead` is set but its value is lower than 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor', the total memory will be the allocated JVM memory + 'spark.driver/executor.memory' * 'spark.driver/executor.memoryOverheadFactor'. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45240 from jpcorreia99/jcorrreia/MinOverheadMemoryOverride. Authored-by: jpcorreia99 <jpcorrei...@gmail.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../org/apache/spark/internal/config/package.scala | 17 +++++ .../apache/spark/resource/ResourceProfile.scala | 8 +- .../spark/resource/ResourceProfileSuite.scala | 2 +- docs/configuration.md | 22 +++++- .../k8s/features/BasicDriverFeatureStep.scala | 7 +- .../k8s/features/BasicExecutorFeatureStep.scala | 2 + .../k8s/features/BasicDriverFeatureStepSuite.scala | 85 ++++++++++++++++++++-- .../features/BasicExecutorFeatureStepSuite.scala | 43 ++++++++++- .../org/apache/spark/deploy/yarn/Client.scala | 16 +++- .../apache/spark/deploy/yarn/YarnAllocator.scala | 4 +- .../org/apache/spark/deploy/yarn/ClientSuite.scala | 33 +++++++++ .../spark/deploy/yarn/YarnAllocatorSuite.scala | 36 ++++++++- 12 files changed, 252 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 1fcf75b02503..aa240b5cc5b5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -117,6 
+117,14 @@ package object config { .bytesConf(ByteUnit.MiB) .createOptional + private[spark] val DRIVER_MIN_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.minMemoryOverhead") + .doc("The minimum amount of non-heap memory to be allocated per driver in cluster mode, " + + "in MiB unless otherwise specified. This value is ignored if " + + "spark.driver.memoryOverhead is set directly.") + .version("4.0.0") + .bytesConf(ByteUnit.MiB) + .createWithDefaultString("384m") + private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.driver.memoryOverheadFactor") .doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " + @@ -358,6 +366,15 @@ package object config { .bytesConf(ByteUnit.MiB) .createOptional + private[spark] val EXECUTOR_MIN_MEMORY_OVERHEAD = + ConfigBuilder("spark.executor.minMemoryOverhead") + .doc("The minimum amount of non-heap memory to be allocated per executor " + + "in MiB unless otherwise specified. This value is ignored if " + + "spark.executor.memoryOverhead is set directly.") + .version("4.0.0") + .bytesConf(ByteUnit.MiB) + .createWithDefaultString("384m") + private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.executor.memoryOverheadFactor") .doc("Fraction of executor memory to be allocated as additional non-heap memory per " + diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 4a55b2f619e6..e95dbe973691 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -352,8 +352,6 @@ object ResourceProfile extends Logging { val UNKNOWN_RESOURCE_PROFILE_ID = -1 val DEFAULT_RESOURCE_PROFILE_ID = 0 - private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L - private lazy val nextProfileId = new AtomicInteger(0) private val DEFAULT_PROFILE_LOCK = new Object() @@ -489,10 +487,11 @@ object 
ResourceProfile extends Logging { private[spark] def calculateOverHeadMemory( overHeadMemFromConf: Option[Long], + minimumOverHeadMemoryFromConf: Long, executorMemoryMiB: Long, overheadFactor: Double): Long = { overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt, - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)) + minimumOverHeadMemoryFromConf)) } /** @@ -504,6 +503,7 @@ object ResourceProfile extends Logging { private[spark] def getResourcesForClusterManager( rpId: Int, execResources: Map[String, ExecutorResourceRequest], + minimumOverheadMemory: Long, overheadFactor: Double, conf: SparkConf, isPythonApp: Boolean, @@ -515,7 +515,7 @@ object ResourceProfile extends Logging { var memoryOffHeapMiB = defaultResources.memoryOffHeapMiB var pysparkMemoryMiB = defaultResources.pysparkMemoryMiB.getOrElse(0L) var memoryOverheadMiB = calculateOverHeadMemory(defaultResources.memoryOverheadMiB, - executorMemoryMiB, overheadFactor) + minimumOverheadMemory, executorMemoryMiB, overheadFactor) val finalCustomResources = if (rpId != DEFAULT_RESOURCE_PROFILE_ID) { val customResources = new mutable.HashMap[String, ExecutorResourceRequest] diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala index 99bd5f520042..3464c3b3a0c5 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala @@ -98,7 +98,7 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar { new ExecutorResourceRequests().cores(4) val rp = rpBuilder.require(taskReq).require(execReq).build() val executorResourceForRp = ResourceProfile.getResourcesForClusterManager( - rp.id, rp.executorResources, 0.0, sparkConf, false, Map.empty) + rp.id, rp.executorResources, 500L, 0.0, sparkConf, false, Map.empty) // Standalone cluster only take cores and executor memory as built-in 
resources. assert(executorResourceForRp.cores.get === 4) assert(executorResourceForRp.executorMemoryMiB === 1024L) diff --git a/docs/configuration.md b/docs/configuration.md index f0d68c55e7b3..9a686bc514c5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -187,7 +187,7 @@ of the most common options to set are: </tr> <tr> <td><code>spark.driver.memoryOverhead</code></td> - <td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with minimum of 384 </td> + <td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with minimum of <code>spark.driver.minMemoryOverhead</code></td> <td> Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, @@ -202,6 +202,15 @@ of the most common options to set are: </td> <td>2.3.0</td> </tr> +<tr> + <td><code>spark.driver.minMemoryOverhead</code></td> + <td>384m</td> + <td> + The minimum amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless otherwise specified, if <code>spark.driver.memoryOverhead</code> is not defined. + This option is currently supported on YARN and Kubernetes. + </td> + <td>4.0.0</td> +</tr> <tr> <td><code>spark.driver.memoryOverheadFactor</code></td> <td>0.10</td> @@ -291,7 +300,7 @@ of the most common options to set are: </tr> <tr> <td><code>spark.executor.memoryOverhead</code></td> - <td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with minimum of 384 </td> + <td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with minimum of <code>spark.executor.minMemoryOverhead</code></td> <td> Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. 
@@ -306,6 +315,15 @@ of the most common options to set are: </td> <td>2.3.0</td> </tr> +<tr> + <td><code>spark.executor.minMemoryOverhead</code></td> + <td>384m</td> + <td> + The minimum amount of non-heap memory to be allocated per executor process, in MiB unless otherwise specified, if <code>spark.executor.memoryOverhead</code> is not defined. + This option is currently supported on YARN and Kubernetes. + </td> + <td>4.0.0</td> +</tr> <tr> <td><code>spark.executor.memoryOverheadFactor</code></td> <td>0.10</td> diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 51ee9ffbe405..bd198deed3d5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ -import org.apache.spark.resource.ResourceProfile import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils @@ -67,6 +66,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) conf.get(MEMORY_OVERHEAD_FACTOR) } + private val driverMinimumMemoryOverhead = conf.get(DRIVER_MIN_MEMORY_OVERHEAD) + // Prefer the driver memory overhead factor if set explicitly private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) { conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) @@ -74,10 +75,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) defaultOverheadFactor } - private val memoryOverheadMiB = conf + private val memoryOverheadMiB: Long = conf .get(DRIVER_MEMORY_OVERHEAD) 
.getOrElse(math.max((memoryOverheadFactor * driverMemoryMiB).toInt, - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)) + driverMinimumMemoryOverhead)) private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB override def configurePod(pod: SparkPod): SparkPod = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index febd8dcfa75f..fa4a6f43215c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -59,6 +59,7 @@ private[spark] class BasicExecutorFeatureStep( private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON) private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP) + private val minimumMemoryOverhead = kubernetesConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD) private val memoryOverheadFactor = if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) { kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) } else { @@ -68,6 +69,7 @@ private[spark] class BasicExecutorFeatureStep( val execResources = ResourceProfile.getResourcesForClusterManager( resourceProfile.id, resourceProfile.executorResources, + minimumMemoryOverhead, memoryOverheadFactor, kubernetesConf.sparkConf, isPythonApp, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index a9149b6cbf14..f102851e6c3b 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ -import org.apache.spark.resource.{ResourceID, ResourceProfile} +import org.apache.spark.resource.ResourceID import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.util.Utils @@ -205,7 +205,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { test(s"memory overhead factor new config: $name") { // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB val driverMem = - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get / + DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 // main app resource, overhead factor val sparkConf = new SparkConf(false) @@ -235,7 +236,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { test(s"memory overhead factor old config: $name") { // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB val driverMem = - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 // main app resource, overhead factor val sparkConf = new SparkConf(false) @@ -259,7 +260,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { test(s"SPARK-38194: memory overhead factor precendence") { // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB val driverMem = - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get 
* 2 + DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get / + DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 // main app resource, overhead factor val sparkConf = new SparkConf(false) @@ -288,7 +290,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") { // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB val driverMem = - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + DRIVER_MIN_MEMORY_OVERHEAD.defaultValue.get / + DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 // main app resource, overhead factor val sparkConf = new SparkConf(false) @@ -372,6 +375,78 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { path.startsWith(FILE_UPLOAD_PATH) && path.endsWith("some-local-jar.jar"))) } + test("SPARK-47208: User can override the minimum memory overhead of the driver") { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(DRIVER_MEMORY.key, "256M") + .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L) + .set(CONTAINER_IMAGE, "spark-driver:latest") + val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + labels = CUSTOM_DRIVER_LABELS, + environment = DRIVER_ENVS, + annotations = DRIVER_ANNOTATIONS) + + val featureStep = new BasicDriverFeatureStep(kubernetesConf) + val basePod = SparkPod.initialPod() + val configuredPod = featureStep.configurePod(basePod) + + val resourceRequirements = configuredPod.container.getResources + val requests = resourceRequirements.getRequests.asScala + assert(amountAndFormat(requests("memory")) === "756Mi") + val limits = resourceRequirements.getLimits.asScala + assert(amountAndFormat(limits("memory")) === "756Mi") + } + + test("SPARK-47208: Explicit overhead takes precedence over minimum overhead") { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, 
"spark-driver-pod") + .set(DRIVER_MEMORY.key, "256M") + .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L) + .set(DRIVER_MEMORY_OVERHEAD, 200L) + .set(CONTAINER_IMAGE, "spark-driver:latest") + val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + labels = CUSTOM_DRIVER_LABELS, + environment = DRIVER_ENVS, + annotations = DRIVER_ANNOTATIONS) + + val featureStep = new BasicDriverFeatureStep(kubernetesConf) + val basePod = SparkPod.initialPod() + val configuredPod = featureStep.configurePod(basePod) + + val resourceRequirements = configuredPod.container.getResources + val requests = resourceRequirements.getRequests.asScala + assert(amountAndFormat(requests("memory")) === "456Mi") + val limits = resourceRequirements.getLimits.asScala + assert(amountAndFormat(limits("memory")) === "456Mi") + } + + test("SPARK-47208: Overhead is maximum between factor of memory and min base overhead") { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(DRIVER_MEMORY.key, "5000M") + .set(DRIVER_MIN_MEMORY_OVERHEAD, 200L) + .set(CONTAINER_IMAGE, "spark-driver:latest") + val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + labels = CUSTOM_DRIVER_LABELS, + environment = DRIVER_ENVS, + annotations = DRIVER_ANNOTATIONS) + + val featureStep = new BasicDriverFeatureStep(kubernetesConf) + val basePod = SparkPod.initialPod() + val configuredPod = featureStep.configurePod(basePod) + + val resourceRequirements = configuredPod.container.getResources + val requests = resourceRequirements.getRequests.asScala + // mem = 5000 + max(overhead_factor[0.1] * 5000, 200) + assert(amountAndFormat(requests("memory")) === "5500Mi") + val limits = resourceRequirements.getLimits.asScala + assert(amountAndFormat(limits("memory")) === "5500Mi") + } + + def containerPort(name: String, portNumber: Int): ContainerPort = new ContainerPortBuilder() .withName(name) diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index b9d5681907a6..eaf39dd816dc 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -460,7 +460,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { test(s"SPARK-38194: memory overhead factor precendence") { // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get - val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2 + val executorMem = EXECUTOR_MIN_MEMORY_OVERHEAD.defaultValue.get / defaultFactor * 2 // main app resource, overhead factor val sparkConf = new SparkConf(false) @@ -487,7 +487,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") { // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get - val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2 + val executorMem = EXECUTOR_MIN_MEMORY_OVERHEAD.defaultValue.get / defaultFactor * 2 // main app resource, overhead factor val sparkConf = new SparkConf(false) @@ -524,6 +524,45 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(podConfigured1.container.getPorts.contains(ports)) } + test("SPARK-47208: User can override the minimum memory overhead of the executor") { + // main app resource, overriding the minimum oberhead to 
500MiB + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L) + + val conf = KubernetesTestConf.createExecutorConf( + sparkConf = sparkConf) + ResourceProfile.clearDefaultProfile() + val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf), + resourceProfile) + val executor = step.configurePod(SparkPod.initialPod()) + + // memory = 1024MiB (default) + 500MiB (minimum overhead overridden from the default 384MiB) + assert(amountAndFormat(executor.container.getResources + .getLimits.get("memory")) === "1524Mi") + } + + test("SPARK-47208: Explicit overhead takes precedence over minimum overhead") { + // main app resource, explicit overhead of 150MiB + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(EXECUTOR_MEMORY_OVERHEAD, 150L) + .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L) + + val conf = KubernetesTestConf.createExecutorConf( + sparkConf = sparkConf) + ResourceProfile.clearDefaultProfile() + val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf), + resourceProfile) + val executor = step.configurePod(SparkPod.initialPod()) + + // memory = 1024MiB + 150MiB (overrides any other overhead calculation) + assert(amountAndFormat(executor.container.getResources + .getLimits.get("memory")) === "1174Mi") + } + // There is always exactly one controller reference, and it points to the driver pod. 
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 4e70b57e4bb7..1c762b98c4d0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -59,7 +59,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} -import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper} import org.apache.spark.util.ArrayImplicits._ @@ -76,7 +75,6 @@ private[spark] class Client( private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf)) private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster" - private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode private val statCachePreloadEnabled = sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED) private val statCachePreloadDirectoryCountThreshold: Int = @@ -96,12 +94,21 @@ private[spark] class Client( } else { sparkConf.get(AM_MEMORY).toInt } + + private val driverMinimumMemoryOverhead = + if (isClusterMode) { + sparkConf.get(DRIVER_MIN_MEMORY_OVERHEAD) + } else { + 384L + } + private val amMemoryOverhead = { val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD sparkConf.get(amMemoryOverheadEntry).getOrElse( math.max((amMemoryOverheadFactor * amMemory).toLong, - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt + driverMinimumMemoryOverhead)).toInt 
} + private val amCores = if (isClusterMode) { sparkConf.get(DRIVER_CORES) } else { @@ -114,9 +121,10 @@ private[spark] class Client( protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf) private val executorMemoryOvereadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) + private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD) private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((executorMemoryOvereadFactor * executorMemory).toLong, - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt + minMemoryOverhead)).toInt private val isPython = sparkConf.get(IS_PYTHON_APP) private val pysparkWorkerMemory: Int = if (isPython) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 7f0469937fef..ea4dec4e6b73 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -169,6 +169,8 @@ private[yarn] class YarnAllocator( private val isPythonApp = sparkConf.get(IS_PYTHON_APP) + private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD) + private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( @@ -313,7 +315,7 @@ private[yarn] class YarnAllocator( val resourcesWithDefaults = ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources, - memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping) + minMemoryOverhead, memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping) val customSparkResources = resourcesWithDefaults.customResources.map { case (name, execReq) => (name, execReq.amount.toString) diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index ac946b514fa5..78e84690900e 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -706,6 +706,39 @@ class ClientSuite extends SparkFunSuite assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE), mockFsLookup).size === 2) } + Seq( + "client", + "cluster" + ).foreach { case (deployMode) => + test(s"SPARK-47208: minimum memory overhead is correctly set in ($deployMode mode)") { + val sparkConf = new SparkConf() + .set("spark.app.name", "foo-test-app") + .set(SUBMIT_DEPLOY_MODE, deployMode) + .set(DRIVER_MIN_MEMORY_OVERHEAD, 500L) + val args = new ClientArguments(Array()) + + val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) + val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) + val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + + val client = new Client(args, sparkConf, null) + client.createApplicationSubmissionContext( + new YarnClientApplication(getNewApplicationResponse, appContext), + containerLaunchContext) + + appContext.getApplicationName should be ("foo-test-app") + // flag should only work for cluster mode + if (deployMode == "cluster") { + // 1Gb driver default + 500 overridden minimum default overhead + appContext.getResource should be (Resource.newInstance(1524L, 1)) + } else { + // 512 driver default (non-cluster) + 384 overhead default + // that can't be changed in non cluster mode. 
+ appContext.getResource should be (Resource.newInstance(896L, 1)) + } + } + } + private val matching = Seq( ("files URI match test1", "file:///file1", "file:///file2"), ("files URI match test2", "file:///c:file1", "file://c:file2"), diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index efd66a912174..09e35a308728 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -731,6 +731,7 @@ class YarnAllocatorSuite extends SparkFunSuite val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt val offHeapMemoryInMB = 1024L val offHeapMemoryInByte = offHeapMemoryInMB * 1024 * 1024 + val clientModeMinOffHeapMemory = 384L try { sparkConf.set(MEMORY_OFFHEAP_ENABLED, true) sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte) @@ -739,7 +740,7 @@ class YarnAllocatorSuite extends SparkFunSuite val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) val memory = defaultResource.getMemorySize assert(memory == - executorMemory + offHeapMemoryInMB + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB) + executorMemory + offHeapMemoryInMB + clientModeMinOffHeapMemory) } finally { sparkConf.set(MEMORY_OFFHEAP_ENABLED, originalOffHeapEnabled) sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize) @@ -772,6 +773,39 @@ class YarnAllocatorSuite extends SparkFunSuite assert(memory == (executorMemory * 1.4).toLong) } finally { sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1) + sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD) + } + } + + test("SPARK-47208: User can override the minimum memory overhead of the executor") { + val executorMemory = sparkConf.get(EXECUTOR_MEMORY) + try { + sparkConf + .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L) + val (handler, _) = createAllocator(maxExecutors = 1, + 
additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString)) + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val memory = defaultResource.getMemorySize + assert(memory == (executorMemory + 500)) + } finally { + sparkConf.remove(EXECUTOR_MIN_MEMORY_OVERHEAD) + } + } + + test("SPARK-47208: Explicit overhead takes precedence over minimum overhead") { + val executorMemory = sparkConf.get(EXECUTOR_MEMORY) + try { + sparkConf + .set(EXECUTOR_MIN_MEMORY_OVERHEAD, 500L) + .set(EXECUTOR_MEMORY_OVERHEAD, 100L) + val (handler, _) = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString)) + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val memory = defaultResource.getMemorySize + assert(memory == (executorMemory + 100)) + } finally { + sparkConf.remove(EXECUTOR_MEMORY_OVERHEAD) + sparkConf.remove(EXECUTOR_MIN_MEMORY_OVERHEAD) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org