This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7056e00  [SPARK-27823][CORE] Refactor resource handling code
7056e00 is described below

commit 7056e004ee566fabbb9b22ddee2de55ef03260db
Author:     Xiangrui Meng <m...@databricks.com>
AuthorDate: Tue Jun 18 17:18:17 2019 -0700

    [SPARK-27823][CORE] Refactor resource handling code

    ## What changes were proposed in this pull request?

    Continue the work from https://github.com/apache/spark/pull/24821. Refactor the resource handling code to make it more readable. Major changes:

    * Moved resource-related classes from `spark` to `spark.resource`.
    * Added `ResourceUtils` and helper classes so we don't need to deal with Spark conf directly.
      * `ResourceID`: a resource identifier that also provides the conf keys
      * `ResourceRequest`/`ResourceAllocation`: abstractions for requested and allocated resources
    * Added `TestResourceIDs` to reference commonly used resource IDs in tests, like `spark.executor.resource.gpu`.

    cc: tgravescs jiangxb1987 Ngone51

    ## How was this patch tested?

    Unit tests for the added utils, plus existing unit tests.

    Closes #24856 from mengxr/SPARK-27823.

    Lead-authored-by: Xiangrui Meng <m...@databricks.com>
    Co-authored-by: Thomas Graves <tgra...@nvidia.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../org/apache/spark/BarrierTaskContext.scala      |   1 +
 .../org/apache/spark/ResourceDiscoverer.scala      | 151 ------------
 .../org/apache/spark/ResourceInformation.scala     |  37 ---
 .../main/scala/org/apache/spark/SparkConf.scala    |  45 ----
 .../main/scala/org/apache/spark/SparkContext.scala |  94 +++-----
 .../main/scala/org/apache/spark/TaskContext.scala  |   4 +-
 .../scala/org/apache/spark/TaskContextImpl.scala   |   1 +
 .../main/scala/org/apache/spark/TestUtils.scala    |  30 ++-
 .../executor/CoarseGrainedExecutorBackend.scala    |  52 ++---
 .../org/apache/spark/internal/config/package.scala |  10 +-
 .../spark/resource/ResourceInformation.scala       |  87 +++++++
 .../org/apache/spark/resource/ResourceUtils.scala  | 191 +++++++++++++++
 .../scala/org/apache/spark/scheduler/Task.scala    |   1 +
 .../apache/spark/scheduler/TaskDescription.scala   |   2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  10 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  11 +-
 .../cluster/CoarseGrainedClusterMessage.scala      |   2 +-
 .../org/apache/spark/ResourceDiscovererSuite.scala | 236 -------------------
 .../scala/org/apache/spark/SparkConfSuite.scala    |  53 +----
 .../scala/org/apache/spark/SparkContextSuite.scala |  93 +++-----
 .../CoarseGrainedExecutorBackendSuite.scala        | 159 +++++++------
 .../org/apache/spark/executor/ExecutorSuite.scala  |   1 +
 .../spark/resource/ResourceInformationSuite.scala  |  64 +++++
 .../apache/spark/resource/ResourceUtilsSuite.scala | 259 +++++++++++++++++++++
 .../TestResourceIDs.scala}                         |  17 +-
 .../CoarseGrainedSchedulerBackendSuite.scala       |   8 +-
 .../scheduler/ExecutorResourceInfoSuite.scala      |   2 +-
 .../spark/scheduler/TaskDescriptionSuite.scala     |   4 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |  10 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |   5 +-
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  |  20 +-
 .../k8s/features/BasicDriverFeatureStep.scala      |   2 +-
 .../k8s/features/BasicExecutorFeatureStep.scala    |   2 +-
 .../k8s/features/BasicDriverFeatureStepSuite.scala |  14 +-
 .../features/BasicExecutorFeatureStepSuite.scala   |  37 ++-
 .../k8s/features/KubernetesFeaturesTestUtils.scala |   3 +-
 .../MesosFineGrainedSchedulerBackendSuite.scala    |   3 +-
 .../org/apache/spark/deploy/yarn/Client.scala      |   2 +-
 .../spark/deploy/yarn/ResourceRequestHelper.scala  |   9 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |   2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |   8 +-
 .../YarnCoarseGrainedExecutorBackend.scala         |   3 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala |   8 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     |   6 +-
 44 files changed, 908 insertions(+), 851 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index cf957ff..c393df8 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -26,6 +26,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util._
diff --git a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
deleted file mode 100644
index e5ae202..0000000
--- a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.{BufferedInputStream, File, FileInputStream}
-
-import com.fasterxml.jackson.core.JsonParseException
-import com.fasterxml.jackson.databind.exc.MismatchedInputException
-import org.json4s.{DefaultFormats, MappingException}
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.util.Utils.executeAndGetOutput
-
-/**
- * Discovers information about resources (GPUs/FPGAs/etc). It currently only supports
- * resources that have addresses.
- * This class finds resources by running and parsing the output of the user specified script
- * from the config spark.{driver/executor}.resource.{resourceName}.discoveryScript.
- * The output of the script it runs is expected to be JSON in the format of the
- * ResourceInformation class.
- *
- * For example: {"name": "gpu", "addresses": ["0","1"]}
- */
-private[spark] object ResourceDiscoverer extends Logging {
-
-  private implicit val formats = DefaultFormats
-
-  /**
-   * This function will discover information about a set of resources by using the
-   * user specified script (spark.{driver/executor}.resource.{resourceName}.discoveryScript).
-   * It optionally takes a set of resource names or if that isn't specified
-   * it uses the config prefix passed in to look at the executor or driver configs
-   * to get the resource names. Then for each resource it will run the discovery script
-   * and get the ResourceInformation about it.
-   *
-   * @param sparkConf SparkConf
-   * @param confPrefix Driver or Executor resource prefix
-   * @param resourceNamesOpt Optionally specify resource names. If not set uses the resource
-   *                         configs based on confPrefix passed in to get the resource names.
-   * @return Map of resource name to ResourceInformation
-   */
-  def discoverResourcesInformation(
-      sparkConf: SparkConf,
-      confPrefix: String,
-      resourceNamesOpt: Option[Set[String]] = None
-      ): Map[String, ResourceInformation] = {
-    val resourceNames = resourceNamesOpt.getOrElse(
-      // get unique resource names by grabbing first part config with multiple periods,
-      // ie resourceName.count, grab resourceName part
-      SparkConf.getBaseOfConfigs(sparkConf.getAllWithPrefix(confPrefix))
-    )
-    resourceNames.map { rName => {
-      val rInfo = getResourceInfo(sparkConf, confPrefix, rName)
-      (rName -> rInfo)
-    }}.toMap
-  }
-
-  private def getResourceInfo(
-      sparkConf: SparkConf,
-      confPrefix: String,
-      resourceName: String): ResourceInformation = {
-    val discoveryConf = confPrefix + resourceName + SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX
-    val script = sparkConf.getOption(discoveryConf)
-    val result = if (script.nonEmpty) {
-      val scriptFile = new File(script.get)
-      // check that script exists and try to execute
-      if (scriptFile.exists()) {
-        try {
-          val output = executeAndGetOutput(Seq(script.get), new File("."))
-          val parsedJson = parse(output)
-          val name = (parsedJson \ "name").extract[String]
-          val addresses = (parsedJson \ "addresses").extract[Array[String]]
-          if (name != resourceName) {
-            throw new SparkException(s"Discovery script: ${script.get} specified via " +
-              s"$discoveryConf returned a resource name: $name that doesn't match the " +
-              s"config name: $resourceName")
-          }
-          new ResourceInformation(name, addresses)
-        } catch {
-          case e @ (_: SparkException | _: MappingException | _: JsonParseException) =>
-            throw new SparkException(s"Error running the resource discovery script: $scriptFile" +
-              s" for $resourceName", e)
-        }
-      } else {
-        throw new SparkException(s"Resource script: $scriptFile to discover $resourceName" +
-          s" doesn't exist!")
-      }
-    } else {
-      throw new SparkException(s"User is expecting to use $resourceName resources but " +
-        s"didn't specify a script via conf: $discoveryConf, to find them!")
-    }
-    result
-  }
-
-  /**
-   * Make sure the actual resources we have on startup are at least the number the user
-   * requested. Note that there is other code in SparkConf that makes sure we have executor configs
-   * for each task resource requirement and that they are large enough. This function
-   * is used by both driver and executors.
-   *
-   * @param requiredResources The resources that are required for us to run.
-   * @param actualResources The actual resources discovered.
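For reference, the discovery-script contract is unchanged by this refactoring: the script must print a single JSON object in the `ResourceInformation` format. A minimal sketch of parsing such output with the new `ResourceInformation.parseJson` helper added later in this commit (the sample output is hypothetical, and `parseJson` is `private[spark]`, so this only compiles inside Spark):

```scala
import org.apache.spark.resource.ResourceInformation

// What a conforming discovery script prints on stdout:
val scriptOutput = """{"name": "gpu", "addresses": ["0", "1"]}"""

// The new helper replaces the ad-hoc json4s extraction deleted above.
val info: ResourceInformation = ResourceInformation.parseJson(scriptOutput)
assert(info.name == "gpu")
assert(info.addresses.sameElements(Array("0", "1")))
```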
- */ - def checkActualResourcesMeetRequirements( - requiredResources: Map[String, String], - actualResources: Map[String, ResourceInformation]): Unit = { - requiredResources.foreach { case (rName, reqCount) => - val actualRInfo = actualResources.get(rName).getOrElse( - throw new SparkException(s"Resource: $rName required but wasn't discovered on startup")) - - if (actualRInfo.addresses.size < reqCount.toLong) { - throw new SparkException(s"Resource: $rName, with addresses: " + - s"${actualRInfo.addresses.mkString(",")} " + - s"is less than what the user requested: $reqCount)") - } - } - } - - def parseAllocatedFromJsonFile(resourcesFile: String): Map[String, ResourceInformation] = { - implicit val formats = DefaultFormats - // case class to make json4s parsing easy - case class JsonResourceInformation(val name: String, val addresses: Array[String]) - val resourceInput = new BufferedInputStream(new FileInputStream(resourcesFile)) - val resources = try { - parse(resourceInput).extract[Seq[JsonResourceInformation]] - } catch { - case e@(_: MappingException | _: MismatchedInputException | _: ClassCastException) => - throw new SparkException(s"Exception parsing the resources in $resourcesFile", e) - } finally { - resourceInput.close() - } - resources.map(r => (r.name, new ResourceInformation(r.name, r.addresses))).toMap - } -} diff --git a/core/src/main/scala/org/apache/spark/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/ResourceInformation.scala deleted file mode 100644 index 6a5b725..0000000 --- a/core/src/main/scala/org/apache/spark/ResourceInformation.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import org.apache.spark.annotation.Evolving - -/** - * Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc. - * The array of addresses are resource specific and its up to the user to interpret the address. 
- * - * One example is GPUs, where the addresses would be the indices of the GPUs - * - * @param name the name of the resource - * @param addresses an array of strings describing the addresses of the resource - */ -@Evolving -class ResourceInformation( - val name: String, - val addresses: Array[String]) extends Serializable { - - override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]" -} diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index e231a40..24be54e 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -416,14 +416,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } /** - * Get all parameters that start with `prefix` and end with 'suffix' - */ - def getAllWithPrefixAndSuffix(prefix: String, suffix: String): Array[(String, String)] = { - getAll.filter { case (k, v) => k.startsWith(prefix) && k.endsWith(suffix) } - .map { case (k, v) => (k.substring(prefix.length, (k.length - suffix.length)), v) } - } - - /** * Get a parameter as an integer, falling back to a default if not set * @throws NumberFormatException If the value cannot be interpreted as an integer */ @@ -507,14 +499,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } } - /** - * Get task resource requirements. - */ - private[spark] def getTaskResourceRequirements(): Map[String, Int] = { - getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX) - .withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_AMOUNT_SUFFIX)} - .map { case (k, v) => (k.dropRight(SPARK_RESOURCE_AMOUNT_SUFFIX.length), v.toInt)}.toMap - } /** * Checks for illegal or deprecated config settings. Throws an exception for the former. Not @@ -805,35 +789,6 @@ private[spark] object SparkConf extends Logging { } /** - * A function to help parsing configs with multiple parts where the base and - * suffix could be one of many options. For instance configs like: - * spark.executor.resource.{resourceName}.{count/addresses} - * This function takes an Array of configs you got from the - * getAllWithPrefix function, selects only those that end with the suffix - * passed in and returns just the base part of the config before the first - * '.' and its value. - */ - def getConfigsWithSuffix( - configs: Array[(String, String)], - suffix: String - ): Array[(String, String)] = { - configs.filter { case (rConf, _) => rConf.endsWith(suffix)}. - map { case (k, v) => (k.split('.').head, v) } - } - - /** - * A function to help parsing configs with multiple parts where the base and - * suffix could be one of many options. For instance configs like: - * spark.executor.resource.{resourceName}.{count/addresses} - * This function takes an Array of configs you got from the - * getAllWithPrefix function and returns the base part of the config - * before the first '.'. - */ - def getBaseOfConfigs(configs: Array[(String, String)]): Set[String] = { - configs.map { case (k, _) => k.split('.').head }.toSet - } - - /** * Holds information about keys that have been deprecated and do not have a replacement. * * @param key The deprecated key. 
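A sketch of how the deleted `SparkConf` helpers are superseded: resource names are now derived as typed `ResourceID`s via `ResourceUtils.listResourceIds`, added later in this commit. The gpu conf is only an example value, and both members are `private[spark]`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.resource.{ResourceID, ResourceUtils}

val conf = new SparkConf(false)
  .set("spark.executor.resource.gpu.amount", "2")

// Replaces SparkConf.getBaseOfConfigs over the "spark.executor.resource." prefix.
val ids: Seq[ResourceID] = ResourceUtils.listResourceIds(conf, "spark.executor")
assert(ids == Seq(ResourceID("spark.executor", "gpu")))
```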
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4fc445b..a0d7aa7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -51,6 +51,8 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.metrics.source.JVMCPUSource import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ +import org.apache.spark.resource.{ResourceID, ResourceInformation} +import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend @@ -363,43 +365,6 @@ class SparkContext(config: SparkConf) extends Logging { Utils.setLogLevel(org.apache.log4j.Level.toLevel(upperCased)) } - /** - * Checks to see if any resources (GPU/FPGA/etc) are available to the driver by looking - * at and processing the spark.driver.resourcesFile and - * spark.driver.resource.resourceName.discoveryScript configs. The configs have to be - * present when the driver starts, setting them after startup does not work. - * - * If a resources file was specified then assume all resources will be specified - * in that file. Otherwise use the discovery scripts to find the resources. Users should - * not be setting the resources file config directly and should not be mixing methods - * for different types of resources since the resources file config is meant for Standalone mode - * and other cluster managers should use the discovery scripts. - */ - private def setupDriverResources(): Unit = { - // Only call getAllWithPrefix once and filter on those since there could be a lot of spark - // configs. - val allDriverResourceConfs = _conf.getAllWithPrefix(SPARK_DRIVER_RESOURCE_PREFIX) - val resourcesFile = _conf.get(DRIVER_RESOURCES_FILE) - _resources = resourcesFile.map { rFile => { - ResourceDiscoverer.parseAllocatedFromJsonFile(rFile) - }}.getOrElse { - // we already have the resources confs here so just pass in the unique resource names - // rather then having the resource discoverer reparse all the configs. 
- val uniqueResources = SparkConf.getBaseOfConfigs(allDriverResourceConfs) - ResourceDiscoverer.discoverResourcesInformation(_conf, SPARK_DRIVER_RESOURCE_PREFIX, - Some(uniqueResources)) - } - // verify the resources we discovered are what the user requested - val driverReqResourcesAndCounts = - SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap - ResourceDiscoverer.checkActualResourcesMeetRequirements(driverReqResourcesAndCounts, _resources) - - logInfo("===============================================================================") - logInfo(s"Driver Resources:") - _resources.foreach { case (k, v) => logInfo(s"$k -> $v") } - logInfo("===============================================================================") - } - try { _conf = config.clone() _conf.validateSettings() @@ -413,7 +378,8 @@ class SparkContext(config: SparkConf) extends Logging { _driverLogger = DriverLogger(_conf) - setupDriverResources() + val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE) + _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) // log out spark.app.name in the Spark driver logs logInfo(s"Submitted application: $appName") @@ -2732,44 +2698,46 @@ object SparkContext extends Logging { // Calculate the max slots each executor can provide based on resources available on each // executor and resources required by each task. - val taskResourcesAndCount = sc.conf.getTaskResourceRequirements() - val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix( - SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap + val taskResourceRequirements = parseTaskResourceRequirements(sc.conf) + val executorResourcesAndAmounts = + parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX) + .map(request => (request.id.resourceName, request.amount)).toMap var numSlots = execCores / taskCores var limitingResourceName = "CPU" - taskResourcesAndCount.foreach { case (rName, taskCount) => + + taskResourceRequirements.foreach { taskReq => // Make sure the executor resources were specified through config. - val execCount = executorResourcesAndCounts.getOrElse(rName, - throw new SparkException( - s"The executor resource config: " + - s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " + - "needs to be specified since a task requirement config: " + - s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} was specified") + val execAmount = executorResourcesAndAmounts.getOrElse(taskReq.resourceName, + throw new SparkException("The executor resource config: " + + ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf + + " needs to be specified since a task requirement config: " + + ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf + + " was specified") ) // Make sure the executor resources are large enough to launch at least one task. 
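To make the slot computation in this hunk concrete, here is a worked example with hypothetical values (8 executor cores, 2 cores per task, 4 executor GPUs, 2 GPUs per task):

```scala
// Hypothetical configuration:
//   spark.executor.cores = 8, spark.task.cpus = 2
//   spark.executor.resource.gpu.amount = 4, spark.task.resource.gpu.amount = 2
val slotsByCpu = 8 / 2  // 4
val slotsByGpu = 4 / 2  // 2
val numSlots = math.min(slotsByCpu, slotsByGpu)  // 2; gpu is the limiting resource
// taskReq.amount * numSlots == execAmount (2 * 2 == 4), so no waste warning;
// with 5 executor GPUs, 2 * 2 < 5 would trigger the warning below.
```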
- if (execCount.toLong < taskCount.toLong) { - throw new SparkException( - s"The executor resource config: " + - s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " + - s"= $execCount has to be >= the task config: " + - s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount") + if (execAmount < taskReq.amount) { + throw new SparkException("The executor resource config: " + + ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf + + s" = $execAmount has to be >= the requested amount in task resource config: " + + ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf + + s" = ${taskReq.amount}") } // Compare and update the max slots each executor can provide. - val resourceNumSlots = execCount.toInt / taskCount + val resourceNumSlots = execAmount / taskReq.amount if (resourceNumSlots < numSlots) { numSlots = resourceNumSlots - limitingResourceName = rName + limitingResourceName = taskReq.resourceName } } // There have been checks above to make sure the executor resources were specified and are // large enough if any task resources were specified. - taskResourcesAndCount.foreach { case (rName, taskCount) => - val execCount = executorResourcesAndCounts(rName) - if (taskCount.toInt * numSlots < execCount.toInt) { - val message = s"The configuration of resource: $rName (exec = ${execCount.toInt}, " + - s"task = ${taskCount}) will result in wasted resources due to resource " + - s"${limitingResourceName} limiting the number of runnable tasks per executor to: " + - s"${numSlots}. Please adjust your configuration." + taskResourceRequirements.foreach { taskReq => + val execAmount = executorResourcesAndAmounts(taskReq.resourceName) + if (taskReq.amount * numSlots < execAmount) { + val message = s"The configuration of resource: ${taskReq.resourceName} " + + s"(exec = ${execAmount}, task = ${taskReq.amount}) will result in wasted " + + s"resources due to resource ${limitingResourceName} limiting the number of " + + s"runnable tasks per executor to: ${numSlots}. Please adjust your configuration." if (Utils.isTesting) { throw new SparkException(message) } else { diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 803167e..2299c54 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -24,6 +24,7 @@ import org.apache.spark.annotation.{DeveloperApi, Evolving} import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.Source +import org.apache.spark.resource.ResourceInformation import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener} @@ -178,7 +179,8 @@ abstract class TaskContext extends Serializable { /** * Resources allocated to the task. The key is the resource name and the value is information - * about the resource. Please refer to [[ResourceInformation]] for specifics. + * about the resource. Please refer to [[org.apache.spark.resource.ResourceInformation]] for + * specifics. 
*/ @Evolving def resources(): Map[String, ResourceInformation] diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 8e40b7f..516fb95 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source +import org.apache.spark.resource.ResourceInformation import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util._ diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 5d88612..41ae3ae 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -20,9 +20,11 @@ package org.apache.spark import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream} import java.net.{HttpURLConnection, URI, URL} import java.nio.charset.StandardCharsets +import java.nio.file.{Files => JavaFiles} +import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE} import java.security.SecureRandom import java.security.cert.X509Certificate -import java.util.{Arrays, Properties} +import java.util.{Arrays, EnumSet, Properties} import java.util.concurrent.{TimeoutException, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream} import javax.net.ssl._ @@ -36,6 +38,8 @@ import scala.util.Try import com.google.common.io.{ByteStreams, Files} import org.apache.log4j.PropertyConfigurator +import org.json4s.JsonAST.JValue +import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config._ @@ -312,15 +316,21 @@ private[spark] object TestUtils { current ++ current.filter(_.isDirectory).flatMap(recursiveList) } - /** - * Set task resource requirement. - */ - def setTaskResourceRequirement( - conf: SparkConf, - resourceName: String, - resourceCount: Int): SparkConf = { - val key = s"${SPARK_TASK_RESOURCE_PREFIX}${resourceName}${SPARK_RESOURCE_AMOUNT_SUFFIX}" - conf.set(key, resourceCount.toString) + /** Creates a temp JSON file that contains the input JSON record. */ + def createTempJsonFile(dir: File, prefix: String, jsonValue: JValue): String = { + val file = File.createTempFile(prefix, ".json", dir) + JavaFiles.write(file.toPath, compact(render(jsonValue)).getBytes()) + file.getPath + } + + /** Creates a temp bash script that prints the given output. 
*/ + def createTempScriptWithExpectedOutput(dir: File, prefix: String, output: String): String = { + val file = File.createTempFile(prefix, ".sh", dir) + val script = s"cat <<EOF\n$output\nEOF\n" + Files.write(script, file, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(file.toPath, + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + file.getPath } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 9b5f9f5..2f4fc0e 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -34,6 +34,8 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -48,7 +50,7 @@ private[spark] class CoarseGrainedExecutorBackend( cores: Int, userClassPath: Seq[URL], env: SparkEnv, - resourcesFile: Option[String]) + resourcesFileOpt: Option[String]) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { private implicit val formats = DefaultFormats @@ -70,7 +72,7 @@ private[spark] class CoarseGrainedExecutorBackend( override def onStart() { logInfo("Connecting to driver: " + driverUrl) - val resources = parseOrFindResources(resourcesFile) + val resources = parseOrFindResources(resourcesFileOpt) rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) @@ -86,36 +88,19 @@ private[spark] class CoarseGrainedExecutorBackend( } // visible for testing - def parseOrFindResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = { + def parseOrFindResources(resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = { // only parse the resources if a task requires them - val resourceInfo = if (env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX).nonEmpty) { - val actualExecResources = resourcesFile.map { rFile => { - ResourceDiscoverer.parseAllocatedFromJsonFile(rFile) - }}.getOrElse { - ResourceDiscoverer.discoverResourcesInformation(env.conf, SPARK_EXECUTOR_RESOURCE_PREFIX) - } - - if (actualExecResources.isEmpty) { + val resourceInfo = if (parseTaskResourceRequirements(env.conf).nonEmpty) { + val resources = getOrDiscoverAllResources(env.conf, SPARK_EXECUTOR_PREFIX, resourcesFileOpt) + if (resources.isEmpty) { throw new SparkException("User specified resources per task via: " + - s"$SPARK_TASK_RESOURCE_PREFIX, but can't find any resources available on the executor.") + s"$SPARK_TASK_PREFIX, but can't find any resources available on the executor.") } - val execReqResourcesAndCounts = - env.conf.getAllWithPrefixAndSuffix(SPARK_EXECUTOR_RESOURCE_PREFIX, - SPARK_RESOURCE_AMOUNT_SUFFIX).toMap - - ResourceDiscoverer.checkActualResourcesMeetRequirements(execReqResourcesAndCounts, - actualExecResources) - - logInfo("===============================================================================") - logInfo(s"Executor $executorId Resources:") - actualExecResources.foreach { case (k, v) => logInfo(s"$k -> $v") } - 
logInfo("===============================================================================") - - actualExecResources + resources } else { - if (resourcesFile.nonEmpty) { - logWarning(s"A resources file was specified but the application is not configured " + - s"to use any resources, see the configs with prefix: ${SPARK_TASK_RESOURCE_PREFIX}") + if (resourcesFileOpt.nonEmpty) { + logWarning("A resources file was specified but the application is not configured " + + s"to use any resources, see the configs with prefix: ${SPARK_TASK_PREFIX}") } Map.empty[String, ResourceInformation] } @@ -245,13 +230,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { appId: String, workerUrl: Option[String], userClassPath: mutable.ListBuffer[URL], - resourcesFile: Option[String]) + resourcesFileOpt: Option[String]) def main(args: Array[String]): Unit = { val createFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) => new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, - arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile) + arguments.hostname, arguments.cores, arguments.userClassPath, env, + arguments.resourcesFileOpt) } run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) System.exit(0) @@ -313,7 +299,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { var executorId: String = null var hostname: String = null var cores: Int = 0 - var resourcesFile: Option[String] = None + var resourcesFileOpt: Option[String] = None var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBuffer[URL]() @@ -334,7 +320,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { cores = value.toInt argv = tail case ("--resourcesFile") :: value :: tail => - resourcesFile = Some(value) + resourcesFileOpt = Some(value) argv = tail case ("--app-id") :: value :: tail => appId = value @@ -361,7 +347,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl, - userClassPath, resourcesFile) + userClassPath, resourcesFileOpt) } private def printUsageAndExit(classNameForEntry: String): Unit = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 4b0c607..c2dab68 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -31,13 +31,9 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_ package object config { - private[spark] val SPARK_DRIVER_RESOURCE_PREFIX = "spark.driver.resource." - private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource." - private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource." 
- - private[spark] val SPARK_RESOURCE_AMOUNT_SUFFIX = ".amount" - private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript" - private[spark] val SPARK_RESOURCE_VENDOR_SUFFIX = ".vendor" + private[spark] val SPARK_DRIVER_PREFIX = "spark.driver" + private[spark] val SPARK_EXECUTOR_PREFIX = "spark.executor" + private[spark] val SPARK_TASK_PREFIX = "spark.task" private[spark] val DRIVER_RESOURCES_FILE = ConfigBuilder("spark.driver.resourcesFile") diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala new file mode 100644 index 0000000..96aef74 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.resource + +import scala.util.control.NonFatal + +import org.json4s.{DefaultFormats, Extraction, JValue} +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.SparkException +import org.apache.spark.annotation.Evolving + +/** + * Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc. + * The array of addresses are resource specific and its up to the user to interpret the address. + * + * One example is GPUs, where the addresses would be the indices of the GPUs + * + * @param name the name of the resource + * @param addresses an array of strings describing the addresses of the resource + */ +@Evolving +class ResourceInformation( + val name: String, + val addresses: Array[String]) extends Serializable { + + override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]" + + override def equals(obj: Any): Boolean = { + obj match { + case that: ResourceInformation => + that.getClass == this.getClass && + that.name == name && that.addresses.toSeq == addresses.toSeq + case _ => + false + } + } + + override def hashCode(): Int = Seq(name, addresses.toSeq).hashCode() +} + +private[spark] object ResourceInformation { + + private lazy val exampleJson: String = compact(render( + ResourceInformationJson("gpu", Seq("0", "1")).toJValue)) + + /** + * Parses a JSON string into a [[ResourceInformation]] instance. + */ + def parseJson(json: String): ResourceInformation = { + implicit val formats = DefaultFormats + try { + parse(json).extract[ResourceInformationJson].toResourceInformation + } catch { + case NonFatal(e) => + throw new SparkException(s"Error parsing JSON into ResourceInformation:\n$json\n" + + s"Here is a correct example: $exampleJson.", e) + } + } +} + +/** A case class to simplify JSON serialization of [[ResourceInformation]]. 
*/ +private case class ResourceInformationJson(name: String, addresses: Seq[String]) { + + def toJValue: JValue = { + Extraction.decompose(this)(DefaultFormats) + } + + def toResourceInformation: ResourceInformation = { + new ResourceInformation(name, addresses.toArray) + } +} diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala new file mode 100644 index 0000000..6926586 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.resource + +import java.io.File +import java.nio.file.{Files, Paths} + +import scala.util.control.NonFatal + +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils.executeAndGetOutput + +/** + * Resource identifier. + * @param componentName spark.driver / spark.executor / spark.task + * @param resourceName gpu, fpga, etc + */ +private[spark] case class ResourceID(componentName: String, resourceName: String) { + def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot + def amountConf: String = s"$confPrefix${ResourceUtils.AMOUNT}" + def discoveryScriptConf: String = s"$confPrefix${ResourceUtils.DISCOVERY_SCRIPT}" + def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}" +} + +private[spark] case class ResourceRequest( + id: ResourceID, + amount: Int, + discoveryScript: Option[String], + vendor: Option[String]) + +private[spark] case class TaskResourceRequirement(resourceName: String, amount: Int) + +/** + * Case class representing allocated resource addresses for a specific resource. + * Cluster manager uses the JSON serialization of this case class to pass allocated resource info to + * driver and executors. See the ``--resourcesFile`` option there. 
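As a sketch of the key scheme `ResourceID` encodes (the component and resource names follow the commit message; "gpu" is just an example resource):

```scala
import org.apache.spark.resource.ResourceID

val id = ResourceID(componentName = "spark.executor", resourceName = "gpu")
id.amountConf           // "spark.executor.resource.gpu.amount"
id.discoveryScriptConf  // "spark.executor.resource.gpu.discoveryScript"
id.vendorConf           // "spark.executor.resource.gpu.vendor"
```

A `--resourcesFile` is then, roughly, the json4s serialization of a `Seq[ResourceAllocation]`, e.g. `[{"id": {"componentName": "spark.driver", "resourceName": "gpu"}, "addresses": ["0", "1"]}]`.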
+ */ +private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[String]) { + def toResourceInformation: ResourceInformation = { + new ResourceInformation(id.resourceName, addresses.toArray) + } +} + +private[spark] object ResourceUtils extends Logging { + + // config suffixes + val DISCOVERY_SCRIPT = "discoveryScript" + val VENDOR = "vendor" + // user facing configs use .amount to allow to extend in the future, + // internally we currently only support addresses, so its just an integer count + val AMOUNT = "amount" + + def parseResourceRequest(sparkConf: SparkConf, resourceId: ResourceID): ResourceRequest = { + val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap + val amount = settings.getOrElse(AMOUNT, + throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}") + ).toInt + val discoveryScript = settings.get(DISCOVERY_SCRIPT) + val vendor = settings.get(VENDOR) + ResourceRequest(resourceId, amount, discoveryScript, vendor) + } + + def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = { + sparkConf.getAllWithPrefix(s"$componentName.resource.").map { case (key, _) => + key.substring(0, key.indexOf('.')) + }.toSet.toSeq.map(name => ResourceID(componentName, name)) + } + + def parseAllResourceRequests( + sparkConf: SparkConf, + componentName: String): Seq[ResourceRequest] = { + listResourceIds(sparkConf, componentName).map { id => + parseResourceRequest(sparkConf, id) + } + } + + def parseTaskResourceRequirements(sparkConf: SparkConf): Seq[TaskResourceRequirement] = { + parseAllResourceRequests(sparkConf, SPARK_TASK_PREFIX).map { request => + TaskResourceRequirement(request.id.resourceName, request.amount) + } + } + + private def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = { + implicit val formats = DefaultFormats + val json = new String(Files.readAllBytes(Paths.get(resourcesFile))) + try { + parse(json).extract[Seq[ResourceAllocation]] + } catch { + case NonFatal(e) => + throw new SparkException(s"Error parsing resources file $resourcesFile", e) + } + } + + private def parseAllocatedOrDiscoverResources( + sparkConf: SparkConf, + componentName: String, + resourcesFileOpt: Option[String]): Seq[ResourceAllocation] = { + val allocated = resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile) + .filter(_.id.componentName == componentName) + val otherResourceIds = listResourceIds(sparkConf, componentName).diff(allocated.map(_.id)) + allocated ++ otherResourceIds.map { id => + val request = parseResourceRequest(sparkConf, id) + ResourceAllocation(id, discoverResource(request).addresses) + } + } + + private def assertResourceAllocationMeetsRequest( + allocation: ResourceAllocation, + request: ResourceRequest): Unit = { + require(allocation.id == request.id && allocation.addresses.size >= request.amount, + s"Resource: ${allocation.id.resourceName}, with addresses: " + + s"${allocation.addresses.mkString(",")} " + + s"is less than what the user requested: ${request.amount})") + } + + private def assertAllResourceAllocationsMeetRequests( + allocations: Seq[ResourceAllocation], + requests: Seq[ResourceRequest]): Unit = { + val allocated = allocations.map(x => x.id -> x).toMap + requests.foreach(r => assertResourceAllocationMeetsRequest(allocated(r.id), r)) + } + + /** + * Gets all allocated resource information for the input component from input resources file and + * discover the remaining via discovery scripts. 
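A hedged sketch of the driver-side entry point these helpers feed into, `getOrDiscoverAllResources` (conf value hypothetical; the method is `private[spark]`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.SPARK_DRIVER_PREFIX
import org.apache.spark.resource.ResourceUtils.getOrDiscoverAllResources

val conf = new SparkConf(false).set("spark.driver.resource.gpu.amount", "1")
// Standalone mode passes a --resourcesFile; otherwise the script named by
// spark.driver.resource.gpu.discoveryScript is run for the remaining resources.
val resources = getOrDiscoverAllResources(conf, SPARK_DRIVER_PREFIX, resourcesFileOpt = None)
// With neither a resources file nor a discovery script this throws; when a script is
// configured, the discovered addresses must cover the requested amount.
```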
+ * It also verifies the resource allocation meets required amount for each resource. + * @return a map from resource name to resource info + */ + def getOrDiscoverAllResources( + sparkConf: SparkConf, + componentName: String, + resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = { + val requests = parseAllResourceRequests(sparkConf, componentName) + val allocations = parseAllocatedOrDiscoverResources(sparkConf, componentName, resourcesFileOpt) + assertAllResourceAllocationsMeetRequests(allocations, requests) + val resourceInfoMap = allocations.map(a => (a.id.resourceName, a.toResourceInformation)).toMap + logInfo("==============================================================") + logInfo(s"Resources for $componentName:\n${resourceInfoMap.mkString("\n")}") + logInfo("==============================================================") + resourceInfoMap + } + + // visible for test + private[spark] def discoverResource(resourceRequest: ResourceRequest): ResourceInformation = { + val resourceName = resourceRequest.id.resourceName + val script = resourceRequest.discoveryScript + val result = if (script.nonEmpty) { + val scriptFile = new File(script.get) + // check that script exists and try to execute + if (scriptFile.exists()) { + val output = executeAndGetOutput(Seq(script.get), new File(".")) + ResourceInformation.parseJson(output) + } else { + throw new SparkException(s"Resource script: $scriptFile to discover $resourceName " + + "doesn't exist!") + } + } else { + throw new SparkException(s"User is expecting to use resource: $resourceName but " + + "didn't specify a discovery script!") + } + if (!result.name.equals(resourceName)) { + throw new SparkException(s"Error running the resource discovery script ${script.get}: " + + s"script returned resource name ${result.name} and we were expecting $resourceName.") + } + result + } + + // known types of resources + final val GPU: String = "gpu" + final val FPGA: String = "fpga" +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index bb44e9a..9dfbf86 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -26,6 +26,7 @@ import org.apache.spark.internal.config.APP_CALLER_CONTEXT import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rdd.InputFileBlockHolder +import org.apache.spark.resource.ResourceInformation import org.apache.spark.util._ /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index c29ee06..247cfe7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.immutable import scala.collection.mutable.{ArrayBuffer, HashMap, Map} -import org.apache.spark.ResourceInformation +import org.apache.spark.resource.ResourceInformation import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index cf07847..242486c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -28,9 +28,9 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorMetrics -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.RpcEndpoint import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality @@ -93,7 +93,7 @@ private[spark] class TaskSchedulerImpl( val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK) // Resources to request per task - val resourcesPerTask = conf.getTaskResourceRequirements() + val resourcesReqsPerTask = ResourceUtils.parseTaskResourceRequirements(sc.conf) // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. Protected by `this` @@ -382,8 +382,8 @@ private[spark] class TaskSchedulerImpl( * Check whether the resources from the WorkerOffer are enough to run at least one task. */ private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = { - resourcesPerTask.forall { case (rName, rNum) => - resources.contains(rName) && resources(rName).size >= rNum + resourcesReqsPerTask.forall { req => + resources.contains(req.resourceName) && resources(req.resourceName).size >= req.amount } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6f2b982..e7645fc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -30,6 +30,7 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceInformation import org.apache.spark.scheduler.SchedulingMode._ import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils} import org.apache.spark.util.collection.MedianHeap @@ -534,14 +535,16 @@ private[spark] class TaskSetManager( logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)") - val extraResources = sched.resourcesPerTask.map { case (rName, rNum) => + val extraResources = sched.resourcesReqsPerTask.map { taskReq => + val rName = taskReq.resourceName + val count = taskReq.amount val rAddresses = availableResources.getOrElse(rName, Seq.empty) - assert(rAddresses.size >= rNum, s"Required $rNum $rName addresses, but only " + + assert(rAddresses.size >= count, s"Required $count $rName addresses, but only " + s"${rAddresses.size} available.") // We'll drop the allocated addresses later inside TaskSchedulerImpl. 
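An illustration of the requirement check this relies on, `resourcesMeetTaskRequirements` above (all values hypothetical; `TaskResourceRequirement` is `private[spark]`):

```scala
import scala.collection.mutable.Buffer
import org.apache.spark.resource.TaskResourceRequirement

val reqsPerTask = Seq(TaskResourceRequirement("gpu", 2))
val offered = Map("gpu" -> Buffer("0", "1", "2"))
val canLaunch = reqsPerTask.forall { req =>
  offered.contains(req.resourceName) && offered(req.resourceName).size >= req.amount
}
// canLaunch == true; the task is then handed offered("gpu").take(2) = Buffer("0", "1").
```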
- val allocatedAddresses = rAddresses.take(rNum) + val allocatedAddresses = rAddresses.take(count) (rName, new ResourceInformation(rName, allocatedAddresses.toArray)) - } + }.toMap sched.dagScheduler.taskStarted(task, info) new TaskDescription( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 82d51f8..a90fff0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -19,8 +19,8 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer -import org.apache.spark.ResourceInformation import org.apache.spark.TaskState.TaskState +import org.apache.spark.resource.ResourceInformation import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.ExecutorLossReason import org.apache.spark.util.SerializableBuffer diff --git a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala deleted file mode 100644 index 2272573..0000000 --- a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark - -import java.io.File -import java.nio.charset.StandardCharsets -import java.nio.file.{Files => JavaFiles} -import java.nio.file.attribute.PosixFilePermission._ -import java.util.EnumSet - -import com.google.common.io.Files - -import org.apache.spark.ResourceName._ -import org.apache.spark.internal.config._ -import org.apache.spark.util.Utils - -class ResourceDiscovererSuite extends SparkFunSuite - with LocalSparkContext { - - def mockDiscoveryScript(file: File, result: String): String = { - Files.write(s"echo $result", file, StandardCharsets.UTF_8) - JavaFiles.setPosixFilePermissions(file.toPath(), - EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) - file.getPath() - } - - test("Resource discoverer no resources") { - val sparkconf = new SparkConf - val resources = - ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX) - assert(resources.size === 0) - assert(resources.get(GPU).isEmpty, - "Should have a gpus entry that is empty") - } - - test("Resource discoverer multiple gpus") { - val sparkconf = new SparkConf - assume(!(Utils.isWindows)) - withTempDir { dir => - val gpuFile = new File(dir, "gpuDiscoverScript") - val scriptPath = mockDiscoveryScript(gpuFile, - """'{"name": "gpu","addresses":["0", "1"]}'""") - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath) - val resources = - ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX) - val gpuValue = resources.get(GPU) - assert(gpuValue.nonEmpty, "Should have a gpu entry") - assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") - assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") - } - } - - test("Resource discoverer passed in resources") { - val sparkconf = new SparkConf - assume(!(Utils.isWindows)) - withTempDir { dir => - val gpuFile = new File(dir, "gpuDiscoverScript") - val gpuScript = mockDiscoveryScript(gpuFile, - """'{"name": "gpu","addresses":["0", "1"]}'""") - val fpgaFile = new File(dir, "fpgaDiscoverScript") - val fpgaScript = mockDiscoveryScript(fpgaFile, - """'{"name": "fpga","addresses":["f0", "f1"]}'""") - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuScript) - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaScript) - // it should only look at the resources passed in and ignore fpga conf - val resources = - ResourceDiscoverer.discoverResourcesInformation(sparkconf, - SPARK_EXECUTOR_RESOURCE_PREFIX, Some(Set(GPU))) - assert(resources.size === 1, "should only have the gpu resource") - val gpuValue = resources.get(GPU) - assert(gpuValue.nonEmpty, "Should have a gpu entry") - assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") - assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") - } - } - - test("Resource discoverer no addresses errors") { - val sparkconf = new SparkConf - assume(!(Utils.isWindows)) - withTempDir { dir => - val gpuFile = new File(dir, "gpuDiscoverScript") - val scriptPath = mockDiscoveryScript(gpuFile, - """'{"name": "gpu"}'""") - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath) - val resources = - ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX) - val gpuValue = resources.get(GPU) - assert(gpuValue.nonEmpty, "Should have a gpu entry") - assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 0, "Should have 0 indexes") - } - } - - test("Resource discoverer multiple resource types") { - val sparkconf = new SparkConf - assume(!(Utils.isWindows)) - withTempDir { dir => - val gpuFile = new File(dir, "gpuDiscoverScript") - val gpuDiscovery = mockDiscoveryScript(gpuFile, - """'{"name": "gpu", "addresses": ["0", "1"]}'""") - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery) - - val fpgaFile = new File(dir, "fpgaDiscoverScript") - val fpgaDiscovery = mockDiscoveryScript(fpgaFile, - """'{"name": "fpga", "addresses": ["f1", "f2", "f3"]}'""") - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaDiscovery) - - val resources = - ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX) - assert(resources.size === 2) - val gpuValue = resources.get(GPU) - assert(gpuValue.nonEmpty, "Should have a gpu entry") - assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") - assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") - - val fpgaValue = resources.get(FPGA) - assert(fpgaValue.nonEmpty, "Should have a gpu entry") - assert(fpgaValue.get.name == "fpga", "name should be fpga") - assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes") - assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep, - "should have f1,f2,f3 entries") - } - } - - test("Resource discoverer multiple gpus on driver") { - val sparkconf = new SparkConf - assume(!(Utils.isWindows)) - withTempDir { dir => - val gpuFile = new File(dir, "gpuDiscoverScript") - val gpuDiscovery = mockDiscoveryScript(gpuFile, - """'{"name": "gpu", "addresses": ["0", "1"]}'""") - sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery) - sparkconf set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, "boguspath") - // make sure it reads from correct config, here it should use driver - val resources = - ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_DRIVER_RESOURCE_PREFIX) - val gpuValue = resources.get(GPU) - assert(gpuValue.nonEmpty, "Should have a gpu entry") - assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") - assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") - } - } - - test("Resource discoverer script returns mismatched name") { - val sparkconf = new SparkConf - assume(!(Utils.isWindows)) - withTempDir { dir => - val gpuFile = new File(dir, "gpuDiscoverScript") - val gpuDiscovery = mockDiscoveryScript(gpuFile, - """'{"name": "fpga", "addresses": ["0", "1"]}'""") - sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery) - val error = intercept[SparkException] { - ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_DRIVER_RESOURCE_PREFIX) - }.getMessage() - - assert(error.contains("Error running the resource discovery script")) - } - } - - test("Resource discoverer script returns invalid format") { - val sparkconf = new SparkConf - assume(!(Utils.isWindows)) - withTempDir { dir => - val gpuFile = 
new File(dir, "gpuDiscoverScript") - val gpuDiscovery = mockDiscoveryScript(gpuFile, - """'{"addresses": ["0", "1"]}'""") - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery) - val error = intercept[SparkException] { - ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX) - }.getMessage() - - assert(error.contains("Error running the resource discovery")) - } - } - - test("Resource discoverer script doesn't exist") { - val sparkconf = new SparkConf - withTempDir { dir => - val file1 = new File(dir, "bogusfilepath") - try { - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, file1.getPath()) - - val error = intercept[SparkException] { - ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX) - }.getMessage() - - assert(error.contains("doesn't exist")) - } finally { - JavaFiles.deleteIfExists(file1.toPath()) - } - } - } - - test("gpu's specified but not discovery script") { - val sparkconf = new SparkConf - sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_AMOUNT_SUFFIX, "2") - - val error = intercept[SparkException] { - ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX) - }.getMessage() - - assert(error.contains("User is expecting to use")) - } - -} diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index c1265ce..74f5854 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -25,12 +25,14 @@ import scala.util.{Random, Try} import com.esotericsoftware.kryo.Kryo -import org.apache.spark.ResourceName._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Kryo._ import org.apache.spark.internal.config.Network._ import org.apache.spark.network.util.ByteUnit +import org.apache.spark.resource.ResourceID +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer} import org.apache.spark.util.{ResetSystemProperties, RpcUtils, Utils} @@ -111,34 +113,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst assert(conf.getOption("k4") === None) } - test("basic getAllWithPrefixAndPostfix") { - val conf = new SparkConf(false) - conf.set("spark.prefix.main.suffix", "v1") - val prefix = "spark.prefix." - val suffix = ".suffix" - assert(conf.getAllWithPrefixAndSuffix(prefix, suffix).toSet === - Set(("main", "v1"))) - - conf.set("spark.prefix.main2.suffix", "v2") - conf.set("spark.prefix.main3.extra1.suffix", "v3") - conf.set("spark.prefix.main4.extra2.nonmatchingsuffix", "v4") - conf.set("spark.notmatchingprefix.main4.suffix", "v5") - - assert(conf.getAllWithPrefixAndSuffix(prefix, suffix).toSet === - Set(("main", "v1"), ("main2", "v2"), ("main3.extra1", "v3"))) - } - - test("test prefix config parsing utilities") { - val conf = new SparkConf(false) - conf.set("spark.prefix.main.suffix", "v1") - val prefix = "spark.prefix." 
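(Aside, not part of the diff: the prefix/suffix parsing helpers deleted here are superseded by conf keys derived from ResourceID. A minimal sketch of how those keys compose, following the "ResourceID" test added in ResourceUtilsSuite further down; the "spark.executor" component name is just an example.)

    import org.apache.spark.resource.ResourceID

    val gpuId = ResourceID("spark.executor", "gpu")
    // Each ResourceID expands to "<component>.resource.<name>.<suffix>" conf keys:
    gpuId.amountConf           // "spark.executor.resource.gpu.amount"
    gpuId.discoveryScriptConf  // "spark.executor.resource.gpu.discoveryScript"
    gpuId.vendorConf           // "spark.executor.resource.gpu.vendor"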
- val suffix = ".suffix" - val configsWithPrefix = conf.getAllWithPrefix(prefix) - assert(configsWithPrefix.toSet === Set(("main.suffix", "v1"))) - assert(SparkConf.getBaseOfConfigs(configsWithPrefix) === Set("main")) - assert(SparkConf.getConfigsWithSuffix(configsWithPrefix, suffix).toSet === Set(("main", "v1"))) - } - test("basic getAllWithPrefix") { val prefix = "spark.prefix." val conf = new SparkConf(false) @@ -450,23 +424,20 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst test("get task resource requirement from config") { val conf = new SparkConf() - conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") - conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1") - var taskResourceRequirement = conf.getTaskResourceRequirements() + conf.set(TASK_GPU_ID.amountConf, "2") + conf.set(TASK_FPGA_ID.amountConf, "1") + var taskResourceRequirement = + parseTaskResourceRequirements(conf).map(req => (req.resourceName, req.amount)).toMap + assert(taskResourceRequirement.size == 2) assert(taskResourceRequirement(GPU) == 2) assert(taskResourceRequirement(FPGA) == 1) - conf.remove(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX) + conf.remove(TASK_FPGA_ID.amountConf) // Ignore invalid prefix - conf.set("spark.invalid.prefix" + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1") - taskResourceRequirement = conf.getTaskResourceRequirements() - assert(taskResourceRequirement.size == 1) - assert(taskResourceRequirement.get(FPGA).isEmpty) - - // Ignore invalid suffix - conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + "invalid.suffix", "1") - taskResourceRequirement = conf.getTaskResourceRequirements() + conf.set(ResourceID("spark.invalid.prefix", FPGA).amountConf, "1") + taskResourceRequirement = + parseTaskResourceRequirements(conf).map(req => (req.resourceName, req.amount)).toMap assert(taskResourceRequirement.size == 1) assert(taskResourceRequirement.get(FPGA).isEmpty) } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index d1a36d4..fa2c4bd 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -20,9 +20,6 @@ package org.apache.spark import java.io.File import java.net.{MalformedURLException, URI} import java.nio.charset.StandardCharsets -import java.nio.file.{Files => JavaFiles} -import java.nio.file.attribute.PosixFilePermission._ -import java.util.EnumSet import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit} import scala.concurrent.duration._ @@ -33,15 +30,16 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} -import org.json4s.JsonAST.JArray -import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods.{compact, render} +import org.json4s.{DefaultFormats, Extraction} import org.scalatest.Matchers._ import org.scalatest.concurrent.Eventually -import org.apache.spark.ResourceName.GPU +import org.apache.spark.TestUtils._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ +import org.apache.spark.resource.ResourceAllocation +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, 
SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util.{ThreadUtils, Utils} @@ -739,19 +737,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } - test("test gpu driver discovery under local-cluster mode") { + test("test driver discovery under local-cluster mode") { withTempDir { dir => - val gpuFile = new File(dir, "gpuDiscoverScript") - val scriptPath = mockDiscoveryScript(gpuFile, - """'{"name": "gpu","addresses":["5", "6"]}'""") + val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript", + """{"name": "gpu","addresses":["5", "6"]}""") val conf = new SparkConf() - .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_AMOUNT_SUFFIX, "1") - .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath) .setMaster("local-cluster[1, 1, 1024]") .setAppName("test-cluster") + conf.set(DRIVER_GPU_ID.amountConf, "1") + conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath) sc = new SparkContext(conf) // Ensure all executors has started @@ -764,39 +759,31 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } - private def writeJsonFile(dir: File, strToWrite: JArray): String = { - val f1 = File.createTempFile("test-resource-parser1", "", dir) - JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes()) - f1.getPath() - } - test("test gpu driver resource files and discovery under local-cluster mode") { withTempDir { dir => - val gpuFile = new File(dir, "gpuDiscoverScript") - val scriptPath = mockDiscoveryScript(gpuFile, - """'{"name": "gpu","addresses":["5", "6"]}'""") + val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript", + """{"name": "gpu","addresses":["5", "6"]}""") + implicit val formats = DefaultFormats val gpusAllocated = - ("name" -> "gpu") ~ - ("addresses" -> Seq("0", "1", "8")) - val ja = JArray(List(gpusAllocated)) - val resourcesFile = writeJsonFile(dir, ja) + ResourceAllocation(DRIVER_GPU_ID, Seq("0", "1", "8")) + val ja = Extraction.decompose(Seq(gpusAllocated)) + val resourcesFile = createTempJsonFile(dir, "resources", ja) val conf = new SparkConf() - .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_AMOUNT_SUFFIX, "1") - .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath) .set(DRIVER_RESOURCES_FILE, resourcesFile) .setMaster("local-cluster[1, 1, 1024]") .setAppName("test-cluster") + conf.set(DRIVER_GPU_ID.amountConf, "1") + conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath) + sc = new SparkContext(conf) // Ensure all executors has started eventually(timeout(10.seconds)) { assert(sc.statusTracker.getExecutorInfos.size == 1) } - // driver gpu addresses config should take precedence over the script + // driver gpu resources file should take precedence over the script assert(sc.resources.size === 1) assert(sc.resources.get(GPU).get.addresses === Array("0", "1", "8")) assert(sc.resources.get(GPU).get.name === "gpu") @@ -805,10 +792,9 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu test("Test parsing resources task configs with missing executor config") { val conf = new SparkConf() - .set(SPARK_TASK_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_AMOUNT_SUFFIX, "1") .setMaster("local-cluster[1, 1, 1024]") .setAppName("test-cluster") + conf.set(TASK_GPU_ID.amountConf, "1") var error = intercept[SparkException] { sc = new SparkContext(conf) @@ 
-821,28 +807,26 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu test("Test parsing resources executor config < task requirements") { val conf = new SparkConf() - .set(SPARK_TASK_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_AMOUNT_SUFFIX, "2") - .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + - SPARK_RESOURCE_AMOUNT_SUFFIX, "1") .setMaster("local-cluster[1, 1, 1024]") .setAppName("test-cluster") + conf.set(TASK_GPU_ID.amountConf, "2") + conf.set(EXECUTOR_GPU_ID.amountConf, "1") var error = intercept[SparkException] { sc = new SparkContext(conf) }.getMessage() - assert(error.contains("The executor resource config: " + - "spark.executor.resource.gpu.amount = 1 has to be >= the task config: " + + assert(error.contains("The executor resource config: spark.executor.resource.gpu.amount = 1 " + + "has to be >= the requested amount in task resource config: " + "spark.task.resource.gpu.amount = 2")) } test("Parse resources executor config not the same multiple numbers of the task requirements") { val conf = new SparkConf() - .set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") - .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "4") .setMaster("local-cluster[1, 1, 1024]") .setAppName("test-cluster") + conf.set(TASK_GPU_ID.amountConf, "2") + conf.set(EXECUTOR_GPU_ID.amountConf, "4") var error = intercept[SparkException] { sc = new SparkContext(conf) @@ -853,32 +837,21 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu "executor to: 1. Please adjust your configuration.")) } - def mockDiscoveryScript(file: File, result: String): String = { - Files.write(s"echo $result", file, StandardCharsets.UTF_8) - JavaFiles.setPosixFilePermissions(file.toPath(), - EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) - file.getPath() - } - test("test resource scheduling under local-cluster mode") { import org.apache.spark.TestUtils._ assume(!(Utils.isWindows)) withTempDir { dir => - val resourceFile = new File(dir, "resourceDiscoverScript") - val resources = """'{"name": "gpu", "addresses": ["0", "1", "2"]}'""" - Files.write(s"echo $resources", resourceFile, StandardCharsets.UTF_8) - JavaFiles.setPosixFilePermissions(resourceFile.toPath(), - EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) - val discoveryScript = resourceFile.getPath() + val discoveryScript = createTempScriptWithExpectedOutput(dir, "resourceDiscoveryScript", + """{"name": "gpu","addresses":["0", "1", "2"]}""") val conf = new SparkConf() - .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3") - .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX}", - discoveryScript) .setMaster("local-cluster[3, 3, 1024]") .setAppName("test-cluster") - setTaskResourceRequirement(conf, GPU, 1) + conf.set(TASK_GPU_ID.amountConf, "1") + conf.set(EXECUTOR_GPU_ID.amountConf, "3") + conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, discoveryScript) + sc = new SparkContext(conf) // Ensure all executors has started diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 3c4d51f..693b0ee 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -20,26 +20,23 @@ package org.apache.spark.executor import java.io.File import java.net.URL import 
java.nio.ByteBuffer -import java.nio.charset.StandardCharsets -import java.nio.file.{Files => JavaFiles} -import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE} -import java.util.{EnumSet, Properties} +import java.util.Properties import scala.collection.mutable import scala.concurrent.duration._ -import com.google.common.io.Files +import org.json4s.{DefaultFormats, Extraction} import org.json4s.JsonAST.{JArray, JObject} import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods.{compact, render} import org.mockito.Mockito.when import org.scalatest.concurrent.Eventually.{eventually, timeout} import org.scalatest.mockito.MockitoSugar import org.apache.spark._ -import org.apache.spark.ResourceInformation -import org.apache.spark.ResourceName._ -import org.apache.spark.internal.config._ +import org.apache.spark.TestUtils._ +import org.apache.spark.resource.{ResourceAllocation, ResourceInformation} +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.LaunchTask @@ -49,15 +46,11 @@ import org.apache.spark.util.{SerializableBuffer, Utils} class CoarseGrainedExecutorBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { - private def writeFileWithJson(dir: File, strToWrite: JArray): String = { - val f1 = File.createTempFile("test-resource-parser1", "", dir) - JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes()) - f1.getPath() - } + implicit val formats = DefaultFormats test("parsing no resources") { val conf = new SparkConf - conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") + conf.set(TASK_GPU_ID.amountConf, "2") val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) @@ -67,46 +60,45 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite withTempDir { tmpDir => val testResourceArgs: JObject = ("" -> "") val ja = JArray(List(testResourceArgs)) - val f1 = writeFileWithJson(tmpDir, ja) + val f1 = createTempJsonFile(tmpDir, "resources", ja) var error = intercept[SparkException] { val parsedResources = backend.parseOrFindResources(Some(f1)) }.getMessage() - assert(error.contains("Exception parsing the resources in"), + assert(error.contains("Error parsing resources file"), s"Calling with no resources didn't error as expected, error: $error") } } - test("parsing one resources") { + test("parsing one resource") { val conf = new SparkConf - conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") - conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") + conf.set(EXECUTOR_GPU_ID.amountConf, "2") + conf.set(TASK_GPU_ID.amountConf, "2") val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", 4, Seq.empty[URL], env, None) withTempDir { tmpDir => - val testResourceArgs = - ("name" -> "gpu") ~ - ("addresses" -> Seq("0", "1")) - val ja = JArray(List(testResourceArgs)) - val f1 = writeFileWithJson(tmpDir, ja) + val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) + val ja = Extraction.decompose(Seq(ra)) + val f1 = createTempJsonFile(tmpDir, "resources", ja) val parsedResources = backend.parseOrFindResources(Some(f1)) 
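(Aside, not part of the diff: a sketch of the resources file these tests build. ResourceAllocation values are serialized with json4s into the JSON file that parseOrFindResources reads; the exact field layout shown in the comment is inferred from the case-class shape and is illustrative only.)

    import org.json4s.{DefaultFormats, Extraction}
    import org.json4s.jackson.JsonMethods.{compact, render}
    import org.apache.spark.resource.ResourceAllocation
    import org.apache.spark.resource.TestResourceIDs.EXECUTOR_GPU_ID

    implicit val formats = DefaultFormats
    val alloc = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
    // Yields roughly:
    // [{"id":{"componentName":"spark.executor","resourceName":"gpu"},"addresses":["0","1"]}]
    val json = compact(render(Extraction.decompose(Seq(alloc))))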
assert(parsedResources.size === 1) assert(parsedResources.get(GPU).nonEmpty) - assert(parsedResources.get(GPU).get.name === "gpu") + assert(parsedResources.get(GPU).get.name === GPU) assert(parsedResources.get(GPU).get.addresses.deep === Array("0", "1").deep) } } test("parsing multiple resources") { val conf = new SparkConf - conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3") - conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3") - conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") - conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") + conf.set(EXECUTOR_GPU_ID.amountConf, "2") + conf.set(TASK_GPU_ID.amountConf, "2") + conf.set(EXECUTOR_FPGA_ID.amountConf, "3") + conf.set(TASK_FPGA_ID.amountConf, "3") + val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function @@ -114,30 +106,27 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite 4, Seq.empty[URL], env, None) withTempDir { tmpDir => - val gpuArgs = - ("name" -> "gpu") ~ - ("addresses" -> Seq("0", "1")) + val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) val fpgaArgs = - ("name" -> "fpga") ~ - ("addresses" -> Seq("f1", "f2", "f3")) - val ja = JArray(List(gpuArgs, fpgaArgs)) - val f1 = writeFileWithJson(tmpDir, ja) + ResourceAllocation(EXECUTOR_FPGA_ID, Seq("f1", "f2", "f3")) + val ja = Extraction.decompose(Seq(gpuArgs, fpgaArgs)) + val f1 = createTempJsonFile(tmpDir, "resources", ja) val parsedResources = backend.parseOrFindResources(Some(f1)) assert(parsedResources.size === 2) assert(parsedResources.get(GPU).nonEmpty) - assert(parsedResources.get(GPU).get.name === "gpu") + assert(parsedResources.get(GPU).get.name === GPU) assert(parsedResources.get(GPU).get.addresses.deep === Array("0", "1").deep) assert(parsedResources.get(FPGA).nonEmpty) - assert(parsedResources.get(FPGA).get.name === "fpga") + assert(parsedResources.get(FPGA).get.name === FPGA) assert(parsedResources.get(FPGA).get.addresses.deep === Array("f1", "f2", "f3").deep) } } test("error checking parsing resources and executor and task configs") { val conf = new SparkConf - conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") - conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2") + conf.set(EXECUTOR_GPU_ID.amountConf, "2") + conf.set(TASK_GPU_ID.amountConf, "2") val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function @@ -146,13 +135,11 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // not enough gpu's on the executor withTempDir { tmpDir => - val gpuArgs = - ("name" -> "gpu") ~ - ("addresses" -> Seq("0")) - val ja = JArray(List(gpuArgs)) - val f1 = writeFileWithJson(tmpDir, ja) + val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0")) + val ja = Extraction.decompose(Seq(gpuArgs)) + val f1 = createTempJsonFile(tmpDir, "resources", ja) - var error = intercept[SparkException] { + var error = intercept[IllegalArgumentException] { val parsedResources = backend.parseOrFindResources(Some(f1)) }.getMessage() @@ -162,24 +149,23 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // missing resource on the executor withTempDir { tmpDir => - val gpuArgs = - ("name" -> "fpga") ~ - ("addresses" -> Seq("0")) - val ja = JArray(List(gpuArgs)) - val f1 = 
writeFileWithJson(tmpDir, ja) + val fpga = ResourceAllocation(EXECUTOR_FPGA_ID, Seq("0")) + val ja = Extraction.decompose(Seq(fpga)) + val f1 = createTempJsonFile(tmpDir, "resources", ja) var error = intercept[SparkException] { val parsedResources = backend.parseOrFindResources(Some(f1)) }.getMessage() - assert(error.contains("Resource: gpu required but wasn't discovered on startup")) + assert(error.contains("User is expecting to use resource: gpu but didn't specify a " + + "discovery script!")) } } test("executor resource found less than required") { val conf = new SparkConf - conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "4") - conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "1") + conf.set(EXECUTOR_GPU_ID.amountConf, "4") + conf.set(TASK_GPU_ID.amountConf, "1") val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function @@ -188,33 +174,28 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // executor resources < required withTempDir { tmpDir => - val gpuArgs = - ("name" -> "gpu") ~ - ("addresses" -> Seq("0", "1")) - val ja = JArray(List(gpuArgs)) - val f1 = writeFileWithJson(tmpDir, ja) + val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) + val ja = Extraction.decompose(Seq(gpuArgs)) + val f1 = createTempJsonFile(tmpDir, "resources", ja) - var error = intercept[SparkException] { + var error = intercept[IllegalArgumentException] { val parsedResources = backend.parseOrFindResources(Some(f1)) }.getMessage() - assert(error.contains("gpu, with addresses: 0,1 is less than what the user requested: 4")) + assert(error.contains("Resource: gpu, with addresses: 0,1 is less than what the " + + "user requested: 4")) } } - test("use discoverer") { + test("use resource discovery") { val conf = new SparkConf - conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3") - conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3") + conf.set(EXECUTOR_FPGA_ID.amountConf, "3") + conf.set(TASK_FPGA_ID.amountConf, "3") assume(!(Utils.isWindows)) withTempDir { dir => - val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga") - Files.write("""echo '{"name": "fpga","addresses":["f1", "f2", "f3"]}'""", - fpgaDiscovery, StandardCharsets.UTF_8) - JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(), - EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) - conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + - SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaDiscovery.getPath()) + val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", + """{"name": "fpga","addresses":["f1", "f2", "f3"]}""") + conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath) val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) @@ -227,11 +208,43 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite assert(parsedResources.size === 1) assert(parsedResources.get(FPGA).nonEmpty) - assert(parsedResources.get(FPGA).get.name === "fpga") + assert(parsedResources.get(FPGA).get.name === FPGA) + assert(parsedResources.get(FPGA).get.addresses.deep === Array("f1", "f2", "f3").deep) + } + } + + test("use resource discovery and allocated file option") { + val conf = new SparkConf + conf.set(EXECUTOR_FPGA_ID.amountConf, "3") + conf.set(TASK_FPGA_ID.amountConf, "3") + assume(!(Utils.isWindows)) + withTempDir { dir => + val scriptPath = 
createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", + """{"name": "fpga","addresses":["f1", "f2", "f3"]}""") + conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath) + + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) + val ja = Extraction.decompose(Seq(gpuArgs)) + val f1 = createTempJsonFile(dir, "resources", ja) + val parsedResources = backend.parseOrFindResources(Some(f1)) + + assert(parsedResources.size === 2) + assert(parsedResources.get(GPU).nonEmpty) + assert(parsedResources.get(GPU).get.name === GPU) + assert(parsedResources.get(GPU).get.addresses.deep === Array("0", "1").deep) + assert(parsedResources.get(FPGA).nonEmpty) + assert(parsedResources.get(FPGA).get.name === FPGA) assert(parsedResources.get(FPGA).get.addresses.deep === Array("f1", "f2", "f3").deep) } } + test("track allocated resources by taskId") { val conf = new SparkConf val securityMgr = new SecurityManager(conf) diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 495321f..8edf957 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -45,6 +45,7 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.memory.TestMemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rdd.RDD +import org.apache.spark.resource.ResourceInformation import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout} import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription} import org.apache.spark.serializer.{JavaSerializer, SerializerManager} diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceInformationSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceInformationSuite.scala new file mode 100644 index 0000000..f5044c2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/resource/ResourceInformationSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.resource + +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.{SparkException, SparkFunSuite} + +class ResourceInformationSuite extends SparkFunSuite { + test("ResourceInformation.parseJson for valid JSON") { + val json1 = compact(render(("name" -> "p100") ~ ("addresses" -> Seq("0", "1")))) + val info1 = ResourceInformation.parseJson(json1) + assert(info1.name === "p100") + assert(info1.addresses === Array("0", "1")) + + // Currently we allow empty addresses. + val json2 = compact(render("name" -> "fpga")) + val info2 = ResourceInformation.parseJson(json2) + assert(info2.name === "fpga") + assert(info2.addresses.isEmpty) + + val json3 = compact(render("addresses" -> Seq("0"))) + val json4 = "invalid_json" + for (invalidJson <- Seq(json3, json4)) { + val ex = intercept[SparkException] { + print(ResourceInformation.parseJson(invalidJson)) + } + assert(ex.getMessage.contains("Error parsing JSON into ResourceInformation"), + "Error message should provide context.") + assert(ex.getMessage.contains(invalidJson), "Error message should include input json.") + } + } + + test("ResourceInformation.equals/hashCode") { + val a1 = new ResourceInformation("a", addresses = Array("0")) + val a21 = new ResourceInformation("a", addresses = Array("0", "1")) + val a22 = new ResourceInformation("a", addresses = Array("0", "1")) + val b2 = new ResourceInformation("b", addresses = Array("0", "1")) + object A2 extends ResourceInformation("a", Array("0", "1")) + assert(a1.equals(null) === false) + assert(a1.equals(a1)) + assert(a1.equals(a21) === false) + assert(a21.equals(a22) && a21.hashCode() === a22.hashCode()) + assert(a21.equals(b2) === false) + assert(a21.equals(A2) === false) + } +} diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala new file mode 100644 index 0000000..51a92e0 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.resource + +import java.io.File +import java.nio.file.{Files => JavaFiles} + +import org.json4s.{DefaultFormats, Extraction} + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.TestUtils._ +import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs._ +import org.apache.spark.util.Utils + +class ResourceUtilsSuite extends SparkFunSuite + with LocalSparkContext { + + test("ResourceID") { + val componentName = "spark.test" + val resourceName = "p100" + val id = ResourceID(componentName, resourceName) + val confPrefix = s"$componentName.resource.$resourceName." + assert(id.confPrefix === confPrefix) + assert(id.amountConf === s"${confPrefix}amount") + assert(id.discoveryScriptConf === s"${confPrefix}discoveryScript") + assert(id.vendorConf === s"${confPrefix}vendor") + } + + test("Resource discoverer no addresses errors") { + val conf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoverScript", + """{"name": "gpu"}""") + conf.set(EXECUTOR_GPU_ID.amountConf, "2") + conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, scriptPath) + + val error = intercept[IllegalArgumentException] { + getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None) + }.getMessage() + assert(error.contains("Resource: gpu, with " + "addresses: is less than what the user requested: 2")) + } + } + + test("Resource discoverer multiple resource types") { + val conf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuDiscovery = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript", + """{"name": "gpu", "addresses": ["0", "1"]}""") + conf.set(EXECUTOR_GPU_ID.amountConf, "2") + conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery) + + val fpgaDiscovery = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", + """{"name": "fpga", "addresses": ["f1", "f2", "f3"]}""") + conf.set(EXECUTOR_FPGA_ID.amountConf, "2") + conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, fpgaDiscovery) + + val resources = getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None) + assert(resources.size === 2) + val gpuValue = resources.get(GPU) + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.name == "gpu", "name should be gpu") + assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") + + val fpgaValue = resources.get(FPGA) + assert(fpgaValue.nonEmpty, "Should have an fpga entry") + assert(fpgaValue.get.name == "fpga", "name should be fpga") + assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes") + assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep, + "should have f1,f2,f3 entries") + } + } + + test("get from resources file and discover the remaining") { + val conf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + implicit val formats = DefaultFormats + val fpgaAddrs = Seq("f1", "f2", "f3") + val fpgaAllocation = ResourceAllocation(EXECUTOR_FPGA_ID, fpgaAddrs) + val resourcesFile = createTempJsonFile( + dir, "resources", Extraction.decompose(Seq(fpgaAllocation))) + conf.set(EXECUTOR_FPGA_ID.amountConf, "3") + val resourcesFromFileOnly = getOrDiscoverAllResources( + conf, SPARK_EXECUTOR_PREFIX, Some(resourcesFile)) + val expectedFpgaInfo = new ResourceInformation(FPGA, 
fpgaAddrs.toArray) + assert(resourcesFromFileOnly(FPGA) === expectedFpgaInfo) + + val gpuDiscovery = createTempScriptWithExpectedOutput( + dir, "gpuDiscoveryScript", """{"name": "gpu", "addresses": ["0", "1"]}""") + conf.set(EXECUTOR_GPU_ID.amountConf, "2") + conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery) + val resourcesFromBoth = getOrDiscoverAllResources( + conf, SPARK_EXECUTOR_PREFIX, Some(resourcesFile)) + val expectedGpuInfo = new ResourceInformation(GPU, Array("0", "1")) + assert(resourcesFromBoth(FPGA) === expectedFpgaInfo) + assert(resourcesFromBoth(GPU) === expectedGpuInfo) + } + } + + test("list resource ids") { + val conf = new SparkConf + conf.set(DRIVER_GPU_ID.amountConf, "2") + var resources = listResourceIds(conf, SPARK_DRIVER_PREFIX) + assert(resources.size === 1, "should only have GPU for resource") + assert(resources(0).resourceName == GPU, "name should be gpu") + + conf.set(DRIVER_FPGA_ID.amountConf, "2") + val resourcesMap = listResourceIds(conf, SPARK_DRIVER_PREFIX) + .map { rId => (rId.resourceName, 1) }.toMap + assert(resourcesMap.size === 2, "should have both GPU and FPGA resources") + assert(resourcesMap.get(GPU).nonEmpty, "should have GPU") + assert(resourcesMap.get(FPGA).nonEmpty, "should have FPGA") + } + + test("parse resource request") { + val conf = new SparkConf + conf.set(DRIVER_GPU_ID.amountConf, "2") + var request = parseResourceRequest(conf, DRIVER_GPU_ID) + assert(request.id.resourceName === GPU, "resource name should be gpu") + assert(request.amount === 2, "GPU count should be 2") + assert(request.discoveryScript === None, "discovery script should be empty") + assert(request.vendor === None, "vendor should be empty") + + val vendor = "nvidia.com" + val discoveryScript = "discoveryScriptGPU" + conf.set(DRIVER_GPU_ID.discoveryScriptConf, discoveryScript) + conf.set(DRIVER_GPU_ID.vendorConf, vendor) + request = parseResourceRequest(conf, DRIVER_GPU_ID) + assert(request.id.resourceName === GPU, "resource name should be gpu") + assert(request.amount === 2, "GPU count should be 2") + assert(request.discoveryScript.get === discoveryScript, "should get discovery script") + assert(request.vendor.get === vendor, "should get vendor") + + conf.remove(DRIVER_GPU_ID.amountConf) + val error = intercept[SparkException] { + request = parseResourceRequest(conf, DRIVER_GPU_ID) + }.getMessage() + + assert(error.contains("You must specify an amount for gpu")) + } + + test("Resource discoverer multiple gpus on driver") { + val conf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuDiscovery = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript", + """{"name": "gpu", "addresses": ["0", "1"]}""") + conf.set(DRIVER_GPU_ID.amountConf, "2") + conf.set(DRIVER_GPU_ID.discoveryScriptConf, gpuDiscovery) + + // make sure it reads from correct config, here it should use driver + val resources = getOrDiscoverAllResources(conf, SPARK_DRIVER_PREFIX, None) + val gpuValue = resources.get(GPU) + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.name == "gpu", "name should be gpu") + assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries") + } + } + + test("Resource discoverer script returns mismatched name") { + val conf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuDiscovery = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript", + """{"name": "fpga", "addresses": ["0", "1"]}""") 
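(Aside, not part of the diff: a sketch of the discovery-script contract these tests exercise. The script prints a single ResourceInformation JSON object on stdout, and discoverResource throws if the reported name doesn't match the request. discoverResource is spark-internal, as used by this suite, and the script path below is a placeholder.)

    import org.apache.spark.resource.ResourceRequest
    import org.apache.spark.resource.ResourceUtils.discoverResource
    import org.apache.spark.resource.TestResourceIDs.EXECUTOR_GPU_ID

    // The script is expected to emit, e.g.: {"name": "gpu", "addresses": ["0", "1"]}
    val req = ResourceRequest(EXECUTOR_GPU_ID, 2, Some("/path/to/gpuDiscovery.sh"), None)
    val info = discoverResource(req)  // SparkException if the script reports another name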
+ val request = + ResourceRequest( + DRIVER_GPU_ID, + 2, + Some(gpuDiscovery), + None) + + val error = intercept[SparkException] { + discoverResource(request) + }.getMessage() + + assert(error.contains(s"Error running the resource discovery script $gpuDiscovery: " + + "script returned resource name fpga and we were expecting gpu")) + } + } + + test("Resource discoverer script returns invalid format") { + val conf = new SparkConf + assume(!(Utils.isWindows)) + withTempDir { dir => + val gpuDiscovery = createTempScriptWithExpectedOutput(dir, "gpuDiscoverScript", + """{"addresses": ["0", "1"]}""") + + val request = + ResourceRequest( + EXECUTOR_GPU_ID, + 2, + Some(gpuDiscovery), + None) + + val error = intercept[SparkException] { + discoverResource(request) + }.getMessage() + + assert(error.contains("Error parsing JSON into ResourceInformation")) + } + } + + test("Resource discoverer script doesn't exist") { + val conf = new SparkConf + withTempDir { dir => + val file1 = new File(dir, "bogusfilepath") + try { + val request = + ResourceRequest( + EXECUTOR_GPU_ID, + 2, + Some(file1.getPath()), + None) + + val error = intercept[SparkException] { + discoverResource(request) + }.getMessage() + + assert(error.contains("doesn't exist")) + } finally { + JavaFiles.deleteIfExists(file1.toPath()) + } + } + } + + test("gpu's specified but not a discovery script") { + val request = ResourceRequest(EXECUTOR_GPU_ID, 2, None, None) + + val error = intercept[SparkException] { + discoverResource(request) + }.getMessage() + + assert(error.contains("User is expecting to use resource: gpu but " + + "didn't specify a discovery script!")) + } +} diff --git a/core/src/test/scala/org/apache/spark/ResourceName.scala b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala similarity index 57% rename from core/src/test/scala/org/apache/spark/ResourceName.scala rename to core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala index 6efe064..6d2c07d 100644 --- a/core/src/test/scala/org/apache/spark/ResourceName.scala +++ b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala @@ -15,10 +15,17 @@ * limitations under the License. 
*/ -package org.apache.spark +package org.apache.spark.resource -private[spark] object ResourceName { - // known types of resources - final val GPU: String = "gpu" - final val FPGA: String = "fpga" +import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX, SPARK_TASK_PREFIX} +import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} + +object TestResourceIDs { + val DRIVER_GPU_ID = ResourceID(SPARK_DRIVER_PREFIX, GPU) + val EXECUTOR_GPU_ID = ResourceID(SPARK_EXECUTOR_PREFIX, GPU) + val TASK_GPU_ID = ResourceID(SPARK_TASK_PREFIX, GPU) + + val DRIVER_FPGA_ID = ResourceID(SPARK_DRIVER_PREFIX, FPGA) + val EXECUTOR_FPGA_ID = ResourceID(SPARK_EXECUTOR_PREFIX, FPGA) + val TASK_FPGA_ID = ResourceID(SPARK_TASK_PREFIX, FPGA) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 70d368e..3edbbeb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -32,10 +32,12 @@ import org.scalatest.concurrent.Eventually import org.scalatest.mockito.MockitoSugar._ import org.apache.spark._ -import org.apache.spark.ResourceName.GPU import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.rdd.RDD +import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -187,12 +189,12 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val conf = new SparkConf() .set(EXECUTOR_CORES, 3) - .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "3") .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test .setMaster( "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]") .setAppName("test") - setTaskResourceRequirement(conf, GPU, 1) + conf.set(TASK_GPU_ID.amountConf, "1") + conf.set(EXECUTOR_GPU_ID.amountConf, "1") sc = new SparkContext(conf) val backend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend] diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala index b4ff730..0109d1f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkException, SparkFunSuite} -import org.apache.spark.ResourceName.GPU +import org.apache.spark.resource.ResourceUtils.GPU class ExecutorResourceInfoSuite extends SparkFunSuite { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index 233bc73..5839532f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ 
-23,9 +23,9 @@ import java.util.Properties import scala.collection.mutable.HashMap -import org.apache.spark.ResourceInformation -import org.apache.spark.ResourceName.GPU import org.apache.spark.SparkFunSuite +import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.ResourceUtils.GPU class TaskDescriptionSuite extends SparkFunSuite { test("encoding and then decoding a TaskDescription results in the same TaskDescription") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 9a4c7a9..d1b1616 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -29,9 +29,10 @@ import org.scalatest.concurrent.Eventually import org.scalatest.mockito.MockitoSugar import org.apache.spark._ -import org.apache.spark.ResourceName.GPU import org.apache.spark.internal.Logging import org.apache.spark.internal.config +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.util.ManualClock class FakeSchedulerBackend extends SchedulerBackend { @@ -1249,12 +1250,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val taskGpus = 1 val executorGpus = 4 val executorCpus = 4 + val taskScheduler = setupScheduler(numCores = executorCpus, config.CPUS_PER_TASK.key -> taskCpus.toString, - s"${config.SPARK_TASK_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}" -> - taskGpus.toString, - s"${config.SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}" -> - executorGpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, config.EXECUTOR_CORES.key -> executorCpus.toString) val taskSet = FakeTask.createTaskSet(3) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 3d09b10..da566dd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -27,9 +27,10 @@ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.apache.spark._ -import org.apache.spark.ResourceName.GPU import org.apache.spark.internal.Logging import org.apache.spark.internal.config +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.serializer.SerializerInstance import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AccumulatorV2, ManualClock} @@ -1639,7 +1640,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg import TestUtils._ sc = new SparkContext("local", "test") - setTaskResourceRequirement(sc.conf, GPU, 2) + sc.conf.set(TASK_GPU_ID.amountConf, "2") sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index ddad02a..b1b7751 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -32,8 +32,8 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{SPARK_RESOURCE_AMOUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX} import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.resource.ResourceUtils import org.apache.spark.util.{Clock, SystemClock, Utils} import org.apache.spark.util.Utils.getHadoopFileSystem @@ -226,20 +226,14 @@ private[spark] object KubernetesUtils extends Logging { def buildResourcesQuantities( componentName: String, sparkConf: SparkConf): Map[String, Quantity] = { - val allResources = sparkConf.getAllWithPrefix(componentName) - val vendors = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_VENDOR_SUFFIX).toMap - val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap - val uniqueResources = SparkConf.getBaseOfConfigs(allResources) - - uniqueResources.map { rName => - val vendorDomain = vendors.get(rName).getOrElse(throw new SparkException("Resource: " + - s"$rName was requested, but vendor was not specified.")) - val amount = amounts.get(rName).getOrElse(throw new SparkException(s"Resource: $rName " + - "was requested, but count was not specified.")) + val requests = ResourceUtils.parseAllResourceRequests(sparkConf, componentName) + requests.map { request => + val vendorDomain = request.vendor.getOrElse(throw new SparkException("Resource: " + + s"${request.id.resourceName} was requested, but vendor was not specified.")) val quantity = new QuantityBuilder(false) - .withAmount(amount) + .withAmount(request.amount.toString) .build() - (KubernetesConf.buildKubernetesResourceName(vendorDomain, rName), quantity) + (KubernetesConf.buildKubernetesResourceName(vendorDomain, request.id.resourceName), quantity) }.toMap } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index d10f69f..1944ba9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -89,7 +89,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) } val driverResourceQuantities = - KubernetesUtils.buildResourcesQuantities(SPARK_DRIVER_RESOURCE_PREFIX, conf.sparkConf) + KubernetesUtils.buildResourcesQuantities(SPARK_DRIVER_PREFIX, conf.sparkConf) val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT) val driverBlockManagerPort = conf.sparkConf.getInt( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index d46a9b8..d648755 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -96,7 +96,7 @@ private[spark] class BasicExecutorFeatureStep( .build() val executorResourceQuantities = - KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_RESOURCE_PREFIX, + KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_PREFIX, kubernetesConf.sparkConf) val executorEnv: Seq[EnvVar] = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index df326a2..3706721 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -28,6 +28,8 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ +import org.apache.spark.resource.ResourceID +import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.util.Utils class BasicDriverFeatureStepSuite extends SparkFunSuite { @@ -45,7 +47,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { } test("Check the pod respects all configurations from the user.") { - val resources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com"))) + val resourceID = ResourceID(SPARK_DRIVER_PREFIX, GPU) + val resources = + Map(("nvidia.com/gpu" -> TestResourceInformation(resourceID, "2", "nvidia.com"))) val sparkConf = new SparkConf() .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") .set(DRIVER_CORES, 2) @@ -55,12 +59,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(CONTAINER_IMAGE, "spark-driver:latest") .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS) resources.foreach { case (_, testRInfo) => - sparkConf.set( - s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", - testRInfo.count) - sparkConf.set( - s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}", - testRInfo.vendor) + sparkConf.set(testRInfo.rId.amountConf, testRInfo.count) + sparkConf.set(testRInfo.rId.vendorConf, testRInfo.vendor) } val kubernetesConf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index eb5532e..51067bd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -34,6 +34,9 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso import org.apache.spark.internal.config import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ +import org.apache.spark.resource.ResourceID +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.RpcEndpointAddress import 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -93,13 +96,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } test("test spark resource missing vendor") { - val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com"))) - // test missing vendor - gpuResources.foreach { case (_, testRInfo) => - baseConf.set( - s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", - testRInfo.count) - } + baseConf.set(EXECUTOR_GPU_ID.amountConf, "2") val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf)) val error = intercept[SparkException] { val executor = step.configurePod(SparkPod.initialPod()) @@ -108,30 +105,24 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } test("test spark resource missing amount") { - val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com"))) - // test missing count - gpuResources.foreach { case (_, testRInfo) => - baseConf.set( - s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}", - testRInfo.vendor) - } + baseConf.set(EXECUTOR_GPU_ID.vendorConf, "nvidia.com") + val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf)) val error = intercept[SparkException] { val executor = step.configurePod(SparkPod.initialPod()) }.getMessage() - assert(error.contains("Resource: gpu was requested, but count was not specified")) + assert(error.contains("You must specify an amount for gpu")) } test("basic executor pod with resources") { - val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")), - ("foo.com/fpga" -> TestResourceInformation("fpga", "f1", "foo.com"))) + val fpgaResourceID = ResourceID(SPARK_EXECUTOR_PREFIX, FPGA) + val gpuExecutorResourceID = ResourceID(SPARK_EXECUTOR_PREFIX, GPU) + val gpuResources = + Map(("nvidia.com/gpu" -> TestResourceInformation(gpuExecutorResourceID, "2", "nvidia.com")), + ("foo.com/fpga" -> TestResourceInformation(fpgaResourceID, "1", "foo.com"))) gpuResources.foreach { case (_, testRInfo) => - baseConf.set( - s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", - testRInfo.count) - baseConf.set( - s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}", - testRInfo.vendor) + baseConf.set(testRInfo.rId.amountConf, testRInfo.count) + baseConf.set(testRInfo.rId.vendorConf, testRInfo.vendor) } val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf)) val executor = step.configurePod(SparkPod.initialPod()) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala index e8be3b0..284887f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala @@ -25,6 +25,7 @@ import org.mockito.Mockito.{mock, when} import org.mockito.invocation.InvocationOnMock import org.apache.spark.deploy.k8s.SparkPod +import org.apache.spark.resource.ResourceID object KubernetesFeaturesTestUtils { @@ -67,5 +68,5 @@ object 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
index e8be3b0..284887f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -25,6 +25,7 @@ import org.mockito.Mockito.{mock, when}
 import org.mockito.invocation.InvocationOnMock
 
 import org.apache.spark.deploy.k8s.SparkPod
+import org.apache.spark.resource.ResourceID
 
 object KubernetesFeaturesTestUtils {
 
@@ -67,5 +68,5 @@ object KubernetesFeaturesTestUtils {
     list.filter(_.getClass() == desired).map(_.asInstanceOf[T])
   }
 
-  case class TestResourceInformation(rName: String, count: String, vendor: String)
+  case class TestResourceInformation(rId: ResourceID, count: String, vendor: String)
 }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index b70be7a..79a57ad 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -36,10 +36,11 @@ import org.mockito.ArgumentMatchers.{any, anyLong, eq => meq}
 import org.mockito.Mockito._
 import org.scalatest.mockito.MockitoSugar
 
-import org.apache.spark.{LocalSparkContext, ResourceInformation, SparkConf, SparkContext,
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
   SparkFunSuite}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
   TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
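The Mesos suite now imports `ResourceInformation` from `org.apache.spark.resource` rather than from `org.apache.spark`. A minimal sketch of using the relocated class; the two-argument constructor shape is an assumption inferred from the class's role of pairing a resource name with its addresses:

```scala
import org.apache.spark.resource.ResourceInformation

// A resource name plus the addresses an executor can hand out to tasks.
val gpus = new ResourceInformation("gpu", Array("0", "1"))
```
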
s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"), - (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}", + (ResourceID(SPARK_DRIVER_PREFIX, "fpga").amountConf, s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"), - (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}", + (ResourceID(SPARK_EXECUTOR_PREFIX, "gpu").amountConf, s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"), - (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}", + (ResourceID(SPARK_DRIVER_PREFIX, "gpu").amountConf, s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}")) val errorMessage = new mutable.StringBuilder() diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 069b287..6e634b9 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -143,7 +143,7 @@ private[yarn] class YarnAllocator( private val executorResourceRequests = sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++ - getYarnResourcesFromSparkResources(SPARK_EXECUTOR_RESOURCE_PREFIX, sparkConf) + getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf) // Resource capability requested for each executor private[yarn] val resource: Resource = { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 0f11820..6b87eec 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -26,8 +26,9 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.internal.config._ import org.apache.spark.launcher.YarnCommandBuilderUtils +import org.apache.spark.resource.ResourceID +import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.util.Utils object YarnSparkHadoopUtil { @@ -53,10 +54,9 @@ object YarnSparkHadoopUtil { confPrefix: String, sparkConf: SparkConf ): Map[String, String] = { - Map("gpu" -> YARN_GPU_RESOURCE_CONFIG, "fpga" -> YARN_FPGA_RESOURCE_CONFIG).map { + Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map { case (rName, yarnName) => - val resourceCountSparkConf = s"${confPrefix}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}" - (yarnName -> sparkConf.get(resourceCountSparkConf, "0")) + (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0")) }.filter { case (_, count) => count.toLong > 0 } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala index 380da36..2e5748b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala @@ -68,7 +68,8 @@ private[spark] object YarnCoarseGrainedExecutorBackend 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index 380da36..2e5748b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -68,7 +68,8 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
   val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
     CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
     new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-      arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile)
+      arguments.hostname, arguments.cores, arguments.userClassPath, env,
+      arguments.resourcesFileOpt)
   }
   val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
     this.getClass.getCanonicalName.stripSuffix("$"))
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 884e0f5..d5f1992 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -23,7 +23,6 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -42,6 +41,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceID
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class ClientSuite extends SparkFunSuite with Matchers {
@@ -400,7 +400,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
     }
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+      conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
     }
 
     val error = intercept[SparkException] {
@@ -423,7 +423,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
     }
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+      conf.set(ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3")
     }
 
     val error = intercept[SparkException] {
@@ -447,7 +447,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
 
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+      conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
     }
     // also just set yarn one that we don't convert
     conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
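The `ClientSuite` hunks above exercise the error path: requesting the same resource through both the YARN-specific key and the new Spark key is rejected. A condensed sketch of that scenario; the literal key strings are assumptions derived from the prefixes used in the diff:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.resource.ResourceID

val conf = new SparkConf()
  .set("spark.yarn.driver.resource.yarn.io/gpu", "2")     // YARN-style request
  .set(ResourceID("spark.driver", "gpu").amountConf, "3") // Spark-style request
// Client setup is then expected to fail with a SparkException
// flagging the duplicate resource specification.
```
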
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 0040369..ca89af2 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.ManualClock
@@ -187,10 +188,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource)
     ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val sparkResources =
-      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "3",
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "2",
+    val sparkResources =
+      Map(EXECUTOR_GPU_ID.amountConf -> "3",
+        EXECUTOR_FPGA_ID.amountConf -> "2",
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
     val handler = createAllocator(1, mockAmClient, sparkResources)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org