This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a36514e  [3.0][SPARK-32518][CORE] 
CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all kinds 
of resources
a36514e is described below

commit a36514e1fbf6920f0d18a7d8fb690822843eb2a2
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Aug 18 06:50:05 2020 +0000

    [3.0][SPARK-32518][CORE] 
CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all kinds 
of resources
    
    ### What changes were proposed in this pull request?
    
    1.  Make `CoarseGrainedSchedulerBackend.maxNumConcurrentTasks()` consider 
all kinds of resources when calculating the max concurrent tasks
    
    2. Refactor `calculateAvailableSlots()` so that it can be used by 
both `CoarseGrainedSchedulerBackend` and `TaskSchedulerImpl`
    
    ### Why are the changes needed?
    
    Currently, `CoarseGrainedSchedulerBackend.maxNumConcurrentTasks()` only 
considers the CPU for the max concurrent tasks. This can cause the application 
to hang when a barrier stage requires extra custom resources but the cluster 
doesn't have enough corresponding resources. Because, without the checking for 
other custom resources in `maxNumConcurrentTasks`, the barrier stage can be 
submitted to the `TaskSchedulerImpl`. But the `TaskSchedulerImpl` won't launch 
tasks for the barrier stage [...]
    
    If the barrier stage doesn't launch all the tasks in one round, the 
application will fail and suggest that the user disable delay scheduling. However, 
this is actually a misleading suggestion since the real root cause is insufficient 
resources.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When a barrier stage requires more custom resources than the 
cluster has, the application previously failed with a misleading suggestion 
to disable delay scheduling. After this PR, the application fails with 
an error message saying there are not enough resources.
    
    ### How was this patch tested?
    
    Added a unit test.
    
    Closes #29395 from Ngone51/backport-spark-32518.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 10 +++---
 .../org/apache/spark/internal/config/Tests.scala   | 15 +++++++++
 .../scheduler/BarrierJobAllocationFailed.scala     |  4 +--
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 10 +++---
 .../spark/scheduler/ExecutorResourceInfo.scala     |  1 +
 .../apache/spark/scheduler/SchedulerBackend.scala  |  3 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 36 +++++++++++++++++++++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 22 +++++++++++--
 .../spark/BarrierStageOnSubmittedSuite.scala       | 36 ++++++++++++++++++++++
 .../scala/org/apache/spark/SparkContextSuite.scala |  2 ++
 10 files changed, 124 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 65c08cf..66fe1d7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1597,7 +1597,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be concurrent launched based on the 
resources
+   * could be used, even if some of them are being used at the moment.
    * Note that please don't cache the value returned by this method, because 
the number can change
    * due to add/remove executors.
    *
@@ -2776,8 +2777,9 @@ object SparkContext extends Logging {
       }
       // some cluster managers don't set the EXECUTOR_CORES config by default 
(standalone
       // and mesos coarse grained), so we can't rely on that config for those.
-      val shouldCheckExecCores = executorCores.isDefined || 
sc.conf.contains(EXECUTOR_CORES) ||
+      var shouldCheckExecCores = executorCores.isDefined || 
sc.conf.contains(EXECUTOR_CORES) ||
         (master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))
+      shouldCheckExecCores &= !sc.conf.get(SKIP_VALIDATE_CORES_TESTING)
 
       // Number of cores per executor must meet at least one task requirement.
       if (shouldCheckExecCores && execCores < taskCores) {
@@ -2833,7 +2835,7 @@ object SparkContext extends Logging {
           limitingResourceName = taskReq.resourceName
         }
       }
