Repository: spark
Updated Branches:
  refs/heads/master e4f4886d7 -> 558962a83


[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across 
workers

If the list of waiting drivers is long, most of them end up dispatched to the 
first worker we get (if it has enough resources), regardless of the initial 
randomization.

We should randomize each time we dispatch a driver, so that drivers are 
balanced across workers.
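The round-robin idea can be sketched as follows (a simplified, self-contained
model with stand-in Worker/Driver types, not the actual Master code):

```scala
// Simplified sketch of round-robin driver scheduling (hypothetical types,
// not Spark's actual Worker/DriverInfo classes).
case class Worker(id: Int, var memFree: Int, var coresFree: Int)
case class Driver(mem: Int, cores: Int)

// For each driver, resume the search from where the previous driver's search
// left off, visiting at most every worker once. Returns the worker id each
// driver was placed on, or -1 if no worker had enough resources.
def scheduleRoundRobin(workers: IndexedSeq[Worker], drivers: Seq[Driver]): Seq[Int] = {
  var curPos = 0
  drivers.map { driver =>
    var placedOn = -1
    var launched = false
    var visited = 0
    while (visited < workers.size && !launched) {
      val w = workers(curPos)
      visited += 1
      if (w.memFree >= driver.mem && w.coresFree >= driver.cores) {
        w.memFree -= driver.mem
        w.coresFree -= driver.cores
        placedOn = w.id
        launched = true
      }
      curPos = (curPos + 1) % workers.size
    }
    placedOn
  }
}
```

With three identical workers and three small drivers, each driver lands on a
different worker; a single up-front shuffle followed by first-fit would put all
three on the same worker.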

Author: WangTaoTheTonic <barneystin...@aliyun.com>
Author: WangTao <barneystin...@aliyun.com>

Closes #1106 from WangTaoTheTonic/fixBalanceDrivers and squashes the following 
commits:

d1a928b [WangTaoTheTonic] Minor adjustment
b6560cf [WangTaoTheTonic] solve the shuffle problem for HashSet
f674e59 [WangTaoTheTonic] add comment and minor fix
2835929 [WangTao] solve the failed test and avoid filtering
2ca3091 [WangTao] fix checkstyle
bc91bb1 [WangTao] Avoid shuffle every time we schedule the driver using round robin
bbc7087 [WangTaoTheTonic] Optimize the schedule in Master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/558962a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/558962a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/558962a8

Branch: refs/heads/master
Commit: 558962a83fb0758ab5c13ff4ea58cc96c29cbbcc
Parents: e4f4886
Author: WangTaoTheTonic <barneystin...@aliyun.com>
Authored: Wed Sep 10 13:06:47 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Wed Sep 10 13:06:47 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala   | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/558962a8/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a3909d6..2a3bd6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -487,13 +487,25 @@ private[spark] class Master(
     if (state != RecoveryState.ALIVE) { return }
 
     // First schedule drivers, they take strict precedence over applications
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
+    // Randomization helps balance drivers
+    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+    val aliveWorkerNum = shuffledAliveWorkers.size
+    var curPos = 0
+    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+      // We assign workers to each waiting driver in a round-robin fashion: each search resumes
+      // where the previous driver's search left off and continues until all alive workers
+      // have been visited.
+      var launched = false
+      var numWorkersVisited = 0
+      while (numWorkersVisited < aliveWorkerNum && !launched) {
+        val worker = shuffledAliveWorkers(curPos)
+        numWorkersVisited += 1
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
+          launched = true
         }
+        curPos = (curPos + 1) % aliveWorkerNum
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
