This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 90560dce85b0 [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage
90560dce85b0 is described below

commit 90560dce85b06041e90699f168f67ed2af5a5ca2
Author: Bobby Wang <wbo4...@gmail.com>
AuthorDate: Tue Mar 19 10:07:43 2024 -0500

    [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the calculation of the maximum number of concurrent tasks that is used when evaluating the number of slots for barrier stages, specifically for the case where the task resource amount is greater than 1.
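
    As an illustration (a minimal sketch with a hypothetical helper name, not code from this patch), the corrected calculation divides the raw resource address count by the task resource amount exactly once: `ceil` of the amount for whole-resource requests, and the floor of the reciprocal for fractional requests.

    ```scala
    // Hypothetical standalone helper mirroring the corrected logic that this
    // patch moves into TaskSchedulerImpl; the name is illustrative only.
    def maxTasksPerResource(totalAddresses: Int, taskAmount: Double): Int = {
      require(taskAmount > 0.0)
      if (taskAmount >= 1.0) {
        // A task needs one or more whole addresses: divide the address count.
        totalAddresses / taskAmount.ceil.toInt
      } else {
        // A task needs a fraction of one address: each address hosts several tasks.
        totalAddresses * math.floor(1.0 / taskAmount).toInt
      }
    }
    ```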
    
    ### Why are the changes needed?
    
    ``` scala
      test("problem of calculating the maximum concurrent task") {
        withTempDir { dir =>
          val discoveryScript = createTempScriptWithExpectedOutput(
            dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", 
"2", "3"]}""")
    
          val conf = new SparkConf()
            // Set up a local cluster with a single executor that has 6 CPUs and 4 GPUs.
            .setMaster("local-cluster[1, 6, 1024]")
            .setAppName("test-cluster")
            .set(WORKER_GPU_ID.amountConf, "4")
            .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
            .set(EXECUTOR_GPU_ID.amountConf, "4")
            .set(TASK_GPU_ID.amountConf, "2")
            // disable barrier stage retry to fail the application as soon as possible
            .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
          sc = new SparkContext(conf)
          TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
    
          // Set up a barrier stage that contains 2 tasks, each requiring 1 CPU and 2 GPUs.
          // The total resource requirement (2 CPUs and 4 GPUs) of this barrier stage can be
          // satisfied, since the cluster has 6 CPUs and 4 GPUs in total.
          assert(sc.parallelize(Range(1, 10), 2)
            .barrier()
            .mapPartitions { iter => iter }
            .collect() sameElements Range(1, 10).toArray[Int])
        }
      }
    ```
    
    In the described test scenario, the executor has 6 CPU cores and 4 GPUs, and each task requires 1 CPU core and 2 GPUs, so the maximum number of concurrent tasks should be 2. However, when the 2 barrier tasks are launched, the `checkBarrierStageWithNumSlots` function computes an incorrect concurrent task limit of 1 instead of 2, so the barrier stage fails even though enough slots exist. This PR fixes that calculation.
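
    To make the numbers concrete (an illustrative trace reusing the hypothetical `maxTasksPerResource` sketch above, not code from this patch), with 4 GPU addresses and a task amount of 2 GPUs:

    ```scala
    // Old path: CoarseGrainedSchedulerBackend pre-divided the addresses into
    // "parts" via totalParts, and TaskSchedulerImpl then divided by the task
    // amount again, so the division effectively happened twice:
    //   totalParts(2.0) = 4 / 2 = 2   // "parts" reported for the executor
    //   (2 / 2.0).toInt = 1           // wrong limit: divided a second time
    // New path: the backend reports the raw address count (totalAddressesAmount)
    // and the division happens exactly once:
    maxTasksPerResource(totalAddresses = 4, taskAmount = 2.0)  // == 2, as expected
    ```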
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    The existing and newly added unit tests should pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45528 from wbo4958/2-gpu.
    
    Authored-by: Bobby Wang <wbo4...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/scheduler/ExecutorResourceInfo.scala     | 15 +-----
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  6 ++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  3 +-
 .../CoarseGrainedSchedulerBackendSuite.scala       | 44 ++++++++++++++++-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 56 +++++++++++++++++++++-
 5 files changed, 105 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index d9fbd23f3aa4..c0f4475d20cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -33,19 +33,6 @@ private[spark] class ExecutorResourceInfo(
 
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
-
-  /**
-   * Calculate how many parts the executor can offer according to the task resource amount
-   * @param taskAmount how many resource amount the task required
-   * @return the total parts
-   */
-  def totalParts(taskAmount: Double): Int = {
-    assert(taskAmount > 0.0)
-    if (taskAmount >= 1.0) {
-      addresses.length / taskAmount.ceil.toInt
-    } else {
-      addresses.length * Math.floor(1.0 / taskAmount).toInt
-    }
-  }
+  def totalAddressesAmount: Int = this.addresses.length
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4ae8a91a2b96..dc202aa1bb71 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -1257,7 +1257,11 @@ private[spark] object TaskSchedulerImpl {
           numTasksPerExecCores
         } else {
           val availAddrs = resources.getOrElse(limitingResource, 0)
-          val resourceLimit = (availAddrs / taskLimit).toInt
+          val resourceLimit = if (taskLimit >= 1.0) {
+            availAddrs / taskLimit.ceil.toInt
+          } else {
+            availAddrs * Math.floor(1.0 / taskLimit).toInt
+          }
           // when executor cores config isn't set, we can't calculate the real limiting resource
           // and number of tasks per executor ahead of time, so calculate it now based on what
           // is available.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7e124302c726..cebe885378e3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -758,8 +758,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             executor.resourceProfileId,
             executor.totalCores,
             executor.resourcesInfo.map { case (name, rInfo) =>
-              val taskAmount = rp.taskResources.get(name).get.amount
-              (name, rInfo.totalParts(taskAmount))
+              (name, rInfo.totalAddressesAmount)
             }
           )
         }.unzip3
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 1b444aa60474..6e94f9abe67b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -33,10 +33,11 @@ import org.scalatest.concurrent.Eventually
 import org.scalatestplus.mockito.MockitoSugar._
 
 import org.apache.spark._
+import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.rdd.RDD
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, TaskResourceRequests}
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}
 import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
@@ -137,6 +138,47 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     }
   }
 
+  test("SPARK-47458 compute max number of concurrent tasks with resources limiting") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
+      val conf = new SparkConf()
+        .set(CPUS_PER_TASK, 1)
+        .setMaster("local-cluster[1, 20, 1024]")
+        .setAppName("test")
+        .set(WORKER_GPU_ID.amountConf, "4")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "4")
+        .set(TASK_GPU_ID.amountConf, "0.2")
+      sc = new SparkContext(conf)
+      eventually(timeout(executorUpTimeout)) {
+        // Ensure all executors have been launched.
+        assert(sc.getExecutorIds().length == 1)
+      }
+      // The concurrent tasks should be min of {20/1, 4 * (1/0.2)}
+      assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 20)
+
+      val gpuTaskAmountToExpectedTasks = Map(
+        0.3 -> 12,  // 4 * (1/0.3).toInt
+        0.4 -> 8,   // 4 * (1/0.4).toInt
+        0.5 -> 8,   // 4 * (1/0.5).toInt
+        0.8 -> 4,   // 4 * (1/0.8).toInt
+        1.0 -> 4,   // 4 / 1
+        2.0 -> 2,   // 4 / 2
+        3.0 -> 1,   // 4 / 3
+        4.0 -> 1    // 4 / 4
+      )
+
+      // It's the GPU resource that limits the concurrent number
+      gpuTaskAmountToExpectedTasks.keys.foreach { taskGpu =>
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, taskGpu)
+        val rp: ResourceProfile = new ResourceProfileBuilder().require(treqs).build()
+        sc.resourceProfileManager.addResourceProfile(rp)
+        assert(sc.maxNumConcurrentTasks(rp) == gpuTaskAmountToExpectedTasks(taskGpu))
+      }
+    }
+  }
+
   // Here we just have test for one happy case instead of all cases: other cases are covered in
   // FsHistoryProviderSuite.
   test("custom log url for Spark UI is applied") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index ea4ca8cf16c7..d9db5f656176 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark._
 import org.apache.spark.internal.config
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceAmountUtils, ResourceProfile, TaskResourceProfile, TaskResourceRequests}
-import org.apache.spark.resource.ResourceAmountUtils.{ONE_ENTIRE_RESOURCE}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.status.api.v1.ThreadStackTrace
@@ -2687,4 +2687,58 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
     assert(3 === taskDescriptions.length)
   }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "in barrier" else ""
+    Seq(1, 2, 3, 4).foreach { gpuTaskAmount =>
+      test(s"SPARK-47458 GPU fraction resource should work when " +
+        s"gpu task amount = ${gpuTaskAmount} $barrier") {
+
+        val executorCpus = 10 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskNum = 4 / gpuTaskAmount
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(taskNum)
+        } else {
+          FakeTask.createTaskSet(10)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        taskDescriptions = taskDescriptions.sortBy(t => t.index)
+        assert(taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+
+        // The key is gpuTaskAmount
+        // The values are the gpu addresses of each task.
+        val gpuTaskAmountToExpected = Map(
+          1 -> Seq(Array("0"), Array("1"), Array("2"), Array("3")),
+          2 -> Seq(Array("0", "1"), Array("2", "3")),
+          3 -> Seq(Array("0", "1", "2")),
+          4 -> Seq(Array("0", "1", "2", "3"))
+        )
+
+        taskDescriptions.foreach { task =>
+          val taskResources = task.resources(GPU).keys.toArray.sorted
+          val expected = gpuTaskAmountToExpected(gpuTaskAmount)(task.index)
+          assert(taskResources sameElements expected)
+        }
+      }
+    }
+  }
+
 }

