This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f36a5fb  [SPARK-38194] Followup: Fix k8s memory overhead passing to executor pods
f36a5fb is described below

commit f36a5fb2b88620c1c490d087b0293c4e58d29979
Author: Adam Binford - Customer Site (Virginia) - CW 121796 <abinf...@pu00cenvdi797.vdi.ux.dg.local>
AuthorDate: Thu Mar 17 18:32:29 2022 -0500

    [SPARK-38194] Followup: Fix k8s memory overhead passing to executor pods

    ### What changes were proposed in this pull request?

    Follow up to https://github.com/apache/spark/pull/35504 to fix k8s memory overhead handling.

    ### Why are the changes needed?

    https://github.com/apache/spark/pull/35504 introduced a bug only caught by the K8S integration tests.

    ### Does this PR introduce _any_ user-facing change?

    Fix back to old behavior.

    ### How was this patch tested?

    See if IT passes

    Closes #35901 from Kimahriman/k8s-memory-overhead-executors.

    Authored-by: Adam Binford - Customer Site (Virginia) - CW 121796 <abinf...@pu00cenvdi797.vdi.ux.dg.local>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../k8s/features/BasicDriverFeatureStep.scala      | 30 +++++++-----
 .../k8s/features/BasicDriverFeatureStepSuite.scala | 57 +++++++++++++++++-----
 2 files changed, 63 insertions(+), 24 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 9715149..413f5bc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -53,28 +53,32 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
   // Memory settings
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)

-  private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
-    conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
-  } else {
-    conf.get(MEMORY_OVERHEAD_FACTOR)
-  }
-  // The memory overhead factor to use. If the user has not set it, then use a different
-  // value for non-JVM apps. This value is propagated to executors.
-  private val overheadFactor =
+  // The default memory overhead factor to use, derived from the deprecated
+  // `spark.kubernetes.memoryOverheadFactor` config or the default overhead values.
+  // If the user has not set it, then use a different default for non-JVM apps. This value is
+  // propagated to executors and used if the executor overhead factor is not set explicitly.
+  private val defaultOverheadFactor =
     if (conf.mainAppResource.isInstanceOf[NonJVMResource]) {
-      if (conf.contains(MEMORY_OVERHEAD_FACTOR) || conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
-        memoryOverheadFactor
+      if (conf.contains(MEMORY_OVERHEAD_FACTOR)) {
+        conf.get(MEMORY_OVERHEAD_FACTOR)
       } else {
         NON_JVM_MEMORY_OVERHEAD_FACTOR
       }
     } else {
-      memoryOverheadFactor
+      conf.get(MEMORY_OVERHEAD_FACTOR)
     }

+  // Prefer the driver memory overhead factor if set explicitly
+  private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
+    conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
+  } else {
+    defaultOverheadFactor
+  }
+
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt,
+    .getOrElse(math.max((memoryOverheadFactor * driverMemoryMiB).toInt,
       ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
@@ -169,7 +173,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
       "spark.app.id" -> conf.appId,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
-      DRIVER_MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+      MEMORY_OVERHEAD_FACTOR.key -> defaultOverheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
     Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
       val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index d45f5f9..9a3b06a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -134,7 +134,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
       "spark.app.id" -> KubernetesTestConf.APP_ID,
       "spark.kubernetes.submitInDriver" -> "true",
-      DRIVER_MEMORY_OVERHEAD_FACTOR.key -> DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
+      MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)

     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
   }
@@ -192,13 +192,16 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {

   // Memory overhead tests. Tuples are:
   // test name, main resource, overhead factor, expected factor
+  val driverDefault = DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get
+  val oldConfigDefault = MEMORY_OVERHEAD_FACTOR.defaultValue.get
+  val nonJvm = NON_JVM_MEMORY_OVERHEAD_FACTOR
   Seq(
-    ("java", JavaMainAppResource(None), None, DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get),
-    ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR),
-    ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
-    ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
-  ).foreach { case (name, resource, factor, expectedFactor) =>
-    test(s"memory overhead factor: $name") {
+    ("java", JavaMainAppResource(None), None, driverDefault, oldConfigDefault),
+    ("python default", PythonMainAppResource(null), None, nonJvm, nonJvm),
+    ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d, nonJvm),
+    ("r default", RMainAppResource(null), None, nonJvm, nonJvm)
+  ).foreach { case (name, resource, factor, expectedFactor, expectedPropFactor) =>
+    test(s"memory overhead factor new config: $name") {
       // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
       val driverMem =
         ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
@@ -218,7 +221,37 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       assert(mem === s"${expected}Mi")

       val systemProperties = step.getAdditionalPodSystemProperties()
-      assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+      assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedPropFactor.toString)
+    }
+  }
+
+  Seq(
+    ("java", JavaMainAppResource(None), None, driverDefault),
+    ("python default", PythonMainAppResource(null), None, nonJvm),
+    ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
+    ("r default", RMainAppResource(null), None, nonJvm)
+  ).foreach { case (name, resource, factor, expectedFactor) =>
+    test(s"memory overhead factor old config: $name") {
+      // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
+      val driverMem =
+        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+
+      // main app resource, overhead factor
+      val sparkConf = new SparkConf(false)
+        .set(CONTAINER_IMAGE, "spark-driver:latest")
+        .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
+      factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) }
+      val conf = KubernetesTestConf.createDriverConf(
+        sparkConf = sparkConf,
+        mainAppResource = resource)
+      val step = new BasicDriverFeatureStep(conf)
+      val pod = step.configurePod(SparkPod.initialPod())
+      val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
+      val expected = (driverMem + driverMem * expectedFactor).toInt
+      assert(mem === s"${expected}Mi")
+
+      val systemProperties = step.getAdditionalPodSystemProperties()
+      assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
     }
   }

@@ -234,8 +267,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {

     // New config should take precedence
     val expectedFactor = 0.2
+    val oldFactor = 0.3
     sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, expectedFactor)
-    sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)
+    sparkConf.set(MEMORY_OVERHEAD_FACTOR, oldFactor)

     val conf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf)
@@ -245,8 +279,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val expected = (driverMem + driverMem * expectedFactor).toInt
     assert(mem === s"${expected}Mi")

+    // The old config should be passed as a system property for use by the executor
     val systemProperties = step.getAdditionalPodSystemProperties()
-    assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+    assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === oldFactor.toString)
   }

   test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
@@ -272,7 +307,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(mem === s"${expected}Mi")

     val systemProperties = step.getAdditionalPodSystemProperties()
-    assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+    assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
   }

   test("SPARK-35493: make spark.blockManager.port be able to be fallen back to in driver pod") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
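
For reference, the sketch below condenses the overhead-factor resolution the patch implements: the driver prefers `spark.driver.memoryOverheadFactor`, falls back to the deprecated `spark.kubernetes.memoryOverheadFactor`, uses a larger default for non-JVM (Python/R) apps, and propagates only the deprecated config's resolved value to executor pods. This is a minimal standalone illustration, not the Spark code itself; the object name, method signature, and numeric defaults are assumptions.

```scala
// Standalone sketch of the overhead-factor precedence (names and defaults assumed).
object OverheadFactorResolution {

  // driverFactor: what the driver pod sizing uses.
  // propagatedFactor: what is passed to executors via spark.kubernetes.memoryOverheadFactor.
  final case class Resolved(driverFactor: Double, propagatedFactor: Double)

  def resolve(
      isNonJvmApp: Boolean,              // Python or R main resource
      deprecatedFactor: Option[Double],  // spark.kubernetes.memoryOverheadFactor, if set
      driverFactor: Option[Double],      // spark.driver.memoryOverheadFactor, if set
      jvmDefault: Double = 0.1,          // assumed default factor for JVM apps
      nonJvmDefault: Double = 0.4): Resolved = {
    // Default factor: the deprecated config if present, otherwise a larger
    // default for non-JVM apps. This value is propagated to executor pods.
    val defaultOverheadFactor =
      if (isNonJvmApp) deprecatedFactor.getOrElse(nonJvmDefault)
      else deprecatedFactor.getOrElse(jvmDefault)
    // The driver itself prefers the new driver-specific factor when it is set.
    val memoryOverheadFactor = driverFactor.getOrElse(defaultOverheadFactor)
    Resolved(memoryOverheadFactor, defaultOverheadFactor)
  }
}
```

Under these assumptions, `OverheadFactorResolution.resolve(isNonJvmApp = false, deprecatedFactor = Some(0.3), driverFactor = Some(0.2))` yields a driver factor of 0.2 while propagating 0.3, which is the behavior the "memory overhead factor precedence" test above asserts.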