This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2d5de25  [SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes
2d5de25 is described below

commit 2d5de25a999e0e5580cf4024937b61e6c9265672
Author: Thomas Graves <tgra...@nvidia.com>
AuthorDate: Mon Nov 25 09:36:39 2019 -0600

    [SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes

    ### What changes were proposed in this pull request?

    This PR adds the base classes needed for stage level scheduling: a ResourceProfile and the
    executor and task resource request classes. These are kept private for now, until all the
    parts are implemented, at which point they will become public interfaces. I am adding them
    first because all the other subtasks for this feature require these classes. If people have
    better ideas on breaking this feature up, please let me know.

    See https://issues.apache.org/jira/browse/SPARK-29415 for the more detailed design.

    ### Why are the changes needed?

    New API for stage level scheduling. It's easier to add these classes first because the other
    jiras for this feature will all use them.

    ### Does this PR introduce any user-facing change?

    Yes, it adds an API to create a ResourceProfile with executor/task resources; see the SPIP
    jira https://issues.apache.org/jira/browse/SPARK-27495

    Example of the API:

      val rp = new ResourceProfile()
      rp.require(new ExecutorResourceRequests().cores(2))
      rp.require(new ExecutorResourceRequests().resource("resource.gpu", 1, "/opt/gpuScripts/getGpus"))
      rp.require(new TaskResourceRequests().resource("resource.gpu", 1))

    ### How was this patch tested?

    Tested using the unit tests added with this PR.

    Closes #26284 from tgravescs/SPARK-29415.

    Authored-by: Thomas Graves <tgra...@nvidia.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/resource/ExecutorResourceRequest.scala   |  77 ++++++++++
 .../spark/resource/ExecutorResourceRequests.scala  | 122 +++++++++++++++
 .../apache/spark/resource/ResourceProfile.scala    | 147 ++++++++++++++++++
 .../org/apache/spark/resource/ResourceUtils.scala  |   7 +-
 .../spark/resource/TaskResourceRequest.scala       |  43 ++++++
 .../spark/resource/TaskResourceRequests.scala      |  75 ++++++++++
 .../spark/resource/JavaResourceProfileSuite.java   |  66 ++++++++
 .../spark/resource/ResourceProfileSuite.scala      | 166 +++++++++++++++++++++
 8 files changed, 701 insertions(+), 2 deletions(-)
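For a fuller picture of the API surface this patch adds, here is a sketch that exercises the
request builders end to end. This is illustrative only: the classes are still private[spark]
(so it compiles only from Spark-internal code), and the script path is a placeholder.

    import org.apache.spark.resource._

    // Executor-side requirements, built with the fluent helper class.
    val ereqs = new ExecutorResourceRequests()
      .cores(4)
      .memory("4g")  // memory strings are parsed to MiB, so this stores 4096
      .resource("resource.gpu", 2, "/opt/gpuScripts/getGpus", "nvidia")

    // Task-side requirements; fractional amounts <= 0.5 let tasks share an address.
    val treqs = new TaskResourceRequests()
      .cpus(2)
      .resource("resource.gpu", 0.5)

    val rp = new ResourceProfile().require(ereqs).require(treqs)
    assert(rp.executorResources("resource.gpu").amount == 2)
    assert(rp.taskResources("resource.gpu").amount == 0.5)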
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
new file mode 100644
index 0000000..88ceaad
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
+
+/**
+ * An Executor resource request. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This is used to specify what the resource requirements are for an Executor and how
+ * Spark can find out specific details about those resources. Not all the parameters are
+ * required for every resource type. The resource names supported
+ * correspond to the regular Spark configs with the prefix removed. For instance, overhead
+ * memory in this API is memoryOverhead, which is spark.executor.memoryOverhead with
+ * spark.executor removed. Resources like GPUs are resource.gpu
+ * (spark configs spark.executor.resource.gpu.*). The amount, discoveryScript, and vendor
+ * parameters for resources are all the same parameters a user would specify through the
+ * configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}.
+ *
+ * For instance, say a user wants to allocate an Executor with GPU resources on YARN. The
+ * user has to specify the resource name (resource.gpu) and the amount or number of GPUs
+ * per Executor. The discovery script must be specified so that when the Executor starts
+ * up it can discover which GPU addresses are available for it to use, because YARN doesn't
+ * tell Spark that. The vendor parameter would not be used, because it is specific to
+ * Kubernetes.
+ *
+ * See the configuration and cluster specific docs for more details.
+ *
+ * Use the ExecutorResourceRequests class as a convenience API.
+ *
+ * @param resourceName Name of the resource
+ * @param amount Amount requested
+ * @param discoveryScript Optional script used to discover the resources. This is required on
+ *                        some cluster managers that don't tell Spark the addresses of the
+ *                        resources allocated. The script runs on Executor startup to discover
+ *                        the addresses of the resources available.
+ * @param vendor Optional vendor, required for some cluster managers
+ *
+ * This API is currently private until the rest of the pieces are in place, at which point it
+ * will become public.
+ */
+private[spark] class ExecutorResourceRequest(
+    val resourceName: String,
+    val amount: Long,
+    val discoveryScript: String = "",
+    val vendor: String = "") extends Serializable {
+
+  // A list of allowed Spark internal resources. Custom resources (spark.executor.resource.*)
+  // like GPUs/FPGAs are also allowed, see the check below.
+  private val allowedExecutorResources = mutable.HashSet[String](
+    ResourceProfile.MEMORY,
+    ResourceProfile.OVERHEAD_MEM,
+    ResourceProfile.PYSPARK_MEM,
+    ResourceProfile.CORES)
+
+  if (!allowedExecutorResources.contains(resourceName) && !resourceName.startsWith(RESOURCE_DOT)) {
+    throw new IllegalArgumentException(s"Executor resource not allowed: $resourceName")
+  }
+}
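To make the naming convention above concrete, here is a small sketch of direct construction
(hypothetical values; normally the ExecutorResourceRequests builder in the next file is used
instead):

    import org.apache.spark.resource.ExecutorResourceRequest

    // spark.executor.memoryOverhead -> "memoryOverhead" (spark.executor prefix dropped)
    val overhead = new ExecutorResourceRequest("memoryOverhead", 2048)

    // spark.executor.resource.gpu.{amount,discoveryScript,vendor} -> "resource.gpu"
    val gpus = new ExecutorResourceRequest("resource.gpu", 2, "/opt/getGpus.sh", "nvidia")

    // Names outside the allowed set that don't start with "resource." are rejected:
    // new ExecutorResourceRequest("diskSpace", 100)  // IllegalArgumentException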
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
new file mode 100644
index 0000000..6ffcc0c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.resource.ResourceProfile._
+
+/**
+ * A set of Executor resource requests. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This API is currently private until the rest of the pieces are in place, at which point it
+ * will become public.
+ */
+private[spark] class ExecutorResourceRequests() extends Serializable {
+
+  private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+
+  def requests: Map[String, ExecutorResourceRequest] = _executorResources.toMap
+
+  /**
+   * Specify heap memory. The value specified will be converted to MiB.
+   *
+   * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+   *               Default unit is MiB if not specified.
+   */
+  def memory(amount: String): this.type = {
+    val amountMiB = JavaUtils.byteStringAsMb(amount)
+    val rr = new ExecutorResourceRequest(MEMORY, amountMiB)
+    _executorResources(MEMORY) = rr
+    this
+  }
+
+  /**
+   * Specify overhead memory. The value specified will be converted to MiB.
+   *
+   * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+   *               Default unit is MiB if not specified.
+   */
+  def memoryOverhead(amount: String): this.type = {
+    val amountMiB = JavaUtils.byteStringAsMb(amount)
+    val rr = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB)
+    _executorResources(OVERHEAD_MEM) = rr
+    this
+  }
+
+  /**
+   * Specify pyspark memory. The value specified will be converted to MiB.
+   *
+   * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+   *               Default unit is MiB if not specified.
+   */
+  def pysparkMemory(amount: String): this.type = {
+    val amountMiB = JavaUtils.byteStringAsMb(amount)
+    val rr = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB)
+    _executorResources(PYSPARK_MEM) = rr
+    this
+  }
+
+  /**
+   * Specify number of cores per Executor.
+   *
+   * @param amount Number of cores to allocate per Executor.
+   */
+  def cores(amount: Int): this.type = {
+    val t = new ExecutorResourceRequest(CORES, amount)
+    _executorResources(CORES) = t
+    this
+  }
+
+  /**
+   * Amount of a particular custom resource (GPU, FPGA, etc.) to use. The resource names
+   * supported correspond to the regular Spark configs with the prefix removed. For instance,
+   * resources like GPUs are resource.gpu (spark configs spark.executor.resource.gpu.*)
+   *
+   * @param resourceName Name of the resource.
+   * @param amount Amount of that resource per executor to use.
+   * @param discoveryScript Optional script used to discover the resources. This is required on
+   *                        some cluster managers that don't tell Spark the addresses of
+   *                        the resources allocated. The script runs on Executor startup to
+   *                        discover the addresses of the resources available.
+   * @param vendor Optional vendor, required for some cluster managers
+   */
+  def resource(
+      resourceName: String,
+      amount: Long,
+      discoveryScript: String = "",
+      vendor: String = ""): this.type = {
+    // A bit weird, but for the Java API we use the empty string to mean None, because the
+    // empty string is otherwise invalid for those parameters anyway.
+    val eReq = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
+    _executorResources(resourceName) = eReq
+    this
+  }
+
+  def addRequest(ereq: ExecutorResourceRequest): this.type = {
+    _executorResources(ereq.resourceName) = ereq
+    this
+  }
+
+  override def toString: String = {
+    s"Executor resource requests: ${_executorResources}"
+  }
+}
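Since each memory helper runs its argument through JavaUtils.byteStringAsMb, the stored
amounts are always MiB. A quick sketch of the conversions (the values mirror the unit tests
at the end of this patch):

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile}

    val ereqs = new ExecutorResourceRequests()
      .memory("4g")              // stored as 4096
      .memoryOverhead("2000m")   // stored as 2000
      .pysparkMemory("512000k")  // 512000 KiB = 500 MiB, stored as 500

    assert(ereqs.requests(ResourceProfile.MEMORY).amount == 4096)
    assert(ereqs.requests(ResourceProfile.OVERHEAD_MEM).amount == 2000)
    assert(ereqs.requests(ResourceProfile.PYSPARK_MEM).amount == 500)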
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
new file mode 100644
index 0000000..876a655
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.util.{Map => JMap}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX
+
+/**
+ * Resource profile to associate with an RDD. A ResourceProfile allows the user to
+ * specify executor and task requirements for an RDD that will get applied during a
+ * stage. This allows the user to change the resource requirements between stages.
+ *
+ * This class is private for now, for initial development; once we have the feature in place
+ * it will become public.
+ */
+@Evolving
+private[spark] class ResourceProfile() extends Serializable {
+
+  private val _id = ResourceProfile.getNextProfileId
+  private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
+  private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+
+  def id: Int = _id
+  def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap
+  def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.toMap
+
+  /**
+   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
+   */
+  def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asJava
+
+  /**
+   * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+   */
+  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asJava
+
+  def reset(): Unit = {
+    _taskResources.clear()
+    _executorResources.clear()
+  }
+
+  def require(requests: ExecutorResourceRequests): this.type = {
+    _executorResources ++= requests.requests
+    this
+  }
+
+  def require(requests: TaskResourceRequests): this.type = {
+    _taskResources ++= requests.requests
+    this
+  }
+
+  override def toString(): String = {
+    s"Profile: id = ${_id}, executor resources: ${_executorResources}, " +
+      s"task resources: ${_taskResources}"
+  }
+}
+
+private[spark] object ResourceProfile extends Logging {
+  val UNKNOWN_RESOURCE_PROFILE_ID = -1
+  val DEFAULT_RESOURCE_PROFILE_ID = 0
+
+  val CPUS = "cpus"
+  val CORES = "cores"
+  val MEMORY = "memory"
+  val OVERHEAD_MEM = "memoryOverhead"
+  val PYSPARK_MEM = "pyspark.memory"
+
+  private lazy val nextProfileId = new AtomicInteger(0)
+
+  // The default resource profile uses the application level configs.
+  // Create the default profile immediately to get ID 0; it's initialized later when fetched.
+  private val defaultProfileRef: AtomicReference[ResourceProfile] =
+    new AtomicReference[ResourceProfile](new ResourceProfile())
+
+  assert(defaultProfileRef.get().id == DEFAULT_RESOURCE_PROFILE_ID,
+    s"Default Profile must have the default profile id: $DEFAULT_RESOURCE_PROFILE_ID")
+
+  def getNextProfileId: Int = nextProfileId.getAndIncrement()
+
+  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
+    val defaultProf = defaultProfileRef.get()
+    // check to see if the default profile was initialized yet
+    if (defaultProf.executorResources == Map.empty) {
+      synchronized {
+        val prof = defaultProfileRef.get()
+        if (prof.executorResources == Map.empty) {
+          addDefaultTaskResources(prof, conf)
+          addDefaultExecutorResources(prof, conf)
+        }
+        prof
+      }
+    } else {
+      defaultProf
+    }
+  }
+
+  private def addDefaultTaskResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+    val cpusPerTask = conf.get(CPUS_PER_TASK)
+    val treqs = new TaskResourceRequests().cpus(cpusPerTask)
+    val taskReq = ResourceUtils.parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+    taskReq.foreach { req =>
+      val name = s"${RESOURCE_PREFIX}.${req.resourceName}"
+      treqs.resource(name, req.amount)
+    }
+    rprof.require(treqs)
+  }
+
+  private def addDefaultExecutorResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+    val ereqs = new ExecutorResourceRequests()
+    ereqs.cores(conf.get(EXECUTOR_CORES))
+    ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
+    val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
+    execReq.foreach { req =>
+      val name = s"${RESOURCE_PREFIX}.${req.id.resourceName}"
+      ereqs.resource(name, req.amount, req.discoveryScript.getOrElse(""),
+        req.vendor.getOrElse(""))
+    }
+    rprof.require(ereqs)
+  }
+
+  // for testing purposes
+  def resetDefaultProfile(conf: SparkConf): Unit = getOrCreateDefaultProfile(conf).reset()
+}
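The default profile (ID 0) is the bridge to the existing application-level configs: the first
call to getOrCreateDefaultProfile fills it in from the SparkConf. A sketch of that behavior
(Spark-internal access assumed, since the object is private[spark]; the gpu settings are
illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.resource.ResourceProfile

    val conf = new SparkConf()
      .set("spark.executor.cores", "4")
      .set("spark.executor.resource.gpu.amount", "2")
      .set("spark.executor.resource.gpu.discoveryScript", "/opt/getGpus.sh")
      .set("spark.task.resource.gpu.amount", "1")

    // Note the profile is a process-wide singleton: only the first conf used wins.
    val default = ResourceProfile.getOrCreateDefaultProfile(conf)
    assert(default.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
    assert(default.executorResources("resource.gpu").amount == 2)
    assert(default.taskResources("resource.gpu").amount == 1)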
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index e5ae7a9..ce4fd05 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.Utils.executeAndGetOutput
  * @param resourceName  gpu, fpga, etc
  */
 private[spark] case class ResourceID(componentName: String, resourceName: String) {
-  def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
+  def confPrefix: String = s"$componentName.${ResourceUtils.RESOURCE_PREFIX}.$resourceName."
   def amountConf: String = s"$confPrefix${ResourceUtils.AMOUNT}"
   def discoveryScriptConf: String = s"$confPrefix${ResourceUtils.DISCOVERY_SCRIPT}"
   def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}"
@@ -111,7 +111,7 @@
   }
 
   def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
-    sparkConf.getAllWithPrefix(s"$componentName.resource.").map { case (key, _) =>
+    sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_DOT").map { case (key, _) =>
       key.substring(0, key.indexOf('.'))
     }.toSet.toSeq.map(name => ResourceID(componentName, name))
   }
@@ -258,4 +258,7 @@
   // known types of resources
   final val GPU: String = "gpu"
   final val FPGA: String = "fpga"
+
+  final val RESOURCE_PREFIX: String = "resource"
+  final val RESOURCE_DOT: String = s"$RESOURCE_PREFIX."
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
new file mode 100644
index 0000000..22eda52
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
+
+/**
+ * A task resource request. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * Use the TaskResourceRequests class as a convenience API.
+ *
+ * This API is currently private until the rest of the pieces are in place, at which point it
+ * will become public.
+ */
+private[spark] class TaskResourceRequest(val resourceName: String, val amount: Double)
+  extends Serializable {
+
+  assert(amount <= 0.5 || amount % 1 == 0,
+    s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
+
+  if (!resourceName.equals(ResourceProfile.CPUS) && !resourceName.startsWith(RESOURCE_DOT)) {
+    throw new IllegalArgumentException(s"Task resource not allowed: $resourceName")
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
new file mode 100644
index 0000000..21cbc5d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceProfile._
+import org.apache.spark.resource.ResourceUtils._
+
+/**
+ * A set of task resource requests. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This API is currently private until the rest of the pieces are in place, at which point it
+ * will become public.
+ */
+private[spark] class TaskResourceRequests() extends Serializable {
+
+  private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
+
+  def requests: Map[String, TaskResourceRequest] = _taskResources.toMap
+
+  /**
+   * Specify number of cpus per Task.
+   *
+   * @param amount Number of cpus to allocate per Task.
+   */
+  def cpus(amount: Int): this.type = {
+    val t = new TaskResourceRequest(CPUS, amount)
+    _taskResources(CPUS) = t
+    this
+  }
+
+  /**
+   * Amount of a particular custom resource (GPU, FPGA, etc.) to use. The resource names
+   * supported correspond to the regular Spark configs with the prefix removed. For instance,
+   * resources like GPUs are resource.gpu (spark configs spark.task.resource.gpu.*)
+   *
+   * @param rName Name of the resource.
+   * @param amount Amount requested as a Double to support fractional resource requests.
+   *               Valid values are less than or equal to 0.5 or whole numbers. This
+   *               essentially lets you configure X number of tasks to run on a single
+   *               resource address, i.e. an amount of 0.5 translates into 2 tasks per
+   *               resource address.
+   */
+  def resource(rName: String, amount: Double): this.type = {
+    val t = new TaskResourceRequest(rName, amount)
+    _taskResources(rName) = t
+    this
+  }
+
+  def addRequest(treq: TaskResourceRequest): this.type = {
+    _taskResources(treq.resourceName) = treq
+    this
+  }
+
+  override def toString: String = {
+    s"Task resource requests: ${_taskResources}"
+  }
+}
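The fractional-amount rule from TaskResourceRequest plays out in this builder: amounts at or
below 0.5 divide a single address among several tasks, while anything above 0.5 must be a
whole number. A short sketch (illustrative, Spark-internal access assumed):

    import org.apache.spark.resource.TaskResourceRequests

    val treqs = new TaskResourceRequests()
      .cpus(2)
      .resource("resource.gpu", 0.5)   // 2 tasks share each gpu address
      .resource("resource.fpga", 2.0)  // whole numbers are always valid

    // 0.7 is above 0.5 and not a whole number, so construction fails:
    // new TaskResourceRequests().resource("resource.gpu", 0.7)  // AssertionError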
diff --git a/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
new file mode 100644
index 0000000..0771207
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource;
+
+import java.util.Map;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+// Test the ResourceProfile and Request APIs from Java
+public class JavaResourceProfileSuite {
+
+  String GpuResource = "resource.gpu";
+  String FPGAResource = "resource.fpga";
+
+  @Test
+  public void testResourceProfileAccessFromJava() throws Exception {
+    ExecutorResourceRequests execReqGpu =
+      new ExecutorResourceRequests().resource(GpuResource, 2, "myscript", "");
+    ExecutorResourceRequests execReqFpga =
+      new ExecutorResourceRequests().resource(FPGAResource, 3, "myfpgascript", "nvidia");
+
+    ResourceProfile rprof = new ResourceProfile();
+    rprof.require(execReqGpu);
+    rprof.require(execReqFpga);
+    TaskResourceRequests taskReq1 = new TaskResourceRequests().resource(GpuResource, 1);
+    rprof.require(taskReq1);
+
+    assertEquals(rprof.executorResources().size(), 2);
+    Map<String, ExecutorResourceRequest> eresources = rprof.executorResourcesJMap();
+    assert(eresources.containsKey(GpuResource));
+    ExecutorResourceRequest gpuReq = eresources.get(GpuResource);
+    assertEquals(gpuReq.amount(), 2);
+    assertEquals(gpuReq.discoveryScript(), "myscript");
+    assertEquals(gpuReq.vendor(), "");
+
+    assert(eresources.containsKey(FPGAResource));
+    ExecutorResourceRequest fpgaReq = eresources.get(FPGAResource);
+    assertEquals(fpgaReq.amount(), 3);
+    assertEquals(fpgaReq.discoveryScript(), "myfpgascript");
+    assertEquals(fpgaReq.vendor(), "nvidia");
+
+    assertEquals(rprof.taskResources().size(), 1);
+    Map<String, TaskResourceRequest> tresources = rprof.taskResourcesJMap();
+    assert(tresources.containsKey(GpuResource));
+    TaskResourceRequest taskReq = tresources.get(GpuResource);
+    assertEquals(taskReq.amount(), 1.0, 0);
+    assertEquals(taskReq.resourceName(), GpuResource);
+  }
+}
+
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
new file mode 100644
index 0000000..a087f18
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class ResourceProfileSuite extends SparkFunSuite {
+
+  override def afterEach() {
+    try {
+      ResourceProfile.resetDefaultProfile(new SparkConf)
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("Default ResourceProfile") {
+    val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
+    assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    assert(rprof.executorResources.size === 2,
+      "Executor resources should contain cores and memory by default")
+    assert(rprof.executorResources(ResourceProfile.CORES).amount === 1,
+      s"Executor resources should have 1 core")
+    assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 1024,
+      s"Executor resources should have 1024 memory")
+    assert(rprof.taskResources.size === 1,
+      "Task resources should just contain cpus by default")
+    assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
+      s"Task resources should have 1 cpu")
+  }
+
+  test("Default ResourceProfile with app level resources specified") {
+    val conf = new SparkConf
+    conf.set("spark.task.resource.gpu.amount", "1")
+    conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.amount", "1")
+    conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.discoveryScript", "nameOfScript")
+    val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
+    assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val execResources = rprof.executorResources
+    assert(execResources.size === 3,
+      "Executor resources should contain cores, memory, and gpu " + execResources)
+    assert(rprof.taskResources.size === 2,
+      "Task resources should just contain cpus and gpu")
+    assert(execResources.contains("resource.gpu"), "Executor resources should have gpu")
+    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+  }
+
+  test("Create ResourceProfile") {
+    val rprof = new ResourceProfile()
+    assert(rprof.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    assert(rprof.executorResources === Map.empty)
+    assert(rprof.taskResources === Map.empty)
+
+    val taskReq = new TaskResourceRequests().resource("resource.gpu", 1)
+    val eReq = new ExecutorResourceRequests().resource("resource.gpu", 2, "myscript", "nvidia")
+    rprof.require(taskReq).require(eReq)
+
+    assert(rprof.executorResources.size === 1)
+    assert(rprof.executorResources.contains("resource.gpu"),
+      "Executor resources should have gpu")
+    assert(rprof.executorResources.get("resource.gpu").get.vendor === "nvidia",
+      "gpu vendor should be nvidia")
+    assert(rprof.executorResources.get("resource.gpu").get.discoveryScript === "myscript",
+      "discoveryScript should be myscript")
+    assert(rprof.executorResources.get("resource.gpu").get.amount === 2,
+      "gpu amount should be 2")
+
+    assert(rprof.taskResources.size === 1, "Should have 1 task resource")
+    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+    assert(rprof.taskResources.get("resource.gpu").get.amount === 1,
+      "Task resources should have 1 gpu")
+
+    val ereqs = new ExecutorResourceRequests()
+    ereqs.cores(2).memory("4096")
+    ereqs.memoryOverhead("2048").pysparkMemory("1024")
+    val treqs = new TaskResourceRequests()
+    treqs.cpus(1)
+
+    rprof.require(treqs)
+    rprof.require(ereqs)
+
+    assert(rprof.executorResources.size === 5)
+    assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
+      s"Executor resources should have 2 cores")
+    assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
+      s"Executor resources should have 4096 memory")
+    assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2048,
+      s"Executor resources should have 2048 overhead memory")
+    assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
+      s"Executor resources should have 1024 pyspark memory")
+
+    assert(rprof.taskResources.size === 2)
+    assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
+
+    val error = intercept[IllegalArgumentException] {
+      rprof.require(new ExecutorResourceRequests().resource("bogusResource", 1))
+    }.getMessage()
+    assert(error.contains("Executor resource not allowed"))
+
+    val taskError = intercept[IllegalArgumentException] {
+      rprof.require(new TaskResourceRequests().resource("bogusTaskResource", 1))
+    }.getMessage()
+    assert(taskError.contains("Task resource not allowed"))
+  }
+
+  test("Test ExecutorResourceRequests memory helpers") {
+    val rprof = new ResourceProfile()
+    val ereqs = new ExecutorResourceRequests()
+    ereqs.memory("4g")
+    ereqs.memoryOverhead("2000m").pysparkMemory("512000k")
+    rprof.require(ereqs)
+
+    assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
+      s"Executor resources should have 4096 memory")
+    assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2000,
+      s"Executor resources should have 2000 overhead memory")
+    assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
+      s"Executor resources should have 500 pyspark memory")
+  }
+
+  test("Test TaskResourceRequest fractional") {
+    val rprof = new ResourceProfile()
+    val treqs = new TaskResourceRequests().resource("resource.gpu", 0.33)
+    rprof.require(treqs)
+
+    assert(rprof.taskResources.size === 1, "Should have 1 task resource")
+    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+    assert(rprof.taskResources.get("resource.gpu").get.amount === 0.33,
+      "Task resources should have 0.33 gpu")
+
+    val fpgaReqs = new TaskResourceRequests().resource("resource.fpga", 4.0)
+    rprof.require(fpgaReqs)
+
+    assert(rprof.taskResources.size === 2, "Should have 2 task resources")
+    assert(rprof.taskResources.contains("resource.fpga"), "Task resources should have fpga")
+    assert(rprof.taskResources.get("resource.fpga").get.amount === 4.0,
+      "Task resources should have 4.0 fpga")
+
+    var taskError = intercept[AssertionError] {
+      rprof.require(new TaskResourceRequests().resource("resource.gpu", 1.5))
+    }.getMessage()
+    assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number."))
+
+    taskError = intercept[AssertionError] {
+      rprof.require(new TaskResourceRequests().resource("resource.gpu", 0.7))
+    }.getMessage()
+    assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org