This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 90560dce85b0 [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage 90560dce85b0 is described below commit 90560dce85b06041e90699f168f67ed2af5a5ca2 Author: Bobby Wang <wbo4...@gmail.com> AuthorDate: Tue Mar 19 10:07:43 2024 -0500 [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage ### What changes were proposed in this pull request? This PR fixes how the maximum number of concurrent tasks is calculated when evaluating the number of slots for barrier stages, specifically when the task resource amount is greater than 1. ### Why are the changes needed? The following test reproduces the problem: ``` scala test("problem of calculating the maximum concurrent task") { withTempDir { dir => val discoveryScript = createTempScriptWithExpectedOutput( dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""") val conf = new SparkConf() // Set up a local cluster with a single executor that has 6 CPUs and 4 GPUs. .setMaster("local-cluster[1, 6, 1024]") .setAppName("test-cluster") .set(WORKER_GPU_ID.amountConf, "4") .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript) .set(EXECUTOR_GPU_ID.amountConf, "4") .set(TASK_GPU_ID.amountConf, "2") // disable barrier stage retry to fail the application as soon as possible .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1) sc = new SparkContext(conf) TestUtils.waitUntilExecutorsUp(sc, 1, 60000) // Set up a barrier stage with 2 tasks, each requiring 1 CPU and 2 GPUs. // The total resource requirement (2 CPUs and 4 GPUs) fits on the single executor, // so both barrier tasks should be able to run concurrently and the job should succeed.
assert(sc.parallelize(Range(1, 10), 2) .barrier() .mapPartitions { iter => iter } .collect() sameElements Range(1, 10).toArray[Int]) } } ``` In the described test scenario, the executor has 6 CPU cores and 4 GPUs, and each task requires 1 CPU core and 2 GPUs. Consequently, the maximum number of concurrent tasks should be 2. However, when the 2 barrier tasks are about to be launched, the `checkBarrierStageWithNumSlots` function receives an incorrect concurrent-task limit of 1 instead of 2, so the barrier stage check fails. The limit is wrong because the GPU amount is divided by the per-task amount twice. First, `CoarseGrainedSchedulerBackend` reports `ExecutorResourceInfo.totalParts(2.0)` = 4 / 2 = 2 GPU slots. Then `TaskSchedulerImpl` divides that value by the per-task amount of 2 again, which yields 1. The bug needs to be fixed. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The existing and newly added unit tests should pass ### Was this patch authored or co-authored using generative AI tooling? No Closes #45528 from wbo4958/2-gpu. Authored-by: Bobby Wang <wbo4...@gmail.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../spark/scheduler/ExecutorResourceInfo.scala | 15 +----- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 6 ++- .../cluster/CoarseGrainedSchedulerBackend.scala | 3 +- .../CoarseGrainedSchedulerBackendSuite.scala | 44 ++++++++++++++++- .../spark/scheduler/TaskSchedulerImplSuite.scala | 56 +++++++++++++++++++++- 5 files changed, 105 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala index d9fbd23f3aa4..c0f4475d20cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala @@ -33,19 +33,6 @@ private[spark] class ExecutorResourceInfo( override protected def resourceName = this.name override protected def resourceAddresses = this.addresses - - /** - * Calculate how many parts the executor can offer according to the task resource amount - * @param taskAmount how many resource amount the task required - * @return the total parts - */ - def
totalParts(taskAmount: Double): Int = { - assert(taskAmount > 0.0) - if (taskAmount >= 1.0) { - addresses.length / taskAmount.ceil.toInt - } else { - addresses.length * Math.floor(1.0 / taskAmount).toInt - } - } + def totalAddressesAmount: Int = this.addresses.length } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 4ae8a91a2b96..dc202aa1bb71 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -1257,7 +1257,11 @@ private[spark] object TaskSchedulerImpl { numTasksPerExecCores } else { val availAddrs = resources.getOrElse(limitingResource, 0) - val resourceLimit = (availAddrs / taskLimit).toInt + val resourceLimit = if (taskLimit >= 1.0) { + availAddrs / taskLimit.ceil.toInt + } else { + availAddrs * Math.floor(1.0 / taskLimit).toInt + } // when executor cores config isn't set, we can't calculate the real limiting resource // and number of tasks per executor ahead of time, so calculate it now based on what // is available. 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7e124302c726..cebe885378e3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -758,8 +758,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executor.resourceProfileId, executor.totalCores, executor.resourcesInfo.map { case (name, rInfo) => - val taskAmount = rp.taskResources.get(name).get.amount - (name, rInfo.totalParts(taskAmount)) + (name, rInfo.totalAddressesAmount) } ) }.unzip3 diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 1b444aa60474..6e94f9abe67b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -33,10 +33,11 @@ import org.scalatest.concurrent.Eventually import org.scalatestplus.mockito.MockitoSugar._ import org.apache.spark._ +import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.rdd.RDD -import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, TaskResourceRequests} +import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests} import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ @@ -137,6 
+138,47 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo } } + test("SPARK-47458 compute max number of concurrent tasks with resources limiting") { + withTempDir { dir => + val discoveryScript = createTempScriptWithExpectedOutput( + dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""") + val conf = new SparkConf() + .set(CPUS_PER_TASK, 1) + .setMaster("local-cluster[1, 20, 1024]") + .setAppName("test") + .set(WORKER_GPU_ID.amountConf, "4") + .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript) + .set(EXECUTOR_GPU_ID.amountConf, "4") + .set(TASK_GPU_ID.amountConf, "0.2") + sc = new SparkContext(conf) + eventually(timeout(executorUpTimeout)) { + // Ensure all executors have been launched. + assert(sc.getExecutorIds().length == 1) + } + // The concurrent tasks should be min of {20/1, 4 * (1/0.2)} + assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 20) + + val gpuTaskAmountToExpectedTasks = Map( + 0.3 -> 12, // 4 * (1/0.3).toInt + 0.4 -> 8, // 4 * (1/0.4).toInt + 0.5 -> 8, // 4 * (1/0.5).toInt + 0.8 -> 4, // 4 * (1/0.8).toInt + 1.0 -> 4, // 4 / 1 + 2.0 -> 2, // 4 / 2 + 3.0 -> 1, // 4 / 3 + 4.0 -> 1 // 4 / 4 + ) + + // It's the GPU resource that limits the concurrent number + gpuTaskAmountToExpectedTasks.keys.foreach { taskGpu => + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, taskGpu) + val rp: ResourceProfile = new ResourceProfileBuilder().require(treqs).build() + sc.resourceProfileManager.addResourceProfile(rp) + assert(sc.maxNumConcurrentTasks(rp) == gpuTaskAmountToExpectedTasks(taskGpu)) + } + } + } + // Here we just have test for one happy case instead of all cases: other cases are covered in // FsHistoryProviderSuite. 
test("custom log url for Spark UI is applied") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index ea4ca8cf16c7..d9db5f656176 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.config import org.apache.spark.resource.{ExecutorResourceRequests, ResourceAmountUtils, ResourceProfile, TaskResourceProfile, TaskResourceRequests} -import org.apache.spark.resource.ResourceAmountUtils.{ONE_ENTIRE_RESOURCE} +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.status.api.v1.ThreadStackTrace @@ -2687,4 +2687,58 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten assert(3 === taskDescriptions.length) } + + // 1 executor with 4 GPUS + Seq(true, false).foreach { barrierMode => + val barrier = if (barrierMode) "in barrier" else "" + Seq(1, 2, 3, 4).foreach { gpuTaskAmount => + test(s"SPARK-47458 GPU fraction resource should work when " + + s"gpu task amount = ${gpuTaskAmount} $barrier") { + + val executorCpus = 10 // cpu will not limit the concurrent tasks number + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> "1", + TASK_GPU_ID.amountConf -> gpuTaskAmount.toString, + EXECUTOR_GPU_ID.amountConf -> "4", + config.EXECUTOR_CORES.key -> executorCpus.toString) + + val taskNum = 4 / gpuTaskAmount + val taskSet = if (barrierMode) { + FakeTask.createBarrierTaskSet(taskNum) + } else { + FakeTask.createTaskSet(10) + } + val resources = new 
ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)))) + + val workerOffers = IndexedSeq( + WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources) + ) + + taskScheduler.submitTasks(taskSet) + // Launch tasks on executor that satisfies resource requirements. + var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + taskDescriptions = taskDescriptions.sortBy(t => t.index) + assert(taskNum === taskDescriptions.length) + assert(!failedTaskSet) + + // The key is gpuTaskAmount + // The values are the gpu addresses of each task. + val gpuTaskAmountToExpected = Map( + 1 -> Seq(Array("0"), Array("1"), Array("2"), Array("3")), + 2 -> Seq(Array("0", "1"), Array("2", "3")), + 3 -> Seq(Array("0", "1", "2")), + 4 -> Seq(Array("0", "1", "2", "3")) + ) + + taskDescriptions.foreach { task => + val taskResources = task.resources(GPU).keys.toArray.sorted + val expected = gpuTaskAmountToExpected(gpuTaskAmount)(task.index) + assert(taskResources sameElements expected) + } + } + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org