Repository: spark
Updated Branches:
  refs/heads/master b1f4b4abf -> 41a7cdf85


[SPARK-8881] [SPARK-9260] Fix algorithm for scheduling executors on workers

Current scheduling algorithm allocates one core at a time and in doing so ends 
up ignoring spark.executor.cores. As a result, when 
spark.cores.max/spark.executor.cores (i.e., num_executors) < num_workers, 
executors are not launched and the app hangs. This PR fixes and refactors the 
scheduling algorithm.

andrewor14

Author: Nishkam Ravi <nr...@cloudera.com>
Author: nishkamravi2 <nishkamr...@gmail.com>

Closes #7274 from nishkamravi2/master_scheduler and squashes the following 
commits:

b998097 [nishkamravi2] Update Master.scala
da0f491 [Nishkam Ravi] Update Master.scala
79084e8 [Nishkam Ravi] Update Master.scala
1daf25f [Nishkam Ravi] Update Master.scala
f279cdf [Nishkam Ravi] Update Master.scala
adec84b [Nishkam Ravi] Update Master.scala
a06da76 [nishkamravi2] Update Master.scala
40c8f9f [nishkamravi2] Update Master.scala (to trigger retest)
c11c689 [nishkamravi2] Update EventLoggingListenerSuite.scala
5d6a19c [nishkamravi2] Update Master.scala (for the purpose of issuing a retest)
2d6371c [Nishkam Ravi] Update Master.scala
66362d5 [nishkamravi2] Update Master.scala
ee7cf0e [Nishkam Ravi] Improved scheduling algorithm for executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41a7cdf8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41a7cdf8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41a7cdf8