-      if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
+      if(!shouldCheckExecCores) {
         // if we can't rely on the executor cores config throw a warning for 
user
         logWarning("Please ensure that the number of slots available on your " 
+
           "executors is limited by the number of cores to task cpus and not 
another " +
@@ -2857,7 +2859,7 @@ object SparkContext extends Logging {
             s"result in wasted resources due to resource 
${limitingResourceName} limiting the " +
             s"number of runnable tasks per executor to: ${numSlots}. Please 
adjust " +
             s"your configuration."
-          if (Utils.isTesting) {
+          if (sc.conf.get(RESOURCES_WARNING_TESTING)) {
             throw new SparkException(message)
           } else {
             logWarning(message)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
index 232264d6..e328ed0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -61,4 +61,19 @@ private[spark] object Tests {
     .version("3.0.0")
     .intConf
     .createWithDefault(2)
+
+  val RESOURCES_WARNING_TESTING = 
ConfigBuilder("spark.resources.warnings.testing")
+    .version("3.0.1")
+    .booleanConf
+    .createWithDefault(false)
+
+  // This configuration is used for unit tests to allow skipping the task cpus 
to cores validation
+  // to allow emulating standalone mode behavior while running in local mode. 
Standalone mode
+  // by default doesn't specify a number of executor cores, it just uses all 
the ones available
+  // on the host.
+  val SKIP_VALIDATE_CORES_TESTING =
+  ConfigBuilder("spark.testing.skipValidateCores")
+    .version("3.0.1")
+    .booleanConf
+    .createWithDefault(false)
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
index 2274e68..043c6b9 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -60,6 +60,6 @@ private[spark] object BarrierJobAllocationFailed {
   val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
     "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage 
that requires " +
       "more slots than the total number of slots in the cluster currently. 
Please init a new " +
-      "cluster with more CPU cores or repartition the input RDD(s) to reduce 
the number of " +
-      "slots required to run this barrier stage."
+      "cluster with more resources(e.g. CPU, GPU) or repartition the input 
RDD(s) to reduce " +
+      "the number of slots required to run this barrier stage."
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 51445bf..b483b52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -453,10 +453,12 @@ private[spark] class DAGScheduler(
    * submission.
    */
   private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
-    val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
-    if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
-      throw new BarrierJobSlotsNumberCheckFailed(numPartitions, 
maxNumConcurrentTasks)
+    if (rdd.isBarrier()) {
+      val numPartitions = rdd.getNumPartitions
+      val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
+      if (numPartitions > maxNumConcurrentTasks) {
+        throw new BarrierJobSlotsNumberCheckFailed(numPartitions, 
maxNumConcurrentTasks)
+      }
     }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index fd04db8..508c6ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -36,4 +36,5 @@ private[spark] class ExecutorResourceInfo(
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
   override protected def slotsPerAddress: Int = numParts
+  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 9159d2a..7b76af2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -77,7 +77,8 @@ private[spark] trait SchedulerBackend {
   def getDriverAttributes: Option[Map[String, String]] = None
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be concurrent launched based on the 
resources
+   * could be used, even if some of them are being used at the moment.
    * Note that please don't cache the value returned by this method, because 
the number can change
    * due to add/remove executors.
    *
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 97125f6..46641e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -446,7 +446,17 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on 
all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, 
RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
-      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
+      // we only need to calculate available slots if using barrier 
scheduling, otherwise the
+      // value is -1
+      val availableSlots = if (taskSet.isBarrier) {
+        val availableResourcesAmount = availableResources.map { resourceMap =>
+          // note that the addresses here have been expanded according to the 
numParts
+          resourceMap.map { case (name, addresses) => (name, addresses.length) 
}
+        }
+        calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
+      } else {
+        -1
+      }
       // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.
@@ -934,6 +944,30 @@ private[spark] object TaskSchedulerImpl {
   val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
   /**
+   * Calculate the max available task slots given the `availableCpus` and 
`availableResources`
+   * from a collection of executors.
+   *
+   * @param scheduler the TaskSchedulerImpl instance
+   * @param availableCpus an Array of the amount of available cpus from the 
executors.
+   * @param availableResources an Array of the resources map from the 
executors. In the resource
+   *                           map, it maps from the resource name to its 
amount.
+   * @return the number of max task slots
+   */
+  def calculateAvailableSlots(
+      scheduler: TaskSchedulerImpl,
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Int]]): Int = {
+    val cpusPerTask = scheduler.CPUS_PER_TASK
+    val resourcesReqsPerTask = scheduler.resourcesReqsPerTask
+    availableCpus.zip(availableResources).map { case (cpu, resources) =>
+      val cpuNum = cpu / cpusPerTask
+      resourcesReqsPerTask.map { req =>
+        resources.get(req.resourceName).map(_ / req.amount).getOrElse(0)
+      }.reduceOption(Math.min).map(_.min(cpuNum)).getOrElse(cpuNum)
+    }.sum
+  }
+
+  /**
    * Used to balance containers across hosts.
    *
    * Accepts a map of hosts to resource offers for that host, and returns a 
prioritized list of
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9d8fb8f..8b55e2c 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -563,10 +563,26 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       !executorsPendingLossReason.contains(id)
   }
 
+  /**
+   * Get the max number of tasks that can be concurrent launched based on the 
resources
+   * could be used, even if some of them are being used at the moment.
+   * Note that please don't cache the value returned by this method, because 
the number can change
+   * due to add/remove executors.
+   *
+   * @return The max number of tasks that can be concurrent launched currently.
+   */
   override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+    val (cpus, resources) = {
+      executorDataMap
+        .filter { case (id, _) => isExecutorActive(id) }
+        .values.toArray.map { executor =>
+        (
+          executor.totalCores,
+          executor.resourcesInfo.map { case (name, rInfo) => (name, 
rInfo.totalAddressAmount) }
+        )
+      }.unzip
+    }
+    TaskSchedulerImpl.calculateAvailableSlots(scheduler, cpus, resources)
   }
 
   // this function is for testing only
diff --git 
a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala 
b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index 435b927..7052d1a 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -19,9 +19,12 @@ package org.apache.spark
 
 import scala.concurrent.duration._
 
+import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.resource.TestResourceIDs.{EXECUTOR_GPU_ID, 
TASK_GPU_ID, WORKER_GPU_ID}
 import org.apache.spark.scheduler.BarrierJobAllocationFailed._
+import org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
     testSubmitJob(sc, rdd,
       message = 
ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
   }
+
+  test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks 
should " +
+    "consider all kinds of resources for the barrier stage") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")
+
+      val conf = new SparkConf()
+        .setMaster("local-cluster[1, 2, 1024]")
+        .setAppName("test-cluster")
+        .set(WORKER_GPU_ID.amountConf, "1")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "1")
+        .set(TASK_GPU_ID.amountConf, "1")
+        // disable barrier stage retry to fail the application as soon as 
possible
+        .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
+        // disable the check to simulate the behavior of Standalone in order to
+        // reproduce the issue.
+        .set(Tests.SKIP_VALIDATE_CORES_TESTING, true)
+      sc = new SparkContext(conf)
+      // setup an executor which will have 2 CPUs and 1 GPU
+      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+
+      val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
+        sc.parallelize(Range(1, 10), 2)
+          .barrier()
+          .mapPartitions { iter => iter }
+          .collect()
+      }
+      assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " 
+
+        "mode does not allow run a barrier stage that requires more slots"))
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index ce437a5..dc1c045 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -36,6 +36,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.TestUtils._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.RESOURCES_WARNING_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.resource.ResourceAllocation
 import org.apache.spark.resource.ResourceUtils._
@@ -890,6 +891,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
       .setAppName("test-cluster")
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_GPU_ID.amountConf, "4")
+    conf.set(RESOURCES_WARNING_TESTING, true)
 
     var error = intercept[SparkException] {
       sc = new SparkContext(conf)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to