This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 998086c [SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory 998086c is described below commit 998086c9a179692b2687bc9a104dbbb35f5a44e2 Author: Warren Zhu <zho...@microsoft.com> AuthorDate: Mon Jul 27 08:16:13 2020 -0500 [SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory ### What changes were proposed in this pull request? Support setting off heap memory in `ExecutorResourceRequests` ### Why are the changes needed? Support stage level scheduling ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT in `ResourceProfileSuite` and `DAGSchedulerSuite` Closes #28972 from warrenzhu25/30794. Authored-by: Warren Zhu <zho...@microsoft.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../spark/resource/ExecutorResourceRequests.scala | 14 ++++++ .../apache/spark/resource/ResourceProfile.scala | 8 +++- .../spark/resource/ResourceProfileSuite.scala | 50 +++++++++++++++++++--- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 5 ++- python/pyspark/resource/requests.py | 9 ++++ python/pyspark/resource/tests/test_resources.py | 5 ++- .../apache/spark/deploy/yarn/YarnAllocator.scala | 3 +- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 23 ++++++++-- 8 files changed, 102 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala index 9da6ffb..654afa0 100644 --- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala +++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala @@ -55,6 +55,20 @@ class ExecutorResourceRequests() extends Serializable { } /** + * Specify off heap memory. The value specified will be converted to MiB. + * This value only takes effect when MEMORY_OFFHEAP_ENABLED is true. 
+ * + * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g). + * Default unit is MiB if not specified. + */ + def offHeapMemory(amount: String): this.type = { + val amountMiB = JavaUtils.byteStringAsMb(amount) + val req = new ExecutorResourceRequest(OFFHEAP_MEM, amountMiB) + _executorResources.put(OFFHEAP_MEM, req) + this + } + + /** * Specify overhead memory. The value specified will be converted to MiB. * * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g). diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index f56ea69..8a37670 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -243,13 +243,15 @@ object ResourceProfile extends Logging { // task resources val CPUS = "cpus" // Executor resources + // Make sure to add any new executor resource to allSupportedExecutorResources below val CORES = "cores" val MEMORY = "memory" + val OFFHEAP_MEM = "offHeap" val OVERHEAD_MEM = "memoryOverhead" val PYSPARK_MEM = "pyspark.memory" // all supported spark executor resources (minus the custom resources like GPUs/FPGAs) - val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM) + val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM) val UNKNOWN_RESOURCE_PROFILE_ID = -1 val DEFAULT_RESOURCE_PROFILE_ID = 0 @@ -295,6 +297,10 @@ object ResourceProfile extends Logging { ereqs.memory(conf.get(EXECUTOR_MEMORY).toString) conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString)) conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString)) + if (conf.get(MEMORY_OFFHEAP_ENABLED)) { + // Explicitly add suffix b as default unit of offHeapMemory is MiB + ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString 
+ "b") + } val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX) execReq.foreach { req => val name = req.id.resourceName diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala index 29d3ef1..d0479ca 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.resource import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD} +import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.resource.TestResourceIDs._ @@ -55,6 +55,8 @@ class ResourceProfileSuite extends SparkFunSuite { "pyspark memory empty if not specified") assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None, "overhead memory empty if not specified") + assert(rprof.executorResources.get(ResourceProfile.OFFHEAP_MEM) == None, + "offHeap memory empty if not specified") assert(rprof.taskResources.size === 1, "Task resources should just contain cpus by default") assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1, @@ -69,14 +71,16 @@ class ResourceProfileSuite extends SparkFunSuite { conf.set(EXECUTOR_MEMORY_OVERHEAD.key, "1g") conf.set(EXECUTOR_MEMORY.key, "4g") conf.set(EXECUTOR_CORES.key, "4") + conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") + conf.set(MEMORY_OFFHEAP_SIZE.key, "3m") conf.set(TASK_GPU_ID.amountConf, "1") conf.set(EXECUTOR_GPU_ID.amountConf, "1") conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, "nameOfScript") val rprof = ResourceProfile.getOrCreateDefaultProfile(conf) assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) val execResources = rprof.executorResources - assert(execResources.size === 5, s"Executor 
resources should contain cores, pyspark " + - s"memory, memory overhead, memory, and gpu $execResources") + assert(execResources.size === 6, s"Executor resources should contain cores, pyspark " + + s"memory, memory overhead, memory, offHeap memory and gpu $execResources") assert(execResources.contains("gpu"), "Executor resources should have gpu") assert(rprof.executorResources(ResourceProfile.CORES).amount === 4, "Executor resources should have 4 core") @@ -88,6 +92,8 @@ class ResourceProfileSuite extends SparkFunSuite { "pyspark memory empty if not specified") assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount == 1024, "overhead memory empty if not specified") + assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount == 3, + "Executor resources should have 3 offHeap memory") assert(rprof.taskResources.size === 2, "Task resources should just contain cpus and gpu") assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu") @@ -172,14 +178,14 @@ class ResourceProfileSuite extends SparkFunSuite { val ereqs = new ExecutorResourceRequests() ereqs.cores(2).memory("4096") - ereqs.memoryOverhead("2048").pysparkMemory("1024") + ereqs.memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072") val treqs = new TaskResourceRequests() treqs.cpus(1) rprof.require(treqs) rprof.require(ereqs) - assert(rprof.executorResources.size === 5) + assert(rprof.executorResources.size === 6) assert(rprof.executorResources(ResourceProfile.CORES).amount === 2, "Executor resources should have 2 cores") assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096, @@ -188,6 +194,8 @@ class ResourceProfileSuite extends SparkFunSuite { "Executor resources should have 2048 overhead memory") assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024, "Executor resources should have 1024 pyspark memory") + assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 3072, + "Executor resources should have 3072 
offHeap memory") assert(rprof.taskResources.size === 2) assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu") @@ -217,7 +225,7 @@ class ResourceProfileSuite extends SparkFunSuite { val rprof = new ResourceProfileBuilder() val ereqs = new ExecutorResourceRequests() ereqs.memory("4g") - ereqs.memoryOverhead("2000m").pysparkMemory("512000k") + ereqs.memoryOverhead("2000m").pysparkMemory("512000k").offHeapMemory("1g") rprof.require(ereqs) assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096, @@ -226,6 +234,8 @@ class ResourceProfileSuite extends SparkFunSuite { "Executor resources should have 2000 overhead memory") assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500, "Executor resources should have 512 pyspark memory") + assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 1024, + "Executor resources should have 1024 offHeap memory") } test("Test TaskResourceRequest fractional") { @@ -256,4 +266,32 @@ class ResourceProfileSuite extends SparkFunSuite { }.getMessage() assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number.")) } + + test("ResourceProfile has correct custom executor resources") { + val rprof = new ResourceProfileBuilder() + val eReq = new ExecutorResourceRequests() + .cores(2).memory("4096") + .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072") + .resource("gpu", 2) + rprof.require(eReq) + + // Update this if new resource type added + assert(ResourceProfile.allSupportedExecutorResources.size === 5, + "Executor resources should have 5 supported resources") + assert(ResourceProfile.getCustomExecutorResources(rprof.build).size === 1, + "Executor resources should have 1 custom resource") + } + + test("ResourceProfile has correct custom task resources") { + val rprof = new ResourceProfileBuilder() + val taskReq = new TaskResourceRequests() + .resource("gpu", 1) + val eReq = new ExecutorResourceRequests() + 
.cores(2).memory("4096") + .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072") + rprof.require(taskReq).require(eReq) + + assert(ResourceProfile.getCustomTaskResources(rprof.build).size === 1, + "Task resources should have 1 custom resource") + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2b38aa1..45af0d0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -3286,7 +3286,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(mergedRp.taskResources.get(GPU).get.amount == 1) val ereqs5 = new ExecutorResourceRequests().cores(1).memory("3g") - .memoryOverhead("1g").pysparkMemory("2g").resource(GPU, 1, "disc") + .memoryOverhead("1g").pysparkMemory("2g").offHeapMemory("4g").resource(GPU, 1, "disc") val treqs5 = new TaskResourceRequests().cpus(1).resource(GPU, 1) val rp5 = new ResourceProfile(ereqs5.requests, treqs5.requests) val ereqs6 = new ExecutorResourceRequests().cores(8).resource(FPGA, 2, "fdisc") @@ -3296,7 +3296,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(mergedRp.getTaskCpus.get == 2) assert(mergedRp.getExecutorCores.get == 8) - assert(mergedRp.executorResources.size == 6) + assert(mergedRp.executorResources.size == 7) assert(mergedRp.taskResources.size == 3) assert(mergedRp.executorResources.get(GPU).get.amount == 1) assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc") @@ -3307,6 +3307,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(mergedRp.executorResources.get(ResourceProfile.MEMORY).get.amount == 3072) assert(mergedRp.executorResources.get(ResourceProfile.PYSPARK_MEM).get.amount == 2048) 
assert(mergedRp.executorResources.get(ResourceProfile.OVERHEAD_MEM).get.amount == 1024) + assert(mergedRp.executorResources.get(ResourceProfile.OFFHEAP_MEM).get.amount == 4096) val ereqs7 = new ExecutorResourceRequests().cores(1).memory("3g") .resource(GPU, 4, "disc") diff --git a/python/pyspark/resource/requests.py b/python/pyspark/resource/requests.py index 56ad6e8..6149108 100644 --- a/python/pyspark/resource/requests.py +++ b/python/pyspark/resource/requests.py @@ -91,6 +91,7 @@ class ExecutorResourceRequests(object): _MEMORY = "memory" _OVERHEAD_MEM = "memoryOverhead" _PYSPARK_MEM = "pyspark.memory" + _OFFHEAP_MEM = "offHeap" def __init__(self, _jvm=None, _requests=None): from pyspark import SparkContext @@ -139,6 +140,14 @@ class ExecutorResourceRequests(object): ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount)) return self + def offheapMemory(self, amount): + if self._java_executor_resource_requests is not None: + self._java_executor_resource_requests.offHeapMemory(amount) + else: + self._executor_resources[self._OFFHEAP_MEM] = \ + ExecutorResourceRequest(self._OFFHEAP_MEM, _parse_memory(amount)) + return self + def cores(self, amount): if self._java_executor_resource_requests is not None: self._java_executor_resource_requests.cores(amount) diff --git a/python/pyspark/resource/tests/test_resources.py b/python/pyspark/resource/tests/test_resources.py index 9eb5a35..09c0d3c 100644 --- a/python/pyspark/resource/tests/test_resources.py +++ b/python/pyspark/resource/tests/test_resources.py @@ -25,15 +25,16 @@ class ResourceProfileTests(unittest.TestCase): def test_profile_before_sc(self): rpb = ResourceProfileBuilder() ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g") - ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com") + ereqs.pysparkMemory("2g").offheapMemory("3g").resource("gpu", 2, "testGpus", "nvidia.com") treqs = TaskResourceRequests().cpus(2).resource("gpu", 2) def 
assert_request_contents(exec_reqs, task_reqs): - self.assertEqual(len(exec_reqs), 5) + self.assertEqual(len(exec_reqs), 6) self.assertEqual(exec_reqs["cores"].amount, 2) self.assertEqual(exec_reqs["memory"].amount, 6144) self.assertEqual(exec_reqs["memoryOverhead"].amount, 1024) self.assertEqual(exec_reqs["pyspark.memory"].amount, 2048) + self.assertEqual(exec_reqs["offHeap"].amount, 3072) self.assertEqual(exec_reqs["gpu"].amount, 2) self.assertEqual(exec_reqs["gpu"].discoveryScript, "testGpus") self.assertEqual(exec_reqs["gpu"].resourceName, "gpu") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index dc09323..adbbbc0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -308,7 +308,6 @@ private[yarn] class YarnAllocator( if (!rpIdToYarnResource.contains(rp.id)) { // Start with the application or default settings var heapMem = executorMemory.toLong - // Note we currently don't support off heap memory in ResourceProfile - SPARK-30794 var offHeapMem = executorOffHeapMemory.toLong var overheadMem = memoryOverhead.toLong var pysparkMem = pysparkWorkerMemory.toLong @@ -326,6 +325,8 @@ private[yarn] class YarnAllocator( overheadMem = execReq.amount case ResourceProfile.PYSPARK_MEM => pysparkMem = execReq.amount + case ResourceProfile.OFFHEAP_MEM => + offHeapMem = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf, execReq) case ResourceProfile.CORES => cores = execReq.amount.toInt case "gpu" => diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 9d6b776..fe8990b 100644 --- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.config._ import org.apache.spark.launcher.YarnCommandBuilderUtils +import org.apache.spark.resource.ExecutorResourceRequest import org.apache.spark.util.Utils object YarnSparkHadoopUtil { @@ -187,11 +188,27 @@ object YarnSparkHadoopUtil { * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false. */ def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = { + val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString) + checkOffHeapEnabled(sparkConf, sizeInMB).toInt + } + + /** + * Get offHeap memory size from [[ExecutorResourceRequest]] + * return 0 if MEMORY_OFFHEAP_ENABLED is false. + */ + def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf, + execRequest: ExecutorResourceRequest): Long = { + checkOffHeapEnabled(sparkConf, execRequest.amount) + } + + /** + * return 0 if MEMORY_OFFHEAP_ENABLED is false. + */ + def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = { if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) { - val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString) - require(sizeInMB > 0, + require(offHeapSize > 0, s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true") - sizeInMB + offHeapSize } else { 0 } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org