Branch: refs/heads/master
Commit: 41a7cdf85de2d583d8b8759941a9d6c6e98cae4d
Parents: b1f4b4a
Author: Nishkam Ravi <nr...@cloudera.com>
Authored: Sat Jul 25 22:56:25 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Sat Jul 25 22:56:25 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 112 +++++++++++++------
 1 file changed, 75 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/41a7cdf8/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4615feb..029f94d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -541,6 +541,7 @@ private[master] class Master(
 
   /**
    * Schedule executors to be launched on the workers.
+   * Returns an array containing number of cores assigned to each worker.
    *
    * There are two modes of launching executors. The first attempts to spread 
out an application's
    * executors on as many workers as possible, while the second does the 
opposite (i.e. launch them
@@ -551,39 +552,73 @@ private[master] class Master(
    * multiple executors from the same application may be launched on the same 
worker if the worker
    * has enough cores and memory. Otherwise, each executor grabs all the cores 
available on the
    * worker by default, in which case only one executor may be launched on 
each worker.
+   *
+   * It is important to allocate coresPerExecutor on each worker at a time 
(instead of 1 core
+   * at a time). Consider the following example: cluster has 4 workers with 16 
cores each.
+   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 
16). If 1 core is
+   * allocated at a time, 12 cores from each worker would be assigned to each 
executor.
+   * Since 12 < 16, no executors would launch [SPARK-8881].
    */
-  private def startExecutorsOnWorkers(): Unit = {
-    // Right now this is a very simple FIFO scheduler. We keep trying to fit 
in the first app
-    // in the queue, then the second app, etc.
-    if (spreadOutApps) {
-      // Try to spread out each app among all the workers, until it has all 
its cores
-      for (app <- waitingApps if app.coresLeft > 0) {
-        val usableWorkers = workers.toArray.filter(_.state == 
WorkerState.ALIVE)
-          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB 
&&
-            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
-          .sortBy(_.coresFree).reverse
-        val numUsable = usableWorkers.length
-        val assigned = new Array[Int](numUsable) // Number of cores to give on 
each node
-        var toAssign = math.min(app.coresLeft, 
usableWorkers.map(_.coresFree).sum)
-        var pos = 0
-        while (toAssign > 0) {
-          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
-            toAssign -= 1
-            assigned(pos) += 1
+  private[master] def scheduleExecutorsOnWorkers(
+      app: ApplicationInfo,
+      usableWorkers: Array[WorkerInfo],
+      spreadOutApps: Boolean): Array[Int] = {
+    // If the number of cores per executor is not specified, then we can just 
schedule
+    // 1 core at a time since we expect a single executor to be launched on 
each worker
+    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+    val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val numUsable = usableWorkers.length
+    val assignedCores = new Array[Int](numUsable) // Number of cores to give 
to each worker
+    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give 
to each worker
+    var coresToAssign = math.min(app.coresLeft, 
usableWorkers.map(_.coresFree).sum)
+    var freeWorkers = (0 until numUsable).toIndexedSeq
+
+    def canLaunchExecutor(pos: Int): Boolean = {
+      usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
+      usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
+    }
+
+    while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
+      freeWorkers = freeWorkers.filter(canLaunchExecutor)
+      freeWorkers.foreach { pos =>
+        var keepScheduling = true
+        while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= 
coresPerExecutor) {
+          coresToAssign -= coresPerExecutor
+          assignedCores(pos) += coresPerExecutor
+          assignedMemory(pos) += memoryPerExecutor
+
+          // Spreading out an application means spreading out its executors 
across as
+          // many workers as possible. If we are not spreading out, then we 
should keep
+          // scheduling executors on this worker until we use all of its 
resources.
+          // Otherwise, just move on to the next worker.
+          if (spreadOutApps) {
+            keepScheduling = false
           }
-          pos = (pos + 1) % numUsable
-        }
-        // Now that we've decided how many cores to give on each node, let's 
actually give them
-        for (pos <- 0 until numUsable if assigned(pos) > 0) {
-          allocateWorkerResourceToExecutors(app, assigned(pos), 
usableWorkers(pos))
         }
       }
-    } else {
-      // Pack each app into as few workers as possible until we've assigned 
all its cores
-      for (worker <- workers if worker.coresFree > 0 && worker.state == 
WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > 0) {
-          allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
-        }
+    }
+    assignedCores
+  }
+
+  /**
+   * Schedule and launch executors on workers
+   */
+  private def startExecutorsOnWorkers(): Unit = {
+    // Right now this is a very simple FIFO scheduler. We keep trying to fit 
in the first app
+    // in the queue, then the second app, etc.
+    for (app <- waitingApps if app.coresLeft > 0) {
+      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
+      // Filter out workers that don't have enough resources to launch an 
executor
+      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+          worker.coresFree >= coresPerExecutor.getOrElse(1))
+        .sortBy(_.coresFree).reverse
+      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, 
spreadOutApps)
+
+      // Now that we've decided how many cores to allocate on each worker, 
let's allocate them
+      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
+        allocateWorkerResourceToExecutors(
+          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
       }
     }
   }
@@ -591,19 +626,22 @@ private[master] class Master(
   /**
    * Allocate a worker's resources to one or more executors.
    * @param app the info of the application which the executors belong to
-   * @param coresToAllocate cores on this worker to be allocated to this 
application
+   * @param assignedCores number of cores on this worker for this application
+   * @param coresPerExecutor number of cores per executor
    * @param worker the worker info
    */
   private def allocateWorkerResourceToExecutors(
       app: ApplicationInfo,
-      coresToAllocate: Int,
+      assignedCores: Int,
+      coresPerExecutor: Option[Int],
       worker: WorkerInfo): Unit = {
-    val memoryPerExecutor = app.desc.memoryPerExecutorMB
-    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
-    var coresLeft = coresToAllocate
-    while (coresLeft >= coresPerExecutor && worker.memoryFree >= 
memoryPerExecutor) {
-      val exec = app.addExecutor(worker, coresPerExecutor)
-      coresLeft -= coresPerExecutor
+    // If the number of cores per executor is specified, we divide the cores 
assigned
+    // to this worker evenly among the executors with no remainder.
+    // Otherwise, we launch a single executor that grabs all the assignedCores 
on this worker.
+    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
+    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
+    for (i <- 1 to numExecutors) {
+      val exec = app.addExecutor(worker, coresToAssign)
       launchExecutor(worker, exec)
       app.state = ApplicationState.RUNNING
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to