This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d30284b  [SPARK-27760][CORE] Spark resources - change user resource config from .count to .amount
d30284b is described below

commit d30284b5a51dd784f663eb4eea37087b35a54d00
Author: Thomas Graves <tgra...@nvidia.com>
AuthorDate: Thu Jun 6 14:16:05 2019 -0500

    [SPARK-27760][CORE] Spark resources - change user resource config from .count to .amount

    ## What changes were proposed in this pull request?

    Change the resource config spark.{executor/driver}.resource.{resourceName}.count
    to .amount to allow future usage of containing both a count and a unit. Right now
    we only support counts - # of gpus for instance, but in the future we may want to
    support units for things like memory - 25G. I think making the user only have to
    specify a single config .amount is better than making them specify 2 separate
    configs of a .count and then a .unit. Change it now since it's a user-facing
    config. Amount also matches how the spark on yarn configs are set up.

    ## How was this patch tested?

    Unit tests and manually verified on yarn and local cluster mode

    Closes #24810 from tgravescs/SPARK-27760-amount.

    Authored-by: Thomas Graves <tgra...@nvidia.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
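For illustration only (not part of this commit): a minimal sketch of how an
application would set the renamed configs. The app name, amounts, and
discovery script path below are hypothetical.

    import org.apache.spark.SparkConf

    // Request 2 GPUs per executor and 1 GPU per task using the new .amount
    // suffix (these keys previously ended in .count).
    val conf = new SparkConf()
      .setAppName("gpu-example") // hypothetical app name
      .set("spark.executor.resource.gpu.amount", "2")
      .set("spark.task.resource.gpu.amount", "1")
      // The .discoveryScript suffix is unchanged by this commit; the path is illustrative.
      .set("spark.executor.resource.gpu.discoveryScript", "/opt/getGpus.sh")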
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  4 +--
 .../main/scala/org/apache/spark/SparkContext.scala | 12 ++++----
 .../main/scala/org/apache/spark/TestUtils.scala    |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala    |  2 +-
 .../org/apache/spark/internal/config/package.scala |  2 +-
 .../org/apache/spark/ResourceDiscovererSuite.scala |  2 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  8 ++---
 .../scala/org/apache/spark/SparkContextSuite.scala | 24 +++++++--------
 .../CoarseGrainedExecutorBackendSuite.scala        | 26 ++++++++--------
 .../CoarseGrainedSchedulerBackendSuite.scala       |  2 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |  4 +--
 docs/configuration.md                              | 14 ++++-----
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  |  4 +--
 .../k8s/features/BasicDriverFeatureStepSuite.scala |  2 +-
 .../features/BasicExecutorFeatureStepSuite.scala   |  4 +--
 .../spark/deploy/yarn/ResourceRequestHelper.scala  |  8 ++---
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 36 ++++++++++++++++++----
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     |  4 +--
 19 files changed, 93 insertions(+), 69 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 227f4a5..e231a40 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -512,8 +512,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
    */
   private[spark] def getTaskResourceRequirements(): Map[String, Int] = {
     getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
-      .withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_SUFFIX)}
-      .map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_SUFFIX.length), v.toInt)}.toMap
+      .withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_AMOUNT_SUFFIX)}
+      .map { case (k, v) => (k.dropRight(SPARK_RESOURCE_AMOUNT_SUFFIX.length), v.toInt)}.toMap
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 66f8f41..c169842 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     // verify the resources we discovered are what the user requested
     val driverReqResourcesAndCounts =
-      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
     ResourceDiscoverer.checkActualResourcesMeetRequirements(driverReqResourcesAndCounts, _resources)

     logInfo("===============================================================================")
@@ -2725,7 +2725,7 @@ object SparkContext extends Logging {
     // executor and resources required by each task.
     val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
     val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
-      SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+      SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
     var numSlots = execCores / taskCores
     var limitingResourceName = "CPU"
     taskResourcesAndCount.foreach { case (rName, taskCount) =>
@@ -2733,17 +2733,17 @@ object SparkContext extends Logging {
       val execCount = executorResourcesAndCounts.getOrElse(rName,
         throw new SparkException(
           s"The executor resource config: " +
-            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} " +
+            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
             "needs to be specified since a task requirement config: " +
-            s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} was specified")
+            s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} was specified")
       )
       // Make sure the executor resources are large enough to launch at least one task.
       if (execCount.toLong < taskCount.toLong) {
         throw new SparkException(
           s"The executor resource config: " +
-            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} " +
+            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
             s"= $execCount has to be >= the task config: " +
-            s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} = $taskCount")
+            s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount")
       }
       // Compare and update the max slots each executor can provide.
      val resourceNumSlots = execCount.toInt / taskCount
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index d306eed..5d88612 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -319,7 +319,7 @@ private[spark] object TestUtils {
       conf: SparkConf,
       resourceName: String,
       resourceCount: Int): SparkConf = {
-    val key = s"${SPARK_TASK_RESOURCE_PREFIX}${resourceName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+    val key = s"${SPARK_TASK_RESOURCE_PREFIX}${resourceName}${SPARK_RESOURCE_AMOUNT_SUFFIX}"
     conf.set(key, resourceCount.toString)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 13bf8a9..bfb7976 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -106,7 +106,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
     val execReqResourcesAndCounts =
       env.conf.getAllWithPrefixAndSuffix(SPARK_EXECUTOR_RESOURCE_PREFIX,
-        SPARK_RESOURCE_COUNT_SUFFIX).toMap
+        SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
     ResourceDiscoverer.checkActualResourcesMeetRequirements(execReqResourcesAndCounts,
       actualExecResources)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 90826bb..8d4910f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -35,7 +35,7 @@ package object config {

   private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource."
   private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
-  private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
+  private[spark] val SPARK_RESOURCE_AMOUNT_SUFFIX = ".amount"
   private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
   private[spark] val SPARK_RESOURCE_VENDOR_SUFFIX = ".vendor"
diff --git a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
index 400c9ca..2272573 100644
--- a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
@@ -224,7 +224,7 @@ class ResourceDiscovererSuite extends SparkFunSuite
   test("gpu's specified but not discovery script") {
     val sparkconf = new SparkConf
     sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-      SPARK_RESOURCE_COUNT_SUFFIX, "2")
+      SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val error = intercept[SparkException] {
       ResourceDiscoverer.discoverResourcesInformation(sparkconf,
         SPARK_EXECUTOR_RESOURCE_PREFIX)
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 6978f30..c1265ce 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -450,16 +450,16 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst

   test("get task resource requirement from config") {
     val conf = new SparkConf()
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
     var taskResourceRequirement = conf.getTaskResourceRequirements()
     assert(taskResourceRequirement.size == 2)
     assert(taskResourceRequirement(GPU) == 2)
     assert(taskResourceRequirement(FPGA) == 1)

-    conf.remove(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX)
+    conf.remove(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX)
     // Ignore invalid prefix
-    conf.set("spark.invalid.prefix" + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set("spark.invalid.prefix" + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
     taskResourceRequirement = conf.getTaskResourceRequirements()
     assert(taskResourceRequirement.size == 1)
     assert(taskResourceRequirement.get(FPGA).isEmpty)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 40ec1b2..d1a36d4 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -747,7 +747,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu

     val conf = new SparkConf()
       .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
       .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
         SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
       .setMaster("local-cluster[1, 1, 1024]")
@@ -784,7 +784,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu

     val conf = new SparkConf()
       .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
       .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
         SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
       .set(DRIVER_RESOURCES_FILE, resourcesFile)
@@ -806,7 +806,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu

   test("Test parsing resources task configs with missing executor config") {
     val conf = new SparkConf()
       .set(SPARK_TASK_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
@@ -814,17 +814,17 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc = new SparkContext(conf)
     }.getMessage()

-    assert(error.contains("The executor resource config: spark.executor.resource.gpu.count " +
-      "needs to be specified since a task requirement config: spark.task.resource.gpu.count " +
+    assert(error.contains("The executor resource config: spark.executor.resource.gpu.amount " +
+      "needs to be specified since a task requirement config: spark.task.resource.gpu.amount " +
       "was specified"))
   }

   test("Test parsing resources executor config < task requirements") {
     val conf = new SparkConf()
       .set(SPARK_TASK_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "2")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
       .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
@@ -833,14 +833,14 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }.getMessage()

     assert(error.contains("The executor resource config: " +
-      "spark.executor.resource.gpu.count = 1 has to be >= the task config: " +
-      "spark.task.resource.gpu.count = 2"))
+      "spark.executor.resource.gpu.amount = 1 has to be >= the task config: " +
+      "spark.task.resource.gpu.amount = 2"))
   }

   test("Parse resources executor config not the same multiple numbers of the task requirements") {
     val conf = new SparkConf()
-      .set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "4")
+      .set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "4")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
@@ -873,7 +873,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     val discoveryScript = resourceFile.getPath()

     val conf = new SparkConf()
-      .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+      .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
       .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX}",
         discoveryScript)
       .setMaster("local-cluster[3, 3, 1024]")
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 0b4ad0d..3c4d51f 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -57,7 +57,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

   test("parsing no resources") {
     val conf = new SparkConf
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
@@ -79,8 +79,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

   test("parsing one resources") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -103,10 +103,10 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

   test("parsing multiple resources") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -136,8 +136,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

   test("error checking parsing resources and executor and task configs") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -178,8 +178,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

   test("executor resource found less than required") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "4")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "4")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -204,8 +204,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

   test("use discoverer") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
     assume(!(Utils.isWindows))
     withTempDir { dir =>
       val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 6b3916b..70d368e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -187,7 +187,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo

     val conf = new SparkConf()
       .set(EXECUTOR_CORES, 3)
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "3")
+      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
       .setMaster(
       "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 456996c..9a4c7a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1251,9 +1251,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val executorCpus = 4
     val taskScheduler = setupScheduler(numCores = executorCpus,
       config.CPUS_PER_TASK.key -> taskCpus.toString,
-      s"${config.SPARK_TASK_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_COUNT_SUFFIX}" ->
+      s"${config.SPARK_TASK_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}" ->
         taskGpus.toString,
-      s"${config.SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_COUNT_SUFFIX}" ->
+      s"${config.SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}" ->
         executorGpus.toString,
       config.EXECUTOR_CORES.key -> executorCpus.toString)
     val taskSet = FakeTask.createTaskSet(3)
diff --git a/docs/configuration.md b/docs/configuration.md
index 6632dea..7b7d6cc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -194,10 +194,10 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
-  <td><code>spark.driver.resource.{resourceName}.count</code></td>
+  <td><code>spark.driver.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
-    The number of a particular resource type to use on the driver.
+    Amount of a particular resource type to use on the driver.
     If this is used, you must also specify the
     <code>spark.driver.resource.{resourceName}.discoveryScript</code>
     for the driver to find the resource on startup.
@@ -264,10 +264,10 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
-  <td><code>spark.executor.resource.{resourceName}.count</code></td>
+  <td><code>spark.executor.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
-    The number of a particular resource type to use per executor process.
+    Amount of a particular resource type to use per executor process.
     If this is used, you must also specify the
     <code>spark.executor.resource.{resourceName}.discoveryScript</code>
     for the executor to find the resource on startup.
@@ -1888,11 +1888,11 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.task.resource.{resourceName}.count</code></td>
+  <td><code>spark.task.resource.{resourceName}.amount</code></td>
   <td>1</td>
   <td>
-  Number of a particular resource type to allocate for each task. If this is specified
-  you must also provide the executor config <code>spark.executor.resource.{resourceName}.count</code>
+  Amount of a particular resource type to allocate for each task. If this is specified
+  you must also provide the executor config <code>spark.executor.resource.{resourceName}.amount</code>
   and any corresponding discovery configs so that your executors are created with that resource type.
   </td>
 </tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 522c8f7..25b997f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{SPARK_RESOURCE_COUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
+import org.apache.spark.internal.config.{SPARK_RESOURCE_AMOUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.Utils.getHadoopFileSystem
@@ -228,7 +228,7 @@ private[spark] object KubernetesUtils extends Logging {
       sparkConf: SparkConf): Map[String, Quantity] = {
     val allResources = sparkConf.getAllWithPrefix(componentName)
     val vendors = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_VENDOR_SUFFIX).toMap
-    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
     val uniqueResources = SparkConf.getBaseOfConfigs(allResources)

     uniqueResources.map { rName =>
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index f60c6fb..df326a2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -56,7 +56,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
     resources.foreach { case (_, testRInfo) =>
       sparkConf.set(
-        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         testRInfo.count)
       sparkConf.set(
         s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 3e892a9..eb5532e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -97,7 +97,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     // test missing vendor
     gpuResources.foreach { case (_, testRInfo) =>
       baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         testRInfo.count)
     }
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
@@ -127,7 +127,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ("foo.com/fpga" -> TestResourceInformation("fpga", "f1", "foo.com")))
     gpuResources.foreach { case (_, testRInfo) =>
       baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         testRInfo.count)
       baseConf.set(
         s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 66e4781..7480dd8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -68,13 +68,13 @@ private object ResourceRequestHelper extends Logging {
       (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
-      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
-      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
-      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu{SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
-      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))

     val errorMessage = new mutable.StringBuilder()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 7c8ee03..0f11820 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -55,7 +55,7 @@ object YarnSparkHadoopUtil {
     ): Map[String, String] = {
     Map("gpu" -> YARN_GPU_RESOURCE_CONFIG, "fpga" -> YARN_FPGA_RESOURCE_CONFIG).map {
       case (rName, yarnName) =>
-        val resourceCountSparkConf = s"${confPrefix}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+        val resourceCountSparkConf = s"${confPrefix}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}"
         (yarnName -> sparkConf.get(resourceCountSparkConf, "0"))
     }.filter { case (_, count) => count.toLong > 0 }
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2c2c237..884e0f5 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -390,7 +390,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }

-  test(s"custom resource request yarn config and spark config fails") {
+  test("custom driver resource request yarn config and spark config fails") {
     assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
     val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
     ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
@@ -400,7 +400,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
     }
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
     }

     val error = intercept[SparkException] {
@@ -408,12 +408,36 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }.getMessage()

     assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," +
-      " please use spark.driver.resource.fpga"))
+      " please use spark.driver.resource.fpga.amount"))
     assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," +
-      " please use spark.driver.resource.gpu"))
+      " please use spark.driver.resource.gpu.amount"))
   }

-  test(s"custom resources spark config mapped to yarn config") {
+  test("custom executor resource request yarn config and spark config fails") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.keys.foreach { yarnName =>
+      conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+    }
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+    }
+
+    val error = intercept[SparkException] {
+      ResourceRequestHelper.validateResources(conf)
+    }.getMessage()
+
+    assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga," +
+      " please use spark.executor.resource.fpga.amount"))
+    assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu," +
+      " please use spark.executor.resource.gpu.amount"))
+  }
+
+
+  test("custom resources spark config mapped to yarn config") {
     assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
     val yarnMadeupResource = "yarn.io/madeup"
     val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu",
@@ -423,7 +447,7 @@ class ClientSuite extends SparkFunSuite with Matchers {

     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
     }
     // also just set yarn one that we don't convert
     conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index b16464c..0040369 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -189,8 +189,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
     val sparkResources =
-      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}" -> "3",
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}" -> "2",
+      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "3",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "2",
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
     val handler = createAllocator(1, mockAmClient, sparkResources)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org