This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ae2e00e8f8e9 [SPARK-45527][CORE] Use fraction to do the resource calculation
ae2e00e8f8e9 is described below

commit ae2e00e8f8e96fe85ecd5539aac0803216b8e66f
Author: Bobby Wang <wbo4...@gmail.com>
AuthorDate: Thu Jan 4 09:19:22 2024 -0600

    [SPARK-45527][CORE] Use fraction to do the resource calculation

### What changes were proposed in this pull request?

This PR uses fractions instead of slots, similar to the CPU strategy, to determine whether a worker offer can provide the necessary resources to tasks.

For instance, when an executor reports to the driver with [gpu, ["1", "2"]], the driver constructs an executor data map whose keys are the GPU addresses, each with a default value of 1.0 indicating one whole GPU. The available resource amounts for the executor are therefore { "1" -> 1.0, "2" -> 1.0 }.

When offering resources to a task that requires 1 CPU and 0.08 GPU, the worker offer examines the available resource amounts and finds that GPU address "1" has sufficient capacity (1.0 >= 0.08), so Spark assigns address "1" to the task. After the assignment, the available resource amounts for this executor become { "1" -> 0.92, "2" -> 1.0 }, and the remaining capacity can be allocated to other tasks.

When other tasks using different task resource profiles request varying GPU amounts while dynamic allocation is disabled, Spark applies the same comparison: the task's GPU requirement is checked against the available resource amounts to decide whether the resources can be assigned.

### Why are the changes needed?

The existing resource offering for custom resources such as GPUs and FPGAs is based on "slots per address", which is defined by the default resource profile and is a fixed number for all resource profiles when dynamic allocation is disabled. Consider the test case below:

```scala
withTempDir { dir =>
  val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript",
    """{"name": "gpu","addresses":["0"]}""")
  val conf = new SparkConf()
    .setAppName("test")
    .setMaster("local-cluster[1, 12, 1024]")
    .set("spark.executor.cores", "12")
  conf.set("spark.worker.resource.gpu.amount", "1")
  conf.set("spark.worker.resource.gpu.discoveryScript", scriptPath)
  conf.set("spark.executor.resource.gpu.amount", "1")
  conf.set("spark.task.resource.gpu.amount", "0.08")
  sc = new SparkContext(conf)
  val rdd = sc.range(0, 100, 1, 4)
  var rdd1 = rdd.repartition(3)
  val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
  val rp = new ResourceProfileBuilder().require(treqs).build
  rdd1 = rdd1.withResources(rp)
  assert(rdd1.collect().size === 100)
}
```

During the initial stages, Spark generates a default resource profile from the configuration. The slots per GPU address are calculated as "spark.executor.resource.gpu.amount / spark.task.resource.gpu.amount", yielding 12 (1 / 0.08, rounded down). This means Spark can accommodate up to 12 tasks running on each GPU address simultaneously. The job is divided into two stages: the first stage, consisting of 4 tasks, runs concurrently under the default resource profile, while the second stage, comprising 3 tasks, should run sequentially under a new task resource profile specifying that each task requires 1 CPU and 1.0 full GPU.

In reality, the tasks in the second stage run in parallel, which is the underlying issue. The problem lies in the line `new TaskResourceRequests().cpus(1).resource("gpu", 1.0)`. The value 1.0 for the GPU, or any value below 1.0 (specifically, a value in (0, 0.5], which is rounded up to 1.0; Spark throws an exception if the value is in (0.5, 1)), merely requests a number of slots; in this case, only 1 slot. Consequently, each task requires just 1 CPU core and 1 GPU slot, so all tasks run simultaneously.
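The fraction accounting described above boils down to scaled integer arithmetic. The following minimal standalone sketch is not the PR's code; the constant and conversion helpers merely mirror `ResourceAmountUtils` from the diff below. It shows how a 0.08 GPU request against two whole GPUs yields { "1" -> 0.92, "2" -> 1.0 }:

```scala
import scala.collection.mutable

object FractionAccountingSketch {
  // Scale factor, mirroring ResourceAmountUtils.ONE_ENTIRE_RESOURCE in this PR.
  val OneEntireResource: Long = 10000000000000000L

  def toInternal(amount: Double): Long = (amount * OneEntireResource).toLong
  def toFractional(amount: Long): Double = amount.toDouble / OneEntireResource

  def main(args: Array[String]): Unit = {
    // The executor reported GPU addresses "1" and "2"; each starts as one whole GPU.
    val available = mutable.Map("1" -> OneEntireResource, "2" -> OneEntireResource)

    // A task asks for 0.08 GPU: take the first (sorted) address with enough capacity.
    val need = toInternal(0.08)
    val addr = available.keys.toSeq.sorted.find(a => available(a) >= need).get
    available(addr) -= need

    // Prints: 1 -> 0.92 and 2 -> 1.0
    available.toSeq.sortBy(_._1).foreach { case (a, amt) =>
      println(s"$a -> ${toFractional(amt)}")
    }
  }
}
```

Doing the bookkeeping on scaled `Long` values rather than raw `Double`s is what lets nine 1.0/9 requests drain an address exactly to zero, as the precision-loss example in `ResourceAllocator.scala` below demonstrates.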
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Ensured that all existing and new tests pass.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43494 from wbo4958/SPARK-45527.

Authored-by: Bobby Wang <wbo4...@gmail.com>
Signed-off-by: Thomas Graves <tgra...@apache.org>
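Before the diff itself, here is a short self-contained sketch of the assignment walk that the new `ExecutorResourcesAmounts.assignAddressesCustomResources` (added below) performs. The logic is simplified and the names are illustrative; it is not the PR's code:

```scala
import scala.collection.mutable

object AssignSketch {
  val One: Long = 10000000000000000L // one whole resource, as in the PR

  // Whole-number requests may only take addresses that are still entire
  // resources; fractional requests take a slice of the first sorted address
  // with enough remaining capacity (the first-fit behavior noted in the
  // scaladoc's TODO below).
  def assign(avail: mutable.Map[String, Long], taskAmount: Double): Option[Map[String, Long]] = {
    val picked = mutable.Map[String, Long]()
    var remaining = taskAmount
    for (addr <- avail.keys.toSeq.sorted if remaining > 0) {
      if (taskAmount >= 1.0) {
        if (avail(addr) == One) { picked(addr) = One; remaining -= 1.0 }
      } else {
        val need = (taskAmount * One).toLong
        if (avail(addr) >= need) { picked(addr) = need; remaining = 0.0 }
      }
    }
    if (remaining == 0.0 && picked.nonEmpty) {
      picked.foreach { case (a, amt) => avail(a) -= amt }
      Some(picked.toMap)
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    // {"0" -> 0.7, "1" -> 1.0}, the example from the scaladoc below.
    val avail = mutable.Map("0" -> One * 7 / 10, "1" -> One)
    println(assign(avail, 0.5)) // assigns 0.5 of address "0"; leaves {"0" -> 0.2, "1" -> 1.0}
    println(assign(avail, 1.0)) // assigns the whole of "1", the only address still entire
  }
}
```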
---
 .../apache/spark/deploy/master/WorkerInfo.scala | 22 +-
 .../executor/CoarseGrainedExecutorBackend.scala | 15 +-
 .../scala/org/apache/spark/executor/Executor.scala | 6 +-
 .../apache/spark/resource/ResourceAllocator.scala | 118 +++--
 .../apache/spark/resource/ResourceProfile.scala | 28 +-
 .../org/apache/spark/resource/ResourceUtils.scala | 13 +-
 .../spark/resource/TaskResourceRequest.scala | 6 +-
 .../spark/scheduler/ExecutorResourceInfo.scala | 21 +-
 .../spark/scheduler/ExecutorResourcesAmounts.scala | 217 ++++++++
 .../apache/spark/scheduler/TaskDescription.scala | 39 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 51 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 8 +-
 .../org/apache/spark/scheduler/WorkerOffer.scala | 4 +-
 .../cluster/CoarseGrainedClusterMessage.scala | 4 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 28 +-
 .../scala/org/apache/spark/SparkConfSuite.scala | 2 +-
 .../CoarseGrainedExecutorBackendSuite.scala | 36 +-
 .../org/apache/spark/executor/ExecutorSuite.scala | 2 +-
 .../spark/resource/ResourceProfileSuite.scala | 36 +-
 .../apache/spark/resource/ResourceUtilsSuite.scala | 247 ++++++++++
 .../CoarseGrainedSchedulerBackendSuite.scala | 5 +-
 .../scheduler/ExecutorResourceInfoSuite.scala | 132 ++++-
 .../scheduler/ExecutorResourcesAmountsSuite.scala | 545 +++++++++++++++++++++
 .../spark/scheduler/TaskDescriptionSuite.scala | 21 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala | 477 +++++++++++++++++-
 .../spark/scheduler/TaskSetManagerSuite.scala | 10 +-
 26 files changed, 1840 insertions(+), 253 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 4d475fa8a6e8..bdc9e9c6106c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -20,8 +20,8 @@ package org.apache.spark.deploy.master import scala.collection.mutable import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, ResourceRequirement} +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String]) @@ -29,12 +29,20 @@ private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String override protected def resourceName = this.name override protected def resourceAddresses = this.addresses - override protected def slotsPerAddress: Int = 1 + /** + * Acquire the resources. + * @param amount How many addresses are being requested. + * @return ResourceInformation */ def acquire(amount: Int): ResourceInformation = { - val allocated = availableAddrs.take(amount) - acquire(allocated) - new ResourceInformation(resourceName, allocated.toArray) + // Any available address from availableAddrs must be a whole resource + // since the worker allocates whole resources to the executors. + val addresses = availableAddrs.take(amount) + assert(addresses.length == amount) + + acquire(addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap) + new ResourceInformation(resourceName, addresses.toArray) } } @@ -163,7 +171,7 @@ private[spark] class WorkerInfo( */ def recoverResources(expected: Map[String, ResourceInformation]): Unit = { expected.foreach { case (rName, rInfo) => - resources(rName).acquire(rInfo.addresses.toImmutableArraySeq) + resources(rName).acquire(rInfo.addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap) } } @@ -173,7 +181,7 @@ private[spark] class WorkerInfo( */ private def releaseResources(allocated: Map[String, ResourceInformation]): Unit = { allocated.foreach { case (rName, rInfo) => - resources(rName).release(rInfo.addresses.toImmutableArraySeq) + resources(rName).release(rInfo.addresses.map(addrs => addrs -> ONE_ENTIRE_RESOURCE).toMap) } } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 366b481bf6a4..b507d27f14c4 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -20,7 +20,6 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer import java.util.Locale -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.util.{Failure, Success} @@ -65,16 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend( private var _resources = Map.empty[String, ResourceInformation] - /** - * Map each taskId to the information about the resource allocated to it, Please refer to - * [[ResourceInformation]] for specifics. - * CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227) - * Exposed for testing only. - */ - private[executor] val taskResources = new ConcurrentHashMap[ - Long, Map[String, ResourceInformation] - ] - private var decommissioned = false // Track the last time in ns that at least one task is running.
If no task is running and all @@ -192,7 +181,6 @@ private[spark] class CoarseGrainedExecutorBackend( } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) - taskResources.put(taskDesc.taskId, taskDesc.resources) executor.launchTask(this, taskDesc) } @@ -272,11 +260,10 @@ private[spark] class CoarseGrainedExecutorBackend( } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = { - val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation]) + val resources = executor.runningTasks.get(taskId).taskDescription.resources val cpus = executor.runningTasks.get(taskId).taskDescription.cpus val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources) if (TaskState.isFinished(state)) { - taskResources.remove(taskId) lastTaskFinishTime.set(System.nanoTime()) } driver match { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 12471915cd97..dae00a72285d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -619,13 +619,17 @@ private[spark] class Executor( threadMXBean.getCurrentThreadCpuTime } else 0L var threwException = true + // Convert resources amounts info to ResourceInformation + val resources = taskDescription.resources.map { case (rName, addressesAmounts) => + rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray) + } val value = Utils.tryWithSafeFinally { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem, cpus = taskDescription.cpus, - resources = taskDescription.resources, + resources = resources, plugins = plugins) threwException = false res diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala index 7b97d9704282..e9bb11721725 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala @@ -20,6 +20,49 @@ package org.apache.spark.resource import scala.collection.mutable import org.apache.spark.SparkException +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE + +private[spark] object ResourceAmountUtils { + /** + * Using "double" to do the resource calculation may encounter a problem of precision loss. 
E.g. + * + * scala> val taskAmount = 1.0 / 9 + * taskAmount: Double = 0.1111111111111111 + * + * scala> var total = 1.0 + * total: Double = 1.0 + * + * scala> for (i <- 1 to 9 ) { + * | if (total >= taskAmount) { + * | total -= taskAmount + * | println(s"assign $taskAmount for task $i, total left: $total") + * | } else { + * | println(s"ERROR Can't assign $taskAmount for task $i, total left: $total") + * | } + * | } + * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888 + * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777 + * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665 + * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554 + * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425 + * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315 + * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204 + * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094 + * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094 + * + * So we multiply by ONE_ENTIRE_RESOURCE to convert the double to a long to avoid this limitation. + * Double can display up to 16 decimal places, so we set the factor to + * 10,000,000,000,000,000L. + */ + final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L + + def isOneEntireResource(amount: Long): Boolean = amount == ONE_ENTIRE_RESOURCE + + def toInternalResource(amount: Double): Long = (amount * ONE_ENTIRE_RESOURCE).toLong + + def toFractionalResource(amount: Long): Double = amount.toDouble / ONE_ENTIRE_RESOURCE + +} /** * Trait used to help executor/worker allocate resources. @@ -29,59 +72,53 @@ private[spark] trait ResourceAllocator { protected def resourceName: String protected def resourceAddresses: Seq[String] - protected def slotsPerAddress: Int /** - * Map from an address to its availability, a value > 0 means the address is available, - * while value of 0 means the address is fully assigned. - * - * For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value - * can be a multiple, such that each address can be allocated up to [[slotsPerAddress]] - * times. + * Map from an address to its availability, which defaults to 1.0 (multiplied by + * ONE_ENTIRE_RESOURCE to avoid precision errors); a value > 0 means the address is available, + * while a value of 0 means the address is fully assigned. */ private lazy val addressAvailabilityMap = { - mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*) + mutable.HashMap(resourceAddresses.map(address => address -> ONE_ENTIRE_RESOURCE): _*) } /** - * Sequence of currently available resource addresses. - * - * With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain duplicate addresses - * e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 can look like - * Seq("0", "0", "1"), where address 0 has two assignments available, and 1 has one. + * Get the resource amounts, which have been multiplied by ONE_ENTIRE_RESOURCE. + * @return the resource amounts + */ + def resourcesAmounts: Map[String, Long] = addressAvailabilityMap.toMap + + /** + * Sequence of currently available resource addresses which are not fully assigned. */ def availableAddrs: Seq[String] = addressAvailabilityMap - .flatMap { case (addr, available) => - (0 until available).map(_ => addr) - }.toSeq.sorted + .filter(addresses => addresses._2 > 0).keys.toSeq.sorted /** * Sequence of currently assigned resource addresses.
- * - * With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain duplicate addresses - * e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 can look like - * Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned twice. */ private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap - .flatMap { case (addr, available) => - (0 until slotsPerAddress - available).map(_ => addr) - }.toSeq.sorted + .filter(addresses => addresses._2 < ONE_ENTIRE_RESOURCE).keys.toSeq.sorted /** * Acquire a sequence of resource addresses (to a launched task), these addresses must be * available. When the task finishes, it will return the acquired resource addresses. * Throw an Exception if an address is not available or doesn't exist. */ - def acquire(addrs: Seq[String]): Unit = { - addrs.foreach { address => - val isAvailable = addressAvailabilityMap.getOrElse(address, + def acquire(addressesAmounts: Map[String, Long]): Unit = { + addressesAmounts.foreach { case (address, amount) => + val prevAmount = addressAvailabilityMap.getOrElse(address, throw new SparkException(s"Try to acquire an address that doesn't exist. $resourceName " + - s"address $address doesn't exist.")) - if (isAvailable > 0) { - addressAvailabilityMap(address) -= 1 + s"address $address doesn't exist.")) + + val left = prevAmount - amount + + if (left < 0) { + throw new SparkException(s"Try to acquire $resourceName address $address " + + s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}, but only " + + s"${ResourceAmountUtils.toFractionalResource(prevAmount)} left.") } else { - throw new SparkException("Try to acquire an address that is not available. " + - s"$resourceName address $address is not available.") + addressAvailabilityMap(address) = left } } } @@ -91,16 +128,21 @@ private[spark] trait ResourceAllocator { * addresses are released when a task has finished. * Throw an Exception if an address is not assigned or doesn't exist. */ - def release(addrs: Seq[String]): Unit = { - addrs.foreach { address => - val isAvailable = addressAvailabilityMap.getOrElse(address, + def release(addressesAmounts: Map[String, Long]): Unit = { + addressesAmounts.foreach { case (address, amount) => + val prevAmount = addressAvailabilityMap.getOrElse(address, throw new SparkException(s"Try to release an address that doesn't exist. $resourceName " + s"address $address doesn't exist.")) - if (isAvailable < slotsPerAddress) { - addressAvailabilityMap(address) += 1 + + val total = prevAmount + amount + + if (total > ONE_ENTIRE_RESOURCE) { + throw new SparkException(s"Try to release $resourceName address $address " + + s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}. But the total amount: " + + s"${ResourceAmountUtils.toFractionalResource(total)} " + + s"after release should be <= 1") } else { - throw new SparkException(s"Try to release an address that is not assigned. 
$resourceName " + - s"address $address is not assigned.") + addressAvailabilityMap(address) = total } } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 69c0672562c2..4a55b2f619e6 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -49,6 +49,8 @@ class ResourceProfile( val executorResources: Map[String, ExecutorResourceRequest], val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging { + validate() + // _id is only a var for testing purposes private var _id = ResourceProfile.getNextProfileId // This is used for any resources that use fractional amounts, the key is the resource name @@ -59,6 +61,19 @@ class ResourceProfile( private var _maxTasksPerExecutor: Option[Int] = None private var _coresLimitKnown: Boolean = false + /** + * Validate the ResourceProfile + */ + protected def validate(): Unit = { + // The task.amount in ResourceProfile falls within the range of 0 to 0.5, + // or it's a whole number + for ((_, taskReq) <- taskResources) { + val taskAmount = taskReq.amount + assert(taskAmount <= 0.5 || taskAmount % 1 == 0, + s"The task resource amount ${taskAmount} must be either <= 0.5, or a whole number.") + } + } + /** * A unique id of this ResourceProfile */ @@ -105,12 +120,8 @@ class ResourceProfile( /* * This function takes into account fractional amounts for the task resource requirement. - * Spark only supports fractional amounts < 1 to basically allow for multiple tasks - * to use the same resource address. - * The way the scheduler handles this is it adds the same address the number of slots per - * address times and then the amount becomes 1. This way it re-uses the same address - * the correct number of times. ie task requirement amount=0.25 -> addrs["0", "0", "0", "0"] - * and scheduler task amount=1. See ResourceAllocator.slotsPerAddress. + * Spark only supports fractional amounts < 1 to basically allow for multiple tasks + * to use the same resource address or a whole number to use the multiple whole addresses. */ private[spark] def getSchedulerTaskResourceAmount(resource: String): Int = { val taskAmount = taskResources.getOrElse(resource, @@ -280,6 +291,11 @@ private[spark] class TaskResourceProfile( override val taskResources: Map[String, TaskResourceRequest]) extends ResourceProfile(Map.empty, taskResources) { + // The task.amount in TaskResourceProfile falls within the range of 0 to 1.0, + // or it's a whole number, and it has been checked in the TaskResourceRequest. + // Therefore, we can safely skip this check. + override protected def validate(): Unit = {} + override protected[spark] def getCustomExecutorResources() : Map[String, ExecutorResourceRequest] = { if (SparkEnv.get == null) { diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index a6f2ac35af7a..8718ce8ea083 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -169,18 +169,17 @@ private[spark] object ResourceUtils extends Logging { // Used to take a fraction amount from a task resource requirement and split into a real // integer amount and the number of slots per address. For instance, if the amount is 0.5, - // the we get (1, 2) back out. 
This indicates that for each 1 address, it has 2 slots per - // address, which allows you to put 2 tasks on that address. Note if amount is greater - // than 1, then the number of slots per address has to be 1. This would indicate that a - // would have multiple addresses assigned per task. This can be used for calculating - // the number of tasks per executor -> (executorAmount * numParts) / (integer amount). + // the we get (1, 2) back out. This indicates that for each 1 address, it allows you to + // put 2 tasks on that address. Note if amount is greater than 1, then the number of + // running tasks per address has to be 1. This can be used for calculating + // the number of tasks per executor = (executorAmount * numParts) / (integer amount). // Returns tuple of (integer amount, numParts) def calculateAmountAndPartsForFraction(doubleAmount: Double): (Int, Int) = { - val parts = if (doubleAmount <= 0.5) { + val parts = if (doubleAmount <= 1.0) { Math.floor(1.0 / doubleAmount).toInt } else if (doubleAmount % 1 != 0) { throw new SparkException( - s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole number.") + s"The resource amount ${doubleAmount} must be either <= 1.0, or a whole number.") } else { 1 } diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala index cbd57808213a..9fc0d93373b5 100644 --- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala +++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala @@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Evolving, Since} * * @param resourceName Resource name * @param amount Amount requesting as a Double to support fractional resource requests. - * Valid values are less than or equal to 0.5 or whole numbers. This essentially + * Valid values are less than or equal to 1.0 or whole numbers. This essentially * lets you configure X number of tasks to run on a single resource, * ie amount equals 0.5 translates into 2 tasks per resource address. */ @@ -37,8 +37,8 @@ import org.apache.spark.annotation.{Evolving, Since} class TaskResourceRequest(val resourceName: String, val amount: Double) extends Serializable { - assert(amount <= 0.5 || amount % 1 == 0, - s"The resource amount ${amount} must be either <= 0.5, or a whole number.") + assert(amount <= 1.0 || amount % 1 == 0, + s"The resource amount ${amount} must be either <= 1.0, or a whole number.") override def equals(obj: Any): Boolean = { obj match { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala index 508c6cebd9fe..d9fbd23f3aa4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala @@ -25,16 +25,27 @@ import org.apache.spark.resource.{ResourceAllocator, ResourceInformation} * information. 
* @param name Resource name * @param addresses Resource addresses provided by the executor - * @param numParts Number of ways each resource is subdivided when scheduling tasks */ private[spark] class ExecutorResourceInfo( name: String, - addresses: Seq[String], - numParts: Int) + addresses: Seq[String]) extends ResourceInformation(name, addresses.toArray) with ResourceAllocator { override protected def resourceName = this.name override protected def resourceAddresses = this.addresses - override protected def slotsPerAddress: Int = numParts - def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress + + /** + * Calculate how many parts the executor can offer according to the task resource amount. + * @param taskAmount the resource amount required by each task + * @return the total number of parts + */ + def totalParts(taskAmount: Double): Int = { + assert(taskAmount > 0.0) + if (taskAmount >= 1.0) { + addresses.length / taskAmount.ceil.toInt + } else { + addresses.length * Math.floor(1.0 / taskAmount).toInt + } + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala new file mode 100644 index 000000000000..a93f2863ac2f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.HashMap + +import org.apache.spark.SparkException +import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfile} +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE + +/** + * Class to hold information about a series of resources belonging to an executor. + * A resource could be a GPU, FPGA, etc. It is used as a temporary + * class to calculate the resource amounts when offering resources to + * the tasks in the [[TaskSchedulerImpl]]. + * + * One example is GPUs, where the addresses would be the indices of the GPUs. + * + * @param resources The executor's available resources and amounts,
e.g., + * Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.2), + * "1" -> ResourceAmountUtils.toInternalResource(1.0)), + * "fpga" -> Map("a" -> ResourceAmountUtils.toInternalResource(0.3), + * "b" -> ResourceAmountUtils.toInternalResource(0.9)) + * ) + */ +private[spark] class ExecutorResourcesAmounts( + private val resources: Map[String, Map[String, Long]]) extends Serializable { + + /** + * Convert the resources to a mutable HashMap. + */ + private val internalResources: Map[String, HashMap[String, Long]] = { + resources.map { case (rName, addressAmounts) => + rName -> HashMap(addressAmounts.toSeq: _*) + } + } + + /** + * The total address count of each resource. E.g., + * Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.5), + * "1" -> ResourceAmountUtils.toInternalResource(0.5), + * "2" -> ResourceAmountUtils.toInternalResource(0.5)), + * "fpga" -> Map("a" -> ResourceAmountUtils.toInternalResource(0.5), + * "b" -> ResourceAmountUtils.toInternalResource(0.5))) + * the resourceAddressAmount will be Map("gpu" -> 3, "fpga" -> 2) + */ + lazy val resourceAddressAmount: Map[String, Int] = internalResources.map { + case (rName, addressMap) => rName -> addressMap.size + } + + /** + * For testing purposes: converts internal resources back to the "fraction" resources. + */ + private[spark] def availableResources: Map[String, Map[String, Double]] = { + internalResources.map { case (rName, addressMap) => + rName -> addressMap.map { case (address, amount) => + address -> ResourceAmountUtils.toFractionalResource(amount) + }.toMap + } + } + + /** + * Acquire the resource. + * @param assignedResource the assigned resource information + */ + def acquire(assignedResource: Map[String, Map[String, Long]]): Unit = { + assignedResource.foreach { case (rName, taskResAmounts) => + val availableResourceAmounts = internalResources.getOrElse(rName, + throw new SparkException(s"Try to acquire an address from $rName that doesn't exist")) + taskResAmounts.foreach { case (address, amount) => + val prevInternalTotalAmount = availableResourceAmounts.getOrElse(address, + throw new SparkException(s"Try to acquire an address that doesn't exist. $rName " + + s"address $address doesn't exist.")) + + val left = prevInternalTotalAmount - amount + if (left < 0) { + throw new SparkException(s"The total amount " + + s"${ResourceAmountUtils.toFractionalResource(left)} " + + s"after acquiring $rName address $address should be >= 0") + } + internalResources(rName)(address) = left + } + } + } + + /** + * Release the assigned resources to the resource pool + * @param assignedResource resource to be released + */ + def release(assignedResource: Map[String, Map[String, Long]]): Unit = { + assignedResource.foreach { case (rName, taskResAmounts) => + val availableResourceAmounts = internalResources.getOrElse(rName, + throw new SparkException(s"Try to release an address from $rName that doesn't exist")) + taskResAmounts.foreach { case (address, amount) => + val prevInternalTotalAmount = availableResourceAmounts.getOrElse(address, + throw new SparkException(s"Try to release an address that is not assigned.
$rName " + + s"address $address is not assigned.")) + val total = prevInternalTotalAmount + amount + if (total > ONE_ENTIRE_RESOURCE) { + throw new SparkException(s"The total amount " + + s"${ResourceAmountUtils.toFractionalResource(total)} " + + s"after releasing $rName address $address should be <= 1.0") + } + internalResources(rName)(address) = total + } + } + } + + /** + * Try to assign the addresses according to the task requirement. This function always goes + * through the available resources starting from the "small" address. If the resources amount + * of the address is matching the task requirement, we will assign this address to this task. + * Eg, assuming the available resources are {"gpu" -> {"0"-> 0.7, "1" -> 1.0}) and the + * task requirement is 0.5, this function will return Some(Map("gpu" -> {"0" -> 0.5})). + * + * TODO: as we consistently allocate addresses beginning from the "small" address, it can + * potentially result in an undesired consequence where a portion of the resource is being wasted. + * Eg, assuming the available resources are {"gpu" -> {"0"-> 1.0, "1" -> 0.5}) and the + * task amount requirement is 0.5, this function will return + * Some(Map("gpu" -> {"0" -> 0.5})), and the left available resource will be + * {"gpu" -> {"0"-> 0.5, "1" -> 0.5}) which can't assign to the task that + * requires > 0.5 any more. + * + * @param taskSetProf assign resources based on which resource profile + * @return the optional assigned resources amounts. returns None if any + * of the task requests for resources aren't met. + */ + def assignAddressesCustomResources(taskSetProf: ResourceProfile): + Option[Map[String, Map[String, Long]]] = { + // only look at the resource other than cpus + val tsResources = taskSetProf.getCustomTaskResources() + if (tsResources.isEmpty) { + return Some(Map.empty) + } + + val allocatedAddresses = HashMap[String, Map[String, Long]]() + + // Go through all resources here so that we can make sure they match and also get what the + // assignments are for the next task + for ((rName, taskReqs) <- tsResources) { + // TaskResourceRequest checks the task amount should be in (0, 1] or a whole number + var taskAmount = taskReqs.amount + + internalResources.get(rName) match { + case Some(addressesAmountMap) => + val allocatedAddressesMap = HashMap[String, Long]() + + // Always sort the addresses + val addresses = addressesAmountMap.keys.toSeq.sorted + + // task.amount is a whole number + if (taskAmount >= 1.0) { + for (address <- addresses if taskAmount > 0) { + // The address is still a whole resource + if (ResourceAmountUtils.isOneEntireResource(addressesAmountMap(address))) { + taskAmount -= 1.0 + // Assign the full resource of the address + allocatedAddressesMap(address) = ONE_ENTIRE_RESOURCE + } + } + } else if (taskAmount > 0.0) { // 0 < task.amount < 1.0 + val internalTaskAmount = ResourceAmountUtils.toInternalResource(taskAmount) + for (address <- addresses if taskAmount > 0) { + if (addressesAmountMap(address) >= internalTaskAmount) { + // Assign the part of the address. 
+ allocatedAddressesMap(address) = internalTaskAmount + taskAmount = 0 + } + } + } + + if (taskAmount == 0 && allocatedAddressesMap.size > 0) { + allocatedAddresses.put(rName, allocatedAddressesMap.toMap) + } else { + return None + } + + case None => return None + } + } + Some(allocatedAddresses.toMap) + } + +} + +private[spark] object ExecutorResourcesAmounts { + + /** + * Create an empty ExecutorResourcesAmounts + */ + def empty: ExecutorResourcesAmounts = new ExecutorResourcesAmounts(Map.empty) + + /** + * Converts executor infos to ExecutorResourcesAmounts + */ + def apply(executorInfos: Map[String, ExecutorResourceInfo]): ExecutorResourcesAmounts = { + new ExecutorResourcesAmounts( + executorInfos.map { case (rName, rInfo) => rName -> rInfo.resourcesAmounts } + ) + } + +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 75032086ead7..df5f32612bea 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -23,11 +23,10 @@ import java.nio.charset.StandardCharsets import java.util.Properties import scala.collection.immutable -import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import scala.collection.mutable.{HashMap, Map} import scala.jdk.CollectionConverters._ import org.apache.spark.{JobArtifactSet, JobArtifactState} -import org.apache.spark.resource.ResourceInformation import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} /** @@ -57,7 +56,10 @@ private[spark] class TaskDescription( val artifacts: JobArtifactSet, val properties: Properties, val cpus: Int, - val resources: immutable.Map[String, ResourceInformation], + // resources is the total resources assigned to the task + // Eg, Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.7))): + // assign 0.7 of the gpu address "0" to this task + val resources: immutable.Map[String, immutable.Map[String, Long]], val serializedTask: ByteBuffer) { assert(cpus > 0, "CPUs per task should be > 0") @@ -74,14 +76,16 @@ private[spark] object TaskDescription { } } - private def serializeResources(map: immutable.Map[String, ResourceInformation], + private def serializeResources(map: immutable.Map[String, immutable.Map[String, Long]], dataOut: DataOutputStream): Unit = { dataOut.writeInt(map.size) - map.foreach { case (key, value) => - dataOut.writeUTF(key) - dataOut.writeUTF(value.name) - dataOut.writeInt(value.addresses.length) - value.addresses.foreach(dataOut.writeUTF(_)) + map.foreach { case (rName, addressAmountMap) => + dataOut.writeUTF(rName) + dataOut.writeInt(addressAmountMap.size) + addressAmountMap.foreach { case (address, amount) => + dataOut.writeUTF(address) + dataOut.writeLong(amount) + } } } @@ -172,21 +176,22 @@ private[spark] object TaskDescription { } private def deserializeResources(dataIn: DataInputStream): - immutable.Map[String, ResourceInformation] = { - val map = new HashMap[String, ResourceInformation]() + immutable.Map[String, immutable.Map[String, Long]] = { + val map = new HashMap[String, immutable.Map[String, Long]]() val mapSize = dataIn.readInt() var i = 0 while (i < mapSize) { val resType = dataIn.readUTF() - val name = dataIn.readUTF() - val numIdentifier = dataIn.readInt() - val identifiers = new ArrayBuffer[String](numIdentifier) + val addressAmountMap = new HashMap[String, Long]() + val addressAmountSize = dataIn.readInt() var j = 0 - while (j < 
numIdentifier) { - identifiers += dataIn.readUTF() + while (j < addressAmountSize) { + val address = dataIn.readUTF() + val amount = dataIn.readLong() + addressAmountMap(address) = amount j += 1 } - map(resType) = new ResourceInformation(name, identifiers.toArray) + map.put(resType, addressAmountMap.toMap) i += 1 } map.toMap diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 454e7ed3ce61..21f62097a4bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.util.Random import com.google.common.cache.CacheBuilder @@ -35,7 +35,7 @@ import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ -import org.apache.spark.resource.{ResourceInformation, ResourceProfile} +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEndpoint import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality @@ -389,7 +389,7 @@ private[spark] class TaskSchedulerImpl( maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], - availableResources: Array[Map[String, Buffer[String]]], + availableResources: Array[ExecutorResourcesAmounts], tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : (Boolean, Option[TaskLocality]) = { var noDelayScheduleRejects = true @@ -429,13 +429,7 @@ private[spark] class TaskSchedulerImpl( minLaunchedLocality = minTaskLocality(minLaunchedLocality, Some(locality)) availableCpus(i) -= taskCpus assert(availableCpus(i) >= 0) - resources.foreach { case (rName, rInfo) => - // Remove the first n elements from availableResources addresses, these removed - // addresses are the same as that we allocated in taskResourceAssignments since it's - // synchronized. We don't remove the exact addresses allocated because the current - // approach produces the identical result with less time complexity. 
- availableResources(i)(rName).remove(0, rInfo.addresses.length) - } + availableResources(i).acquire(resources) } } catch { case e: TaskNotSerializableException => @@ -468,33 +462,14 @@ private[spark] class TaskSchedulerImpl( private def resourcesMeetTaskRequirements( taskSet: TaskSetManager, availCpus: Int, - availWorkerResources: Map[String, Buffer[String]] - ): Option[Map[String, ResourceInformation]] = { + availWorkerResources: ExecutorResourcesAmounts): Option[Map[String, Map[String, Long]]] = { val rpId = taskSet.taskSet.resourceProfileId val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId) val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, conf) // check if the ResourceProfile has cpus first since that is common case if (availCpus < taskCpus) return None // only look at the resource other than cpus - val tsResources = taskSetProf.getCustomTaskResources() - if (tsResources.isEmpty) return Some(Map.empty) - val localTaskReqAssign = HashMap[String, ResourceInformation]() - // we go through all resources here so that we can make sure they match and also get what the - // assignments are for the next task - for ((rName, taskReqs) <- tsResources) { - val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName) - availWorkerResources.get(rName) match { - case Some(workerRes) => - if (workerRes.size >= taskAmount) { - localTaskReqAssign.put(rName, new ResourceInformation(rName, - workerRes.take(taskAmount).toArray)) - } else { - return None - } - case None => return None - } - } - Some(localTaskReqAssign.toMap) + availWorkerResources.assignAddressesCustomResources(taskSetProf) } private def minTaskLocality( @@ -576,13 +551,8 @@ private[spark] class TaskSchedulerImpl( // value is -1 val numBarrierSlotsAvailable = if (taskSet.isBarrier) { val rpId = taskSet.taskSet.resourceProfileId - val availableResourcesAmount = availableResources.map { resourceMap => - // available addresses already takes into account if there are fractional - // task resource requests - resourceMap.map { case (name, addresses) => (name, addresses.length) } - } - calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus, - availableResourcesAmount) + val resAmounts = availableResources.map(_.resourceAddressAmount) + calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus, resAmounts) } else { -1 } @@ -715,9 +685,8 @@ private[spark] class TaskSchedulerImpl( barrierPendingLaunchTasks.foreach { task => // revert all assigned resources availableCpus(task.assignedOfferIndex) += task.assignedCores - task.assignedResources.foreach { case (rName, rInfo) => - availableResources(task.assignedOfferIndex)(rName).appendAll(rInfo.addresses) - } + availableResources(task.assignedOfferIndex).release( + task.assignedResources) // re-add the task to the schedule pending list taskSet.addPendingTask(task.index) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d17e6735c4ec..e15ba28eeda0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -21,7 +21,6 @@ import java.io.NotSerializableException import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit} -import scala.collection.immutable.Map import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.math.max import 
scala.util.control.NonFatal @@ -33,7 +32,6 @@ import org.apache.spark.TaskState.TaskState import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ -import org.apache.spark.resource.ResourceInformation import org.apache.spark.scheduler.SchedulingMode._ import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils} import org.apache.spark.util.collection.PercentileHeap @@ -444,7 +442,7 @@ private[spark] class TaskSetManager( host: String, maxLocality: TaskLocality.TaskLocality, taskCpus: Int = sched.CPUS_PER_TASK, - taskResourceAssignments: Map[String, ResourceInformation] = Map.empty) + taskResourceAssignments: Map[String, Map[String, Long]] = Map.empty) : (Option[TaskDescription], Boolean, Int) = { val offerExcluded = taskSetExcludelistHelperOpt.exists { excludeList => @@ -512,7 +510,7 @@ private[spark] class TaskSetManager( taskLocality: TaskLocality.Value, speculative: Boolean, taskCpus: Int, - taskResourceAssignments: Map[String, ResourceInformation], + taskResourceAssignments: Map[String, Map[String, Long]], launchTime: Long): TaskDescription = { // Found a task; do some bookkeeping and return a task description val task = tasks(index) @@ -1381,7 +1379,7 @@ private[scheduler] case class BarrierPendingLaunchTask( host: String, index: Int, taskLocality: TaskLocality.TaskLocality, - assignedResources: Map[String, ResourceInformation]) { + assignedResources: Map[String, Map[String, Long]]) { // Stored the corresponding index of the WorkerOffer which is responsible to launch the task. // Used to revert the assigned resources (e.g., cores, custome resources) when the barrier // task set doesn't launch successfully in a single resourceOffers round. diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index 92a12f13576c..a7d63a8949e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler -import scala.collection.mutable.Buffer - import org.apache.spark.resource.ResourceProfile /** @@ -32,5 +30,5 @@ case class WorkerOffer( // `address` is an optional hostPort string, it provide more useful information than `host` // when multiple executors are launched on the same host. 
address: Option[String] = None, - resources: Map[String, Buffer[String]] = Map.empty, + resources: ExecutorResourcesAmounts = ExecutorResourcesAmounts.empty, resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index b49f5269169d..1f452ae7d109 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -80,7 +80,7 @@ private[spark] object CoarseGrainedClusterMessages { state: TaskState, data: SerializableBuffer, taskCpus: Int, - resources: Map[String, ResourceInformation] = Map.empty) + resources: Map[String, Map[String, Long]] = Map.empty) extends CoarseGrainedClusterMessage object StatusUpdate { @@ -91,7 +91,7 @@ private[spark] object CoarseGrainedClusterMessages { state: TaskState, data: ByteBuffer, taskCpus: Int, - resources: Map[String, ResourceInformation]): StatusUpdate = { + resources: Map[String, Map[String, Long]]): StatusUpdate = { StatusUpdate(executorId, taskId, state, new SerializableBuffer(data), taskCpus, resources) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 831fbd45edd7..7e124302c726 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -172,9 +172,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += taskCpus - resources.foreach { case (k, v) => - executorInfo.resourcesInfo.get(k).foreach { r => - r.release(v.addresses.toImmutableArraySeq) + resources.foreach { case (rName, addressAmount) => + executorInfo.resourcesInfo.get(rName).foreach { r => + r.release(addressAmount) } } makeOffers(executorId) @@ -271,12 +271,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val resourcesInfo = resources.map { case (rName, info) => - // tell the executor it can schedule resources up to numSlotsPerAddress times, - // as configured by the user, or set to 1 as that is the default (1 task/resource) - val numParts = scheduler.sc.resourceProfileManager - .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf) - (info.name, - new ExecutorResourceInfo(info.name, info.addresses.toImmutableArraySeq, numParts)) + (info.name, new ExecutorResourceInfo(info.name, info.addresses.toIndexedSeq)) } // If we've requested the executor figure out when we did. 
val reqTs: Option[Long] = CoarseGrainedSchedulerBackend.this.synchronized { @@ -385,9 +380,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } private def buildWorkerOffer(executorId: String, executorData: ExecutorData) = { - val resources = executorData.resourcesInfo.map { case (rName, rInfo) => - (rName, rInfo.availableAddrs.toBuffer) - } + val resources = ExecutorResourcesAmounts(executorData.resourcesInfo) WorkerOffer( executorId, executorData.executorHost, @@ -446,11 +439,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Do resources allocation here. The allocated resources will get released after the task // finishes. executorData.freeCores -= task.cpus - task.resources.foreach { case (rName, rInfo) => - assert(executorData.resourcesInfo.contains(rName)) - executorData.resourcesInfo(rName).acquire(rInfo.addresses.toImmutableArraySeq) + task.resources.foreach { case (rName, addressAmounts) => + executorData.resourcesInfo(rName).acquire(addressAmounts) } - logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + s"${executorData.executorHost}.") @@ -766,7 +757,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp ( executor.resourceProfileId, executor.totalCores, - executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) } + executor.resourcesInfo.map { case (name, rInfo) => + val taskAmount = rp.taskResources.get(name).get.amount + (name, rInfo.totalParts(taskAmount)) + } ) }.unzip3 } diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 5f7958167507..8eb8d9c6dc85 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -465,7 +465,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst case (ratio, slots) => val conf = new SparkConf() conf.set(TASK_GPU_ID.amountConf, ratio.toString) - if (ratio > 0.5 && ratio % 1 != 0) { + if (ratio > 1.0 && ratio % 1 != 0) { assertThrows[SparkException] { parseResourceRequirements(conf, SPARK_TASK_PREFIX) } diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 45c27aea6022..a8c9550c6b76 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -304,14 +304,15 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1", "host1", "host1", 4, env, None, resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)) - assert(backend.taskResources.isEmpty) val taskId = 1000000L + val resourcesAmounts = Map(GPU -> Map( + "0" -> ResourceAmountUtils.toInternalResource(0.15), + "1" -> ResourceAmountUtils.toInternalResource(0.76))) // We don't really verify the data, just pass it around. 
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19, - 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, - Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) + 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data) val serializedTaskDescription = TaskDescription.encode(taskDescription) backend.rpcEnv.setupEndpoint("Executor 1", backend) backend.executor = mock[Executor](CALLS_REAL_METHODS) @@ -342,21 +343,22 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // Launch a new task shall add an entry to `taskResources` map. backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription))) eventually(timeout(10.seconds)) { - assert(backend.taskResources.size == 1) assert(runningTasks.size == 1) - val resources = backend.taskResources.get(taskId) - assert(resources(GPU).addresses sameElements Array("0", "1")) + val resources = backend.executor.runningTasks.get(taskId).taskDescription.resources + assert(resources(GPU).keys.toArray.sorted sameElements Array("0", "1")) + assert(executor.runningTasks.get(taskId).taskDescription.resources + === resourcesAmounts) } // Update the status of a running task shall not affect `taskResources` map. backend.statusUpdate(taskId, TaskState.RUNNING, data) - assert(backend.taskResources.size == 1) - val resources = backend.taskResources.get(taskId) - assert(resources(GPU).addresses sameElements Array("0", "1")) + val resources = backend.executor.runningTasks.get(taskId).taskDescription.resources + assert(resources(GPU).keys.toArray.sorted sameElements Array("0", "1")) + assert(executor.runningTasks.get(taskId).taskDescription.resources + === resourcesAmounts) // Update the status of a finished task shall remove the entry from `taskResources` map. backend.statusUpdate(taskId, TaskState.FINISHED, data) - assert(backend.taskResources.isEmpty) } finally { if (backend != null) { backend.rpcEnv.shutdown() @@ -424,11 +426,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val tasksKilled = new TrieMap[Long, Boolean]() val tasksExecuted = new TrieMap[Long, Boolean]() + val resourcesAmounts = Map(GPU -> Map( + "0" -> ResourceAmountUtils.toInternalResource(0.15), + "1" -> ResourceAmountUtils.toInternalResource(0.76))) + // Fake tasks with different taskIds. val taskDescriptions = (1 to numTasks).map { taskId => new TaskDescription(taskId, 2, "1", s"TASK $taskId", 19, - 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, - Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) + 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data) } assert(taskDescriptions.length == numTasks) @@ -513,11 +518,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val tasksKilled = new TrieMap[Long, Boolean]() val tasksExecuted = new TrieMap[Long, Boolean]() + val resourcesAmounts = Map(GPU -> Map( + "0" -> ResourceAmountUtils.toInternalResource(0.15), + "1" -> ResourceAmountUtils.toInternalResource(0.76))) + // Fake tasks with different taskIds. 
val taskDescriptions = (1 to numTasks).map { taskId => new TaskDescription(taskId, 2, "1", s"TASK $taskId", 19, - 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, - Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) + 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data) } assert(taskDescriptions.length == numTasks) diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 49e19dd2a00e..805e7ca46749 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -642,7 +642,7 @@ class ExecutorSuite extends SparkFunSuite JobArtifactSet.emptyJobArtifactSet, properties = new Properties, cpus = 1, - resources = immutable.Map[String, ResourceInformation](), + resources = Map.empty, serializedTask) } diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala index be38315cd75f..8f68fd547fb3 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala @@ -357,12 +357,11 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar { var taskError = intercept[AssertionError] { rprof.require(new TaskResourceRequests().resource("gpu", 1.5)) }.getMessage() - assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number.")) + assert(taskError.contains("The resource amount 1.5 must be either <= 1.0, or a whole number.")) - taskError = intercept[AssertionError] { - rprof.require(new TaskResourceRequests().resource("gpu", 0.7)) - }.getMessage() - assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number.")) + rprof.require(new TaskResourceRequests().resource("gpu", 0.7)) + rprof.require(new TaskResourceRequests().resource("gpu", 1.0)) + rprof.require(new TaskResourceRequests().resource("gpu", 2.0)) } test("ResourceProfile has correct custom executor resources") { @@ -393,6 +392,33 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar { "Task resources should have 1 custom resource") } + test("SPARK-45527 fractional TaskResourceRequests in ResourceProfile") { + val ereqs = new ExecutorResourceRequests().cores(6).resource("gpus", 6) + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + new ResourceProfileBuilder().require(ereqs).require(treqs).build() + + treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5) + new ResourceProfileBuilder().require(ereqs).require(treqs).build() + + treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.7) + + val msg = intercept[AssertionError] { + new ResourceProfileBuilder().require(ereqs).require(treqs).build() + }.getMessage + assert(msg.contains("The task resource amount 0.7 must be either <= 0.5, or a whole number")) + } + + test("SPARK-45527 fractional TaskResourceRequests in TaskResourceProfile") { + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + new ResourceProfileBuilder().require(treqs).build() + + treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5) + new ResourceProfileBuilder().require(treqs).build() + + treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.7) + new ResourceProfileBuilder().require(treqs).build() + } + private def withMockSparkEnv(conf: SparkConf)(f: 
=> Unit): Unit = { val previousEnv = SparkEnv.get val mockEnv = mock[SparkEnv] diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala index 20d6cc767158..7fd8a11463d8 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -26,6 +26,7 @@ import org.json4s.{DefaultFormats, Extraction} import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite} import org.apache.spark.TestUtils._ import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Tests.RESOURCES_WARNING_TESTING import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.util.Utils @@ -336,4 +337,250 @@ class ResourceUtilsSuite extends SparkFunSuite assert(error.contains("User is expecting to use resource: gpu, but " + "didn't specify a discovery script!")) } + + test("SPARK-45527 warnOnWastedResources for ResourceProfile") { + val conf = new SparkConf() + conf.set("spark.executor.cores", "10") + conf.set("spark.task.cpus", "1") + conf.set(RESOURCES_WARNING_TESTING, true) + + // cpu limiting task number = 10/1, gpu limiting task number = 1/0.1 + var ereqs = new ExecutorResourceRequests().cores(10).resource("gpu", 1) + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + var rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + // no exception, + warnOnWastedResources(rp, conf) + + ereqs = new ExecutorResourceRequests().cores(10).resource("gpu", 1) + treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2) + rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + // no exception, + warnOnWastedResources(rp, conf) + + ereqs = new ExecutorResourceRequests().cores(20).resource("gpu", 2) + treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2) + rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + // no exception, + warnOnWastedResources(rp, conf) + + var msg: String = "" + + // Test cpu limiting task number + // format: (executor.core, task.cpus, executor.gpu, task.gpu, expected runnable tasks) + Seq( + (10, 2, 1, 0.1, 5), // cpu limiting task number=10/2=5, gpu limiting task number = 1/0.1 = 10 + (10, 3, 1, 0.1, 3), // cpu limiting task number=10/3=3, gpu limiting task number = 1/0.1 = 10 + (10, 4, 1, 0.1, 2), // cpu limiting task number=10/4=2, gpu limiting task number = 1/0.1 = 10 + (10, 5, 1, 0.1, 2), // cpu limiting task number=10/5=2, gpu limiting task number = 1/0.1 = 10 + (10, 6, 1, 0.1, 1), // cpu limiting task number=10/6=1, gpu limiting task number = 1/0.1 = 10 + (10, 10, 1, 0.1, 1), // cpu limiting task number=10/10=1, gpu limiting task number = 1/0.1 = 10 + (20, 7, 1, 0.1, 2), // cpu limiting task number=20/7=2, gpu limiting task number = 1/0.1 = 10 + (30, 7, 1, 0.1, 4), // cpu limiting task number=30/7=4, gpu limiting task number = 1/0.1 = 10 + (50, 14, 1, 0.1, 3) // cpu limiting task number=50/14=3, gpu limiting task number = 1/0.1=10 + ).foreach { case (executorCores, taskCpus: Int, executorGpus: Int, taskGpus: Double, + expectedTaskNumber: Int) => + ereqs = new ExecutorResourceRequests().cores(executorCores).resource("gpu", executorGpus) + treqs = new TaskResourceRequests().cpus(taskCpus).resource("gpu", taskGpus) + rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + msg = 
intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of resource: gpu (exec = 1, task = 0.1/10, runnable " + "tasks = 10) will result in wasted resources due to resource cpus limiting the number of " + s"runnable tasks per executor to: ${expectedTaskNumber}. Please adjust your configuration.") + ) + } + + // Test gpu limiting task number + // format: (executor.core, task.cpus, executor.gpu, task.gpu, expected runnable tasks) + Seq( + (10, 1, 1, 1.0/9, 9), // cpu limiting task number=10, gpu limiting task number = 9 + (10, 1, 1, 1.0/8, 8), // cpu limiting task number=10, gpu limiting task number = 8 + (10, 1, 1, 1.0/7, 7), // cpu limiting task number=10, gpu limiting task number = 7 + (10, 1, 1, 1.0/6, 6), // cpu limiting task number=10, gpu limiting task number = 6 + (10, 1, 1, 1.0/5, 5), // cpu limiting task number=10, gpu limiting task number = 5 + (10, 1, 1, 1.0/4, 4), // cpu limiting task number=10, gpu limiting task number = 4 + (10, 1, 1, 1.0/3, 3), // cpu limiting task number=10, gpu limiting task number = 3 + (10, 1, 1, 1.0/2, 2), // cpu limiting task number=10, gpu limiting task number = 2 + (10, 1, 1, 1.0, 1), // cpu limiting task number=10, gpu limiting task number = 1 + (30, 1, 2, 1.0/9, 2*9), // cpu limiting task number=30, gpu limiting task number = 2*9 + (30, 1, 2, 1.0/8, 2*8), // cpu limiting task number=30, gpu limiting task number = 2*8 + (30, 1, 2, 1.0/7, 2*7), // cpu limiting task number=30, gpu limiting task number = 2*7 + (30, 1, 2, 1.0/6, 2*6), // cpu limiting task number=30, gpu limiting task number = 2*6 + (30, 1, 2, 1.0/5, 2*5), // cpu limiting task number=30, gpu limiting task number = 2*5 + (30, 1, 2, 1.0/4, 2*4), // cpu limiting task number=30, gpu limiting task number = 2*4 + (30, 1, 2, 1.0/3, 2*3), // cpu limiting task number=30, gpu limiting task number = 2*3 + (30, 1, 2, 1.0/2, 2*2), // cpu limiting task number=30, gpu limiting task number = 2*2 + (30, 1, 2, 1.0, 2*1), // cpu limiting task number=30, gpu limiting task number = 2*1 + (30, 1, 2, 2.0, 1), // cpu limiting task number=30, gpu limiting task number = 1 + (70, 2, 7, 0.5, 7*2), // cpu limiting task number=70/2=35, gpu limiting task number = 7*(1/0.5)=14 + (80, 3, 9, 2.0, 9/2) // cpu limiting task number=80/3=26, gpu limiting task number = 9/2=4 + ).foreach { case (executorCores, taskCpus: Int, executorGpus: Int, taskGpus: Double, + expectedTaskNumber: Int) => + ereqs = new ExecutorResourceRequests().cores(executorCores).resource("gpu", executorGpus) + treqs = new TaskResourceRequests().cpus(taskCpus).resource("gpu", taskGpus) + rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains(s"The configuration of cores (exec = ${executorCores} task = " + + s"${taskCpus}, runnable tasks = ${executorCores/taskCpus}) " + + "will result in wasted resources due to resource gpu limiting the number of runnable " + + s"tasks per executor to: ${expectedTaskNumber}. 
Please adjust your configuration")) + } + } + + private class FakedTaskResourceProfile( + val defaultRp: ResourceProfile, + override val taskResources: Map[String, TaskResourceRequest]) + extends TaskResourceProfile(taskResources) { + override protected[spark] def getCustomExecutorResources() + : Map[String, ExecutorResourceRequest] = defaultRp.getCustomExecutorResources() + } + + test("SPARK-45527 warnOnWastedResources for TaskResourceProfile when executor number = 1") { + val conf = new SparkConf() + conf.set("spark.executor.cores", "10") + conf.set("spark.task.cpus", "1") + conf.set(TASK_GPU_ID.amountConf, "0.1") + conf.set(EXECUTOR_GPU_ID.amountConf, "1") + conf.set(RESOURCES_WARNING_TESTING, true) + + val defaultDp = ResourceProfile.getOrCreateDefaultProfile(conf) + + // cpu limiting task number = 10/1, gpu limiting task number = 1/0.1 + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + var rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + // no exception, + warnOnWastedResources(rp, conf) + + var msg: String = "" + + // Test cpu limiting task number + // format: (task cpu cores, task gpu amount, expected runnable tasks) + // spark.executor.cores=10, spark.task.cpus=1, EXECUTOR_GPU_ID=1, TASK_GPU_ID=0.1 + Seq( + (2, 0.1, 5), // cpu limiting task number = 10/2=5, gpu limiting task number = 1/0.1 = 10 + (3, 0.1, 3), // cpu limiting task number = 10/3 = 3, gpu limiting task number = 1/0.1 = 10 + (4, 0.1, 2), // cpu limiting task number = 10/4 = 2, gpu limiting task number = 1/0.1 = 10 + (5, 0.1, 2), // cpu limiting task number = 10/5 = 2, gpu limiting task number = 1/0.1 = 10 + (6, 0.1, 1), // cpu limiting task number = 10/6 = 1, gpu limiting task number = 1/0.1 = 10 + (7, 0.1, 1), // cpu limiting task number = 10/7 = 1, gpu limiting task number = 1/0.1 = 10 + (10, 0.1, 1) // cpu limiting task number = 10/10 = 1, gpu limiting task number = 1/0.1 = 10 + ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) => + treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of resource: gpu (exec = 1, task = 0.1/10, runnable " + + "tasks = 10) will result in wasted resources due to resource cpus limiting the number of " + + s"runnable tasks per executor to: $expectedTaskNumber. 
Please adjust your configuration.") + ) + } + + // Test gpu limiting task number + // format: (task cpu cores, task gpu amount, expected runnable tasks) + // spark.executor.cores=10, spark.task.cpus=1, EXECUTOR_GPU_ID=1, TASK_GPU_ID=0.1 + Seq( + (1, 0.111, 9), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.111=9 + (1, 0.125, 8), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.125=8 + (1, 0.142, 7), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.142=7 + (1, 0.166, 6), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.166=6 + (1, 0.2, 5), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.2=5 + (1, 0.25, 4), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.25=4 + (1, 0.333, 3), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.333=3 + (1, 0.5, 2), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.5=2 + (1, 0.6, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.6=1 + (1, 0.7, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.7=1 + (1, 0.8, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.8=1 + (1, 0.9, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.9=1 + (1, 1.0, 1) // cpu limiting task number = 10/1, gpu limiting task number = 1/1.0=1 + ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) => + treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of cores (exec = 10 task = 1, runnable tasks = 10) " + + "will result in wasted resources due to resource gpu limiting the number of runnable " + + s"tasks per executor to: $expectedTaskNumber. 
Please adjust your configuration")) + } + } + + test("SPARK-45527 warnOnWastedResources for TaskResourceProfile when executor number > 1") { + val conf = new SparkConf() + conf.set("spark.executor.cores", "60") + conf.set("spark.task.cpus", "1") + conf.set(TASK_GPU_ID.amountConf, "0.1") + conf.set(EXECUTOR_GPU_ID.amountConf, "6") + conf.set(RESOURCES_WARNING_TESTING, true) + + val defaultDp = ResourceProfile.getOrCreateDefaultProfile(conf) + + // cpu limiting task number = 60/1, gpu limiting task number = 6/0.1 + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + var rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + // no exception, + warnOnWastedResources(rp, conf) + + // cpu limiting task number = 60/2 = 30, gpu limiting task number = 6/0.2 = 30 + treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + // no exception + warnOnWastedResources(rp, conf) + + var msg: String = "" + + // Test cpu limiting task number + // format: (task cpu cores, task gpu amount, expected runnable tasks) + // spark.executor.cores=60, spark.task.cpus=1, EXECUTOR_GPU_ID=6, TASK_GPU_ID=0.1 + Seq( + (4, 0.2, 15), // cpu limiting task number = 60/4=15, gpu limiting task number = 6/0.2 = 30 + (7, 0.2, 8), // cpu limiting task number = 60/7 = 8, gpu limiting task number = 6/0.2 = 30 + (30, 0.2, 2), // cpu limiting task number = 60/30 = 2, gpu limiting task number = 6/0.2 = 30 + (31, 0.2, 1), // cpu limiting task number = 60/31 = 1, gpu limiting task number = 6/0.2 = 30 + (55, 0.2, 1), // cpu limiting task number = 60/55 = 1, gpu limiting task number = 6/0.2 = 30 + (60, 0.2, 1) // cpu limiting task number = 60/60 = 1, gpu limiting task number = 6/0.2 = 30 + ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) => + treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of resource: gpu (exec = 6, task = 0.2/5, runnable " + + "tasks = 30) will result in wasted resources due to resource cpus limiting the number of " + + s"runnable tasks per executor to: $expectedTaskNumber. 
Please adjust your configuration.") + ) + } + + // Test gpu limiting task number + // format: (task cpu cores, task gpu amount, expected runnable tasks) + // spark.executor.cores=60, spark.task.cpus=1, EXECUTOR_GPU_ID=6, TASK_GPU_ID=0.1 + Seq( + (1, 0.111, 54), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.111=54 + (1, 0.125, 48), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.125=48 + (1, 0.142, 42), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.142=42 + (1, 0.166, 36), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.166=36 + (1, 0.2, 30), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.2=30 + (1, 0.25, 24), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.25=24 + (1, 0.33, 18), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.33=18 + (1, 0.5, 12), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.5=12 + (1, 0.7, 6), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6*1/0.7 = 6 + (1, 1.0, 6), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/1 = 6 + (1, 2.0, 3), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/2 = 3 + (1, 3.0, 2), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/3 = 2 + (1, 4.0, 1), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/4 = 1 + (1, 6.0, 1) // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/6 = 1 + ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) => + treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of cores (exec = 60 task = 1, runnable tasks = 60) " + + "will result in wasted resources due to resource gpu limiting the number of runnable " + + s"tasks per executor to: $expectedTaskNumber. 
Please adjust your configuration")) + } + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index e9b8ae4bffe6..1b444aa60474 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.rdd.RDD import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, TaskResourceRequests} +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout} @@ -254,7 +255,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3") assert(exec3ResourceProfileId === rp.id) - val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0"))) + val taskResources = Map(GPU -> Map("0" -> ONE_ENTIRE_RESOURCE)) val taskCpus = 1 val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1", "t1", 0, 1, JobArtifactSet.emptyJobArtifactSet, new Properties(), @@ -361,7 +362,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3") assert(exec3ResourceProfileId === rp.id) - val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0"))) + val taskResources = Map(GPU -> Map("0" -> ONE_ENTIRE_RESOURCE)) val taskCpus = 1 val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1", "t1", 0, 1, JobArtifactSet.emptyJobArtifactSet, new Properties(), diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala index 203e30cac1a7..ac1a6b7c3ec0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala @@ -18,74 +18,87 @@ package org.apache.spark.scheduler import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.resource.ResourceAmountUtils import org.apache.spark.resource.ResourceUtils.GPU -import org.apache.spark.util.ArrayImplicits._ class ExecutorResourceInfoSuite extends SparkFunSuite { + implicit def convertMapLongToDouble(resources: Map[String, Long]): Map[String, Double] = { + resources.map { case (k, v) => k -> ResourceAmountUtils.toFractionalResource(v) } + } + + implicit def convertMapDoubleToLong(resources: Map[String, Double]): Map[String, Long] = { + resources.map { case (k, v) => k -> ResourceAmountUtils.toInternalResource(v) } + } + test("Track Executor Resource information") { // Init Executor Resource. 
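A note on the two implicit conversions defined just above: they round-trip between the user-facing fractional amounts (Double) and the scheduler's internal Long representation. A minimal sketch of that fixed-point idea, assuming a 1e16 scale factor purely for illustration (the real constant lives in ResourceAmountUtils.ONE_ENTIRE_RESOURCE and may differ):

```scala
object FixedPointSketch {
  // Hypothetical scale: one whole resource expressed as a Long.
  val OneEntireResource: Long = 10000000000000000L

  def toInternal(amount: Double): Long = (amount * OneEntireResource).toLong
  def toFractional(amount: Long): Double = amount.toDouble / OneEntireResource

  def main(args: Array[String]): Unit = {
    // Ten tasks each holding 0.1 of a GPU sum to exactly one whole GPU;
    // accumulating raw Doubles could instead drift above or below 1.0.
    val perTask = toInternal(0.1)
    assert(toFractional(perTask * 10) == 1.0)
  }
}
```

Summing and comparing Longs keeps the acquire/release bookkeeping exact, which is why the suites below convert at the boundaries instead of passing Doubles around.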
- val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3")) assert(info.assignedAddrs.isEmpty) + val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap // Acquire addresses - info.acquire(Array("0", "1").toImmutableArraySeq) + info.acquire(reqResource) assert(info.availableAddrs.sorted sameElements Seq("2", "3")) assert(info.assignedAddrs.sorted sameElements Seq("0", "1")) // release addresses - info.release(Array("0", "1").toImmutableArraySeq) + info.release(reqResource) assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3")) assert(info.assignedAddrs.isEmpty) } test("Don't allow acquire address that is not available") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) // Acquire some addresses. - info.acquire(Seq("0", "1")) + val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap + info.acquire(reqResource) assert(!info.availableAddrs.contains("1")) // Acquire an address that is not available val e = intercept[SparkException] { - info.acquire(Array("1").toImmutableArraySeq) + info.acquire(convertMapDoubleToLong(Map("1" -> 1.0))) } - assert(e.getMessage.contains("Try to acquire an address that is not available.")) + assert(e.getMessage.contains("Try to acquire gpu address 1 amount: 1.0, but only 0.0 left.")) } test("Don't allow acquire address that doesn't exist") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) assert(!info.availableAddrs.contains("4")) // Acquire an address that doesn't exist val e = intercept[SparkException] { - info.acquire(Array("4").toImmutableArraySeq) + info.acquire(convertMapDoubleToLong(Map("4" -> 1.0))) } assert(e.getMessage.contains("Try to acquire an address that doesn't exist.")) } test("Don't allow release address that is not assigned") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) // Acquire addresses - info.acquire(Array("0", "1").toImmutableArraySeq) + val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap + info.acquire(reqResource) assert(!info.assignedAddrs.contains("2")) // Release an address that is not assigned val e = intercept[SparkException] { - info.release(Array("2").toImmutableArraySeq) + info.release(convertMapDoubleToLong(Map("2" -> 1.0))) } - assert(e.getMessage.contains("Try to release an address that is not assigned.")) + assert(e.getMessage.contains("Try to release gpu address 2 amount: 1.0. " + + "But the total amount: 2.0 after release should be <= 1")) } test("Don't allow release address that doesn't exist") { // Init Executor Resource. 
- val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) assert(!info.assignedAddrs.contains("4")) // Release an address that doesn't exist val e = intercept[SparkException] { - info.release(Array("4").toImmutableArraySeq) + info.release(convertMapDoubleToLong(Map("4" -> 1.0))) } assert(e.getMessage.contains("Try to release an address that doesn't exist.")) } @@ -94,23 +107,92 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { val slotSeq = Seq(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) val addresses = ArrayBuffer("0", "1", "2", "3") slotSeq.foreach { slots => - val info = new ExecutorResourceInfo(GPU, addresses.toSeq, slots) + val taskAmount = 1.0 / slots + val info = new ExecutorResourceInfo(GPU, addresses.toSeq) for (_ <- 0 until slots) { - addresses.foreach(addr => info.acquire(Seq(addr))) + addresses.foreach(addr => info.acquire(convertMapDoubleToLong(Map(addr -> taskAmount)))) } - // assert that each address was assigned `slots` times - info.assignedAddrs - .groupBy(identity) - .transform((_, v) => v.size) - .foreach(x => assert(x._2 == slots)) + // All addresses has been assigned + assert(info.resourcesAmounts.values.toSeq.toSet.size == 1) + // The left amount of any address should < taskAmount + assert(ResourceAmountUtils.toFractionalResource(info.resourcesAmounts("0")) < taskAmount) addresses.foreach { addr => assertThrows[SparkException] { - info.acquire(Seq(addr)) + info.acquire(convertMapDoubleToLong(Map(addr -> taskAmount))) } - assert(!info.availableAddrs.contains(addr)) } } } + + def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double], + eps: Double = 0.00000001): Boolean = { + lhs.size == rhs.size && + lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) => + lName == rName && (lAmount - rAmount).abs < eps + } + } + + test("assign/release resource for different task requirements") { + val execInfo = new ExecutorResourceInfo("gpu", Seq("0", "1", "2", "3")) + + def testAllocation(taskAddressAmount: Map[String, Double], + expectedLeftRes: Map[String, Double] + ): Unit = { + execInfo.acquire(taskAddressAmount) + val leftRes = execInfo.resourcesAmounts + assert(compareMaps(leftRes, expectedLeftRes)) + } + + def testRelease(releasedRes: Map[String, Double], + expectedLeftRes: Map[String, Double] + ): Unit = { + execInfo.release(releasedRes) + val leftRes = execInfo.resourcesAmounts + assert(compareMaps(leftRes, expectedLeftRes)) + } + + testAllocation(taskAddressAmount = Map("0" -> 0.2), + expectedLeftRes = Map("0" -> 0.8, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 0.2), + expectedLeftRes = Map("0" -> 0.6, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("1" -> 1.0, "2" -> 1.0), + expectedLeftRes = Map("0" -> 0.6, "1" -> 0.0, "2" -> 0.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 0.1, "2" -> 0.8), + expectedLeftRes = Map("0" -> 0.7, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 0.50002), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("3" -> 1.0), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 0.0)) + + testAllocation(taskAddressAmount = Map("2" -> 0.2), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.6, "3" -> 0.0)) + + testRelease(releasedRes = Map("0" -> 0.80002, "1" -> 1.0, "2" -> 0.4, "3" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, 
"2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 1.0, "1" -> 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 1.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 0.0, "3" -> 0.0)) + + testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala new file mode 100644 index 000000000000..e512327fefa7 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions + +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfileBuilder, TaskResourceRequests} +import org.apache.spark.resource.ResourceUtils.GPU + +class ExecutorResourcesAmountsSuite extends SparkFunSuite { + + implicit def toFractionalResource(resources: Map[String, Long]): Map[String, Double] = + resources.map { case (k, v) => k -> ResourceAmountUtils.toFractionalResource(v) } + + implicit def toInternalResource(resources: Map[String, Double]): Map[String, Long] = + resources.map { case (k, v) => k -> ResourceAmountUtils.toInternalResource(v) } + + implicit def toInternalResourceMap(resources: Map[String, Map[String, Double]]): + Map[String, Map[String, Long]] = + resources.map { case (resName, addressesAmountMap) => + resName -> addressesAmountMap.map { case (k, v) => + k -> ResourceAmountUtils.toInternalResource(v) } + } + + def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double], + eps: Double = 0.00000001): Boolean = { + lhs.size == rhs.size && + lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) => + lName == rName && (lAmount - rAmount).abs < eps + } + } + + test("assign to rp without task resources requirement") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val treqs = new TaskResourceRequests().cpus(1) + val rp = new ResourceProfileBuilder().require(treqs).build() + + // assign nothing to rp without resource profile + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(assigned.isDefined) + assigned.foreach { case resource => assert(resource.isEmpty) } + } + + test("Convert ExecutorResourceInfos to ExecutorResourcesAmounts") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + // default resources amounts of executors info + executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // Update executors info + // executors info shouldn't be changed. 
+ executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + rInfo.acquire(toInternalResource(Map("2" -> 0.4, "6" -> 0.6))) + } else { + rInfo.acquire(toInternalResource(Map("aa" -> 0.2, "bb" -> 0.7))) + } + } + + executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 0.6, "4" -> 1.0, "6" -> 0.4))) + } else { + assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 0.8, "bb" -> 0.3))) + } + } + + val availableExecResAmounts1 = ExecutorResourcesAmounts(executorsInfo) + assert(availableExecResAmounts1.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val availableRes1 = availableExecResAmounts1.availableResources + availableRes1.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 0.6, "4" -> 1.0, "6" -> 0.4))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 0.8, "bb" -> 0.3))) + } + } + + } + + test("ExecutorResourcesAmounts shouldn't change ExecutorResourceInfo") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + + // default resources amounts of executors info + executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val gpuTaskAmount = 0.1 + val treqs = new TaskResourceRequests().resource("gpu", gpuTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + // taskAmount = 0.1 < 1.0, so it can be assigned. + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + // update the value + availableExecResAmounts.acquire(assigned.get) + + val availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, + Map("2" -> (1.0 - gpuTaskAmount), "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // executors info shouldn't be changed. 
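The assertions that follow check copy-on-construction semantics: building an ExecutorResourcesAmounts snapshots the executor's current amounts, so acquiring from the snapshot must not write back into the live ExecutorResourceInfo. A rough sketch of that behavior, with hypothetical class and method names standing in for Spark's:

```scala
import scala.collection.mutable

// Illustrative only: a snapshot that deep-copies the source amounts so
// mutations stay local to the snapshot.
class AmountsSnapshot(source: Map[String, Map[String, Long]]) {
  private val amounts: mutable.Map[String, mutable.Map[String, Long]] =
    mutable.Map(source.toSeq.map { case (rName, addrs) =>
      rName -> mutable.Map(addrs.toSeq: _*)
    }: _*)

  def acquire(rName: String, addr: String, amount: Long): Unit =
    amounts(rName)(addr) -= amount // only the copy changes

  def available(rName: String, addr: String): Long = amounts(rName)(addr)
}
```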
+ executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + } + + test("executor resources do not match the task requirement") { + val totalRes = Map("gpu" -> Map("2" -> 0.4)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + val gpuTaskAmount = 0.6 + val treqs = new TaskResourceRequests() + .resource("gpu", gpuTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(assigned.isEmpty) + } + + test("part of the executor resources do not match the task requirement") { + val totalRes = Map("gpu" -> Map("2" -> 0.4), "fpga" -> Map("aa" -> 0.8)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + // normal allocation + val gpuTaskAmount = 0.3 + val fpgaTaskAmount = 0.8 + val treqs = new TaskResourceRequests() + .resource("gpu", gpuTaskAmount) + .resource("fpga", fpgaTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + var assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(!assigned.isEmpty) + assigned.foreach { case resource => assert(!resource.isEmpty)} + + val treqs1 = new TaskResourceRequests() + .resource("gpu", gpuTaskAmount) + .resource("fpga", 0.9) // couldn't allocate fpga + val rp1 = new ResourceProfileBuilder().require(treqs1).build() + + assigned = availableExecResAmounts.assignAddressesCustomResources(rp1) + assert(assigned.isEmpty) + } + + test("the total amount after release should be <= 1.0") { + val totalRes = Map("gpu" -> Map("2" -> 0.4)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + val e = intercept[SparkException] { + availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.7)))) + } + assert(e.getMessage.contains("after releasing gpu address 2 should be <= 1.0")) + + availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.6)))) + assert(compareMaps(availableExecResAmounts.availableResources("gpu"), Map("2" -> 1.0))) + } + + test("the total amount after acquire should be >= 0") { + val totalRes = Map("gpu" -> Map("2" -> 0.4)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + val e = intercept[SparkException] { + availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.6)))) + } + assert(e.getMessage.contains("after acquiring gpu address 2 should be >= 0")) + + availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.4)))) + assert(compareMaps(availableExecResAmounts.availableResources("gpu"), Map("2" -> 0.0))) + } + + test("Ensure that we can acquire the same fractions of a resource") { + val slotSeq = Seq(31235, 1024, 512, 256, 128, 64, 32, 16, 12, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + val addresses = ArrayBuffer("0", "1", "2", "3") + val info = new ExecutorResourceInfo(GPU, addresses.toSeq) + + slotSeq.foreach { slots => + val taskAmount = 1.0 / slots + val availableExecResAmounts = ExecutorResourcesAmounts(Map(GPU -> info)) + for (_ <- 0 until slots) { + addresses.foreach(addr => + availableExecResAmounts.acquire( + toInternalResourceMap(Map(GPU -> Map(addr -> taskAmount))))) + } + + assert(availableExecResAmounts.availableResources.size === 1) + // All addresses have been assigned equally + 
assert(availableExecResAmounts.availableResources(GPU).values.toSeq.toSet.size === 1) + // The amount left on any address should be < taskAmount + assert(availableExecResAmounts.availableResources(GPU)("0") < taskAmount) + + addresses.foreach { addr => + assertThrows[SparkException] { + availableExecResAmounts.acquire( + toInternalResourceMap(Map(GPU -> Map(addr -> taskAmount)))) + } + } + } + } + + test("assign acquire release on single task resource request") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val gpuTaskAmount = 0.1 + val treqs = new TaskResourceRequests().resource("gpu", gpuTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + // taskAmount = 0.1 < 1.0, so it can be assigned. + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(!assigned.isEmpty) + assigned.foreach { case resource => + assert(resource.size === 1) + assert(resource.keys.toSeq === Seq("gpu")) + assert(resource("gpu").size === 1) + assert(resource("gpu").keys.toSeq === Seq("2")) + assert(ResourceAmountUtils.toFractionalResource(resource("gpu")("2")) === gpuTaskAmount) + } + + // assign will not update the real value. + var availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // acquire updates the real value + availableExecResAmounts.acquire(assigned.get) + + // after acquire + availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map( + "2" -> (1.0 - gpuTaskAmount), + "4" -> 1.0, + "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // release + availableExecResAmounts.release(assigned.get) + + availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + } + + test("assign acquire release on multiple task resources request") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val gpuTaskAmount = 0.1 + val fpgaTaskAmount = 0.3 + val treqs = new TaskResourceRequests() + .resource("gpu", gpuTaskAmount) + .resource("fpga", fpgaTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + // taskAmount = 0.1 < 1.0, so it can be assigned. 
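These two tests walk the full lifecycle: assignAddressesCustomResources only proposes an assignment, acquire commits it, and release hands it back. In sketch form (the function parameters below stand in for the ExecutorResourcesAmounts API exercised here; this is not Spark's code):

```scala
// Hedged sketch of the assign/acquire/release lifecycle.
def runOneTask(
    assign: () => Option[Map[String, Map[String, Long]]],
    acquire: Map[String, Map[String, Long]] => Unit,
    release: Map[String, Map[String, Long]] => Unit): Unit = {
  assign().foreach { candidate => // assign is read-only: amounts unchanged
    acquire(candidate)            // commit: available amounts decrease
    // ... the task runs holding `candidate` ...
    release(candidate)            // undo: amounts return to their old values
  }
}
```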
+ val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(!assigned.isEmpty) + assigned.foreach { case resourceAmounts => + assert(resourceAmounts.size === 2) + assert(resourceAmounts.keys.toSeq.sorted === Seq("gpu", "fpga").sorted) + + assert(resourceAmounts("gpu").size === 1) + assert(resourceAmounts("gpu").keys.toSeq === Seq("2")) + assert(ResourceAmountUtils.toFractionalResource(resourceAmounts("gpu")("2")) === + gpuTaskAmount) + + assert(resourceAmounts("fpga").size === 1) + assert(resourceAmounts("fpga").keys.toSeq === Seq("aa")) + assert(ResourceAmountUtils.toFractionalResource(resourceAmounts("fpga")("aa")) === + fpgaTaskAmount) + } + + // assign will not update the real value. + var availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // acquire will updates the value + availableExecResAmounts.acquire(assigned.get) + + // after acquire + availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, + Map("2" -> (1.0 - gpuTaskAmount), "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> (1.0 - fpgaTaskAmount), "bb" -> 1.0))) + } + } + + // release + availableExecResAmounts.release(assigned.get) + + availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + } + + test("assign/release resource for different task requirements") { + val totalRes = Map("gpu" -> Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + def testAllocation(taskAmount: Double, + expectedAssignedAddress: Array[String], + expectedAssignedAmount: Array[Double], + expectedLeftRes: Map[String, Double] + ): Unit = { + val treqs = new TaskResourceRequests().resource("gpu", taskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(!assigned.isEmpty) + assigned.foreach { case resources => + assert( + resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource(_)) + === expectedAssignedAmount.sorted) + + availableExecResAmounts.acquire(resources) + + val leftRes = availableExecResAmounts.availableResources + assert(leftRes.size == 1) + assert(leftRes.keys.toSeq(0) == "gpu") + assert(compareMaps(leftRes("gpu"), expectedLeftRes)) + } + } + + def testRelease(releasedRes: Map[String, Double], + expectedLeftRes: Map[String, Double] + ): Unit = { + availableExecResAmounts.release(Map("gpu" -> releasedRes)) + + val leftRes = availableExecResAmounts.availableResources + assert(leftRes.size == 1) + assert(leftRes.keys.toSeq(0) == "gpu") + assert(compareMaps(leftRes("gpu"), expectedLeftRes)) + } + + // request 0.2 gpu, ExecutorResourcesAmounts should assign "0", + testAllocation(taskAmount = 0.2, + expectedAssignedAddress = Array("0"), + expectedAssignedAmount = Array(0.2), + expectedLeftRes = Map("0" -> 0.8, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // 
request 0.2 gpu again, ExecutorResourcesAmounts should assign "0", + testAllocation(taskAmount = 0.2, + expectedAssignedAddress = Array("0"), + expectedAssignedAmount = Array(0.2), + expectedLeftRes = Map("0" -> 0.6, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // request 2 gpus, ExecutorResourcesAmounts should assign "1" and "2", + testAllocation(taskAmount = 2, + expectedAssignedAddress = Array("1", "2"), + expectedAssignedAmount = Array(1.0, 1.0), + expectedLeftRes = Map("0" -> 0.6, "1" -> 0.0, "2" -> 0.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 0.1, "2" -> 0.8), + expectedLeftRes = Map("0" -> 0.7, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0)) + + // request 0.50002 gpu, ExecutorResourcesAmounts should assign "0", + testAllocation(taskAmount = 0.50002, + expectedAssignedAddress = Array("0"), + expectedAssignedAmount = Array(0.50002), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0)) + + // request 1 gpu, ExecutorResourcesAmounts should assign "3", + testAllocation(taskAmount = 1.0, + expectedAssignedAddress = Array("3"), + expectedAssignedAmount = Array(1.0), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 0.0)) + + // request 0.2 gpu, ExecutorResourcesAmounts should assign "2", + testAllocation(taskAmount = 0.2, + expectedAssignedAddress = Array("2"), + expectedAssignedAmount = Array(0.2), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.6, "3" -> 0.0)) + + testRelease(releasedRes = Map("0" -> 0.80002, "1" -> 1.0, "2" -> 0.4, "3" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // request 1 gpus, ExecutorResourcesAmounts should assign "0" + testAllocation(taskAmount = 1.0, + expectedAssignedAddress = Array("0"), + expectedAssignedAmount = Array(1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // request 2 gpus, ExecutorResourcesAmounts should assign "0", "1" + testAllocation(taskAmount = 2.0, + expectedAssignedAddress = Array("0", "1"), + expectedAssignedAmount = Array(1.0, 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 1.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // request 4 gpus, ExecutorResourcesAmounts should assign "0", "1", "2", "3" + testAllocation(taskAmount = 4.0, + expectedAssignedAddress = Array("0", "1", "2", "3"), + expectedAssignedAmount = Array(1.0, 1.0, 1.0, 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 0.0, "3" -> 0.0)) + + testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + } + + test("Don't allow acquire resource or address that is not available") { + // Init Executor Resource. 
+ val totalRes = Map("gpu" -> Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + // Acquire an address from a resource that doesn't exist + val e = intercept[SparkException] { + availableExecResAmounts.acquire(toInternalResourceMap(Map("fpga" -> Map("1" -> 1.0)))) + } + assert(e.getMessage.contains("Try to acquire an address from fpga that doesn't exist")) + + // Acquire an address that doesn't exist + val e1 = intercept[SparkException] { + availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> Map("6" -> 1.0)))) + } + assert(e1.getMessage.contains("Try to acquire an address that doesn't exist")) + } + + test("Don't allow release resource or address that is not available") { + // Init Executor Resource. + val totalRes = Map("gpu" -> Map("0" -> 0.5, "1" -> 0.5, "2" -> 0.5, "3" -> 0.5)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + // Release an address from a resource that doesn't exist + val e = intercept[SparkException] { + availableExecResAmounts.release(toInternalResourceMap(Map("fpga" -> Map("1" -> 0.1)))) + } + assert(e.getMessage.contains("Try to release an address from fpga that doesn't exist")) + + // Release an address that is not assigned + val e1 = intercept[SparkException] { + availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> Map("6" -> 0.1)))) + } + assert(e1.getMessage.contains("Try to release an address that is not assigned")) + } + +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index b36363d0f4cd..5aaacd2ec1c3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ -24,7 +24,7 @@ import java.util.Properties import scala.collection.mutable.HashMap import org.apache.spark.{JobArtifactSet, SparkFunSuite} -import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.ResourceAmountUtils import org.apache.spark.resource.ResourceUtils.GPU class TaskDescriptionSuite extends SparkFunSuite { @@ -59,8 +59,10 @@ class TaskDescriptionSuite extends SparkFunSuite { } } - val originalResources = - Map(GPU -> new ResourceInformation(GPU, Array("1", "2", "3"))) + val originalResources = Map(GPU -> + Map("1" -> ResourceAmountUtils.toInternalResource(0.2), + "2" -> ResourceAmountUtils.toInternalResource(0.5), + "3" -> ResourceAmountUtils.toInternalResource(0.1))) // Create a dummy byte buffer for the task. 
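For context on the TaskDescriptionSuite change above: the resources field is now a plain Map[String, Map[String, Long]], so nested structural equality (===) holds after an encode/decode round trip, and the old equalResources helper, which had to compare ResourceInformation address arrays with sameElements, can be dropped. A small illustration of the equality difference (values here are arbitrary):

```scala
// Nested immutable Maps compare structurally ...
val a = Map("gpu" -> Map("1" -> 2000000000000000L))
val b = Map("gpu" -> Map("1" -> 2000000000000000L))
assert(a == b)

// ... whereas Arrays compare by reference, which is why the old
// ResourceInformation-based field needed a custom sameElements check.
assert(Array("1") != Array("1"))
assert(Array("1") sameElements Array("1"))
```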
val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) @@ -99,17 +101,8 @@ class TaskDescriptionSuite extends SparkFunSuite { assert(decodedTaskDescription.artifacts.equals(artifacts)) assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties)) assert(decodedTaskDescription.cpus.equals(originalTaskDescription.cpus)) - assert(equalResources(decodedTaskDescription.resources, originalTaskDescription.resources)) + assert(decodedTaskDescription.resources === originalTaskDescription.resources) assert(decodedTaskDescription.serializedTask.equals(taskBuffer)) - - def equalResources(original: Map[String, ResourceInformation], - target: Map[String, ResourceInformation]): Boolean = { - original.size == target.size && original.forall { case (name, info) => - target.get(name).exists { targetInfo => - info.name.equals(targetInfo.name) && - info.addresses.sameElements(targetInfo.addresses) - } - } - } } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 2ab7df0d9cfd..72d0354c5577 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -22,8 +22,10 @@ import java.util.Properties import java.util.concurrent.{CountDownLatch, ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.duration._ +import scala.language.implicitConversions import scala.language.reflectiveCalls import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq} @@ -33,7 +35,8 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.config -import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceProfile, TaskResourceRequests} +import org.apache.spark.resource.{ExecutorResourceRequests, ResourceAmountUtils, ResourceProfile, TaskResourceProfile, TaskResourceRequests} +import org.apache.spark.resource.ResourceAmountUtils.{ONE_ENTIRE_RESOURCE} import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.status.api.v1.ThreadStackTrace @@ -145,6 +148,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler } + // Convert resources to ExecutorResourcesAmounts automatically + implicit def toExecutorResourcesAmounts(resources: Map[String, mutable.Buffer[String]]): + ExecutorResourcesAmounts = { + // convert the old resources to ExecutorResourcesAmounts + new ExecutorResourcesAmounts(resources.map { case (rName, addresses) => + rName -> addresses.map(address => address -> ONE_ENTIRE_RESOURCE).toMap + }) + } + test("SPARK-32653: Decommissioned host/executor should be considered as inactive") { val scheduler = setupScheduler() val exec0 = "exec0" @@ -1738,7 +1750,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val singleCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None, resources)) val zeroGpuWorkerOffers = - IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None, Map.empty)) + IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None)) taskScheduler.submitTasks(taskSet) // WorkerOffer doesn't contain GPU resource, don't launch any 
task. var taskDescriptions = taskScheduler.resourceOffers(zeroGpuWorkerOffers).flatten @@ -1748,8 +1760,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten assert(2 === taskDescriptions.length) assert(!failedTaskSet) - assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses) - assert(ArrayBuffer("1") === taskDescriptions(1).resources.get(GPU).get.addresses) + assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.keys.toArray.sorted) + assert(ArrayBuffer("1") === taskDescriptions(1).resources.get(GPU).get.keys.toArray.sorted) } test("Scheduler correctly accounts for GPUs per task with fractional amount") { @@ -1775,9 +1787,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten assert(3 === taskDescriptions.length) assert(!failedTaskSet) - assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses) - assert(ArrayBuffer("0") === taskDescriptions(1).resources.get(GPU).get.addresses) - assert(ArrayBuffer("0") === taskDescriptions(2).resources.get(GPU).get.addresses) + assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.keys.toArray.sorted) + assert(ArrayBuffer("0") === taskDescriptions(1).resources.get(GPU).get.keys.toArray.sorted) + assert(ArrayBuffer("0") === taskDescriptions(2).resources.get(GPU).get.keys.toArray.sorted) } test("Scheduler works with multiple ResourceProfiles and gpus") { @@ -1815,10 +1827,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext var has1Gpu = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - if (tDesc.resources(GPU).addresses.length == 2) { + if (tDesc.resources(GPU).keys.size == 2) { has2Gpus += 1 } - if (tDesc.resources(GPU).addresses.length == 1) { + if (tDesc.resources(GPU).keys.size == 1) { has1Gpu += 1 } } @@ -1830,13 +1842,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext // clear the first 2 worker offers so they don't have any room and add a third // for the resource profile val workerOffers3 = IndexedSeq( - new WorkerOffer("executor0", "host0", 0, None, Map.empty), - new WorkerOffer("executor1", "host1", 0, None, Map.empty, rp.id), + new WorkerOffer("executor0", "host0", 0, None), + new WorkerOffer("executor1", "host1", 0, None, ExecutorResourcesAmounts.empty, rp.id), new WorkerOffer("executor2", "host2", 6, None, resources3, rp.id)) taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten assert(2 === taskDescriptions.length) assert(taskDescriptions.head.resources.contains(GPU)) - assert(2 == taskDescriptions.head.resources(GPU).addresses.length) + assert(2 == taskDescriptions.head.resources(GPU).keys.size) } test("Scheduler works with task resource profiles") { @@ -1875,10 +1887,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext var has1Gpu = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - if (tDesc.resources(GPU).addresses.length == 2) { + if (tDesc.resources(GPU).keys.size == 2) { has2Gpus += 1 } - if (tDesc.resources(GPU).addresses.length == 1) { + if (tDesc.resources(GPU).keys.size == 1) { has1Gpu += 1 } } @@ -1890,13 +1902,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext // clear the first 2 worker offers so they don't have any room and add a third // for the resource 
profile val workerOffers3 = IndexedSeq( - WorkerOffer("executor0", "host0", 0, None, Map.empty), - WorkerOffer("executor1", "host1", 0, None, Map.empty), + WorkerOffer("executor0", "host0", 0, None), + WorkerOffer("executor1", "host1", 0, None), WorkerOffer("executor2", "host2", 4, None, resources3)) taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten assert(2 === taskDescriptions.length) assert(taskDescriptions.head.resources.contains(GPU)) - assert(2 == taskDescriptions.head.resources(GPU).addresses.length) + assert(2 == taskDescriptions.head.resources(GPU).keys.size) } test("Calculate available tasks slots for task resource profiles") { @@ -1922,11 +1934,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val workerOffers = IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0), WorkerOffer("executor1", "host1", 4, None, resources1)) - val availableResourcesAmount = workerOffers.map(_.resources).map { resourceMap => - // available addresses already takes into account if there are fractional - // task resource requests - resourceMap.map { case (name, addresses) => (name, addresses.length) } - } + val availableResourcesAmount = workerOffers.map(_.resources).map { resAmounts => + // available addresses already takes into account if there are fractional + // task resource requests + resAmounts.resourceAddressAmount + } val taskSlotsForRp = TaskSchedulerImpl.calculateAvailableSlots( taskScheduler, taskScheduler.conf, rp.id, workerOffers.map(_.resourceProfileId).toArray, @@ -2283,4 +2295,425 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.handleFailedTask(tsm, tid, state, reason) } + private implicit def toInternalResource(resources: Map[String, Double]): Map[String, Long] = + resources.map { case (k, v) => k -> ResourceAmountUtils.toInternalResource(v) } + + // 1 executor with 4 GPUS + Seq(true, false).foreach { barrierMode => + val barrier = if (barrierMode) "barrier" else "" + (1 to 20).foreach { taskNum => + val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum) + test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can " + + s"restrict $taskNum $barrier tasks run in the same executor") { + val taskCpus = 1 + val executorCpus = 100 // cpu will not limit the concurrent tasks number + val executorGpus = 1 + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> gpuTaskAmount.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString) + + val taskSet = if (barrierMode) { + FakeTask.createBarrierTaskSet(4 * taskNum) + } else { + FakeTask.createTaskSet(100) + } + + val resources = new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)))) + + val workerOffers = + IndexedSeq(WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)) + + taskScheduler.submitTasks(taskSet) + // Launch tasks on executor that satisfies resource requirements. 
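To see why the loop below expects task ids to map onto GPU addresses in consecutive blocks, note that with task.gpu.amount = 1/taskNum each address hosts exactly taskNum tasks, and the assertions assume the offer's addresses fill in order, "0" first. A quick arithmetic check of that layout (taskNum = 5 is just a sample value):

```scala
// Worked example under the test's assumptions.
val taskNum = 5
val gpuTaskAmount = 1.0 / taskNum                     // 0.2 of a GPU per task
val tasksPerGpu = math.round(1.0 / gpuTaskAmount).toInt
val concurrent = 4 * tasksPerGpu                      // 4 addresses * 5 tasks = 20
assert(concurrent == 4 * taskNum)

// Task ids 0-4 land on "0", 5-9 on "1", 10-14 on "2", 15-19 on "3".
def expectedAddr(taskId: Int): String = (taskId / taskNum).toString
```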
+  // 4 executors, each of which has 1 GPU
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run on the different executor") {
+        val taskCpus = 1
+        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+        val executorGpus = 1
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> taskCpus.toString,
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(4 * taskNum)
+        } else {
+          FakeTask.createTaskSet(100)
+        }
+
+        val workerOffers =
+          IndexedSeq(
+            WorkerOffer("executor0", "host0", executorCpus, Some("host0"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("0" -> 1.0))))),
+            WorkerOffer("executor1", "host1", executorCpus, Some("host1"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("1" -> 1.0))))),
+            WorkerOffer("executor2", "host2", executorCpus, Some("host2"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("2" -> 1.0))))),
+            WorkerOffer("executor3", "host3", executorCpus, Some("host3"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("3" -> 1.0))))))
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executors that satisfy resource requirements
+        val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        val assignedGpus: HashMap[String, Int] = HashMap.empty
+        for (taskId <- 0 until 4 * taskNum) {
+          val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
+          assert(gpus.length == 1)
+          val addr = gpus(0)
+          if (!assignedGpus.contains(addr)) {
+            assignedGpus(addr) = 1
+          } else {
+            assignedGpus(addr) += 1
+          }
+        }
+        assert(assignedGpus.toMap ===
+          Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
+      }
+    }
+  }
+
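A worker's inventory is always described as resource name -> (address -> internal amount). For reference, the two shapes used throughout these tests, reusing the toInternalResource implicit defined earlier (illustrative, assuming the suite's imports are in scope):

```scala
// One executor advertising a single whole GPU at address "0".
val singleGpu = new ExecutorResourcesAmounts(
  Map(GPU -> toInternalResource(Map("0" -> 1.0))))

// One executor advertising four whole GPUs.
val fourGpus = new ExecutorResourcesAmounts(
  Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
```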
+  // 1 executor with 4 GPUs
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 TaskResourceProfile with task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run in the same executor") {
+        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> "0.1",
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, gpuTaskAmount)
+        val rp = new TaskResourceProfile(treqs.requests)
+        taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
+        } else {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources, rp.id)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        var gpuAddress = -1
+        for (taskId <- 0 until 4 * taskNum) {
+          if (taskId % taskNum == 0) {
+            gpuAddress += 1
+          }
+          assert(ArrayBuffer(gpuAddress.toString) ===
+            taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        }
+      }
+    }
+  }
+
+  // 4 executors, each of which has 1 GPU
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 TaskResourceProfile with task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run on the different executor") {
+        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> "0.1",
+          EXECUTOR_GPU_ID.amountConf -> "1",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, gpuTaskAmount)
+        val rp = new TaskResourceProfile(treqs.requests)
+        taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
+        } else {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        }
+
+        val workerOffers =
+          IndexedSeq(
+            WorkerOffer("executor0", "host0", executorCpus, Some("host0"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("0" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor1", "host1", executorCpus, Some("host1"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("1" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor2", "host2", executorCpus, Some("host2"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("2" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor3", "host3", executorCpus, Some("host3"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("3" -> 1.0)))),
+              rp.id)
+          )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executors that satisfy resource requirements
+        val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        val assignedGpus: HashMap[String, Int] = HashMap.empty
+        for (taskId <- 0 until 4 * taskNum) {
+          val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
+          assert(gpus.length == 1)
+          val addr = gpus(0)
+          if (!assignedGpus.contains(addr)) {
+            assignedGpus(addr) = 1
+          } else {
+            assignedGpus(addr) += 1
+          }
+        }
+        assert(assignedGpus.toMap ===
+          Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
+      }
+    }
+  }
+
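The sharing tests that follow depend on remainders being compared exactly: after a 0.7 request is carved out of a whole GPU, the leftover must still satisfy a 0.3 request but reject a 0.4 request. A sketch of that comparison in the internal Long encoding (assuming the helpers round as these tests require):

```scala
val whole = ResourceAmountUtils.toInternalResource(1.0)
val leftover = whole - ResourceAmountUtils.toInternalResource(0.7) // after one 0.7 task
assert(leftover >= ResourceAmountUtils.toInternalResource(0.3))    // a 0.3 task still fits
assert(leftover < ResourceAmountUtils.toInternalResource(0.4))     // a 0.4 task does not
```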
taskset") { + val taskCpus = 1 + val taskGpus = 0.3 + val executorGpus = 4 + val executorCpus = 1000 + + // each tasks require 0.3 gpu + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString + ) + val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + + // each task require 0.7 gpu + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7) + val rp = new TaskResourceProfile(treqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, stageAttemptId = 0, + priority = 0, rpId = rp.id) + + val workerOffers = + IndexedSeq( + // cpu won't be a problem + WorkerOffer("executor0", "host0", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))) + ) + + taskScheduler.submitTasks(lowerTaskSet) + taskScheduler.submitTasks(higherRpTaskSet) + + // should have 3 for default profile and 2 for additional resource profile + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(8 === taskDescriptions.length) + var index = 0 + for (tDesc <- taskDescriptions) { + assert(tDesc.resources.contains(GPU)) + val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted + assert(addresses.length == 1) + if (index < 4) { // the first 4 tasks will grab 0.7 gpu + assert(addresses(0) == index.toString) + assert(ResourceAmountUtils.toFractionalResource( + tDesc.resources.get(GPU).get(index.toString)) == 0.7) + } else { + assert(addresses(0) == (index - 4).toString) + assert(ResourceAmountUtils.toFractionalResource( + tDesc.resources.get(GPU).get((index - 4).toString)) == 0.3) + } + index += 1 + } + } + + test("SPARK-45527 TaskResourceProfile: the left gpu resources on multiple executors " + + "can assign to other taskset") { + val taskCpus = 1 + val taskGpus = 0.3 + val executorGpus = 4 + val executorCpus = 1000 + + // each tasks require 0.3 gpu + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString + ) + val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + + // each task require 0.7 gpu + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7) + val rp = new TaskResourceProfile(treqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, stageAttemptId = 0, + priority = 0, rpId = rp.id) + + val workerOffers = + IndexedSeq( + // cpu won't be a problem + WorkerOffer("executor0", "host0", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("0" -> 1.0))))), + WorkerOffer("executor1", "host1", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("1" -> 1.0))))), + WorkerOffer("executor2", "host2", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("2" -> 1.0))))), + WorkerOffer("executor3", "host3", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("3" -> 1.0))))) + ) + + taskScheduler.submitTasks(lowerTaskSet) + 
+  test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 executor " +
+    "can't assign to other taskset due to not enough gpu resource") {
+    val taskCpus = 1
+    val taskGpus = 0.4
+    val executorGpus = 4
+    val executorCpus = 4
+
+    // each task of the lower (default profile) taskset requires 0.4 gpu
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    // each task of the higher taskset requires 0.7 gpu
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val workerOffers =
+      IndexedSeq(
+        // cpu won't be a problem
+        WorkerOffer("executor0", "host0", 1000, None, new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)))))
+      )
+
+    taskScheduler.submitTasks(lowerTaskSet)
+    taskScheduler.submitTasks(higherRpTaskSet)
+
+    // only the higher taskset gets resources: its tasks take 0.7 of each gpu, and the
+    // remaining 0.3 per address cannot satisfy the default profile's 0.4 request
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(4 === taskDescriptions.length)
+    var index = 0
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      assert(addresses.length == 1)
+      assert(addresses(0) == index.toString)
+      assert(ResourceAmountUtils.toFractionalResource(
+        tDesc.resources.get(GPU).get(index.toString)) == 0.7)
+      index += 1
+    }
+  }
+
+  test("SPARK-45527 schedule tasks for a barrier taskSet if all tasks can be launched together") {
+    val taskCpus = 2
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
+
+    val numFreeCores = 3
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")),
+      new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627")),
+      new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629")))
+    val attempt1 = FakeTask.createBarrierTaskSet(3)
+
+    // submit attempt 1, offer some resources, all tasks get launched together
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(3 === taskDescriptions.length)
+  }
 }
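The TaskSetManagerSuite hunk below tracks the same API shift: a task's resource assignment is no longer a ResourceInformation(name, addresses) but a map from address to the internal amount the task owns. The before and after shapes, sketched (illustrative, assuming ONE_ENTIRE_RESOURCE and GPU are in scope as imported below):

```scala
// Before: an address list only; fractional ownership could not be expressed.
// Map(GPU -> new ResourceInformation(GPU, Array("0", "1")))

// After: each address carries how much of it this task holds (here, all of both).
val assignments: Map[String, Map[String, Long]] =
  Map(GPU -> Map("0" -> ONE_ENTIRE_RESOURCE, "1" -> ONE_ENTIRE_RESOURCE))
```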
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 2f8b6df8beac..26b38bfcc9ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -38,7 +38,8 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.{SKIP_VALIDATE_CORES_TESTING, TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED}
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -1825,7 +1826,8 @@ class TaskSetManagerSuite
     val taskSet = FakeTask.createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
-    val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, Array("0", "1")))
+    val taskResourceAssignments = Map(
+      GPU -> Map("0" -> ONE_ENTIRE_RESOURCE, "1" -> ONE_ENTIRE_RESOURCE))
     val taskOption =
       manager.resourceOffer("exec1", "host1", NO_PREF, 2, taskResourceAssignments)._1
     assert(taskOption.isDefined)
@@ -1833,7 +1835,9 @@
     val allocatedResources = taskOption.get.resources
     assert(allocatedCpus == 2)
     assert(allocatedResources.size == 1)
-    assert(allocatedResources(GPU).addresses sameElements Array("0", "1"))
+    assert(allocatedResources(GPU).keys.toArray.sorted sameElements Array("0", "1"))
+    assert(allocatedResources === taskResourceAssignments)
+
   }
 
   test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org