Repository: spark Updated Branches: refs/heads/master e4f4886d7 -> 558962a83
[SPARK-3411] Improve load-balancing of concurrently-submitted drivers across workers If the waiting driver array is too big, the drivers in it will be dispatched to the first worker we get(if it has enough resources), with or without the Randomization. We should do randomization every time we dispatch a driver, in order to better balance drivers. Author: WangTaoTheTonic <barneystin...@aliyun.com> Author: WangTao <barneystin...@aliyun.com> Closes #1106 from WangTaoTheTonic/fixBalanceDrivers and squashes the following commits: d1a928b [WangTaoTheTonic] Minor adjustment b6560cf [WangTaoTheTonic] solve the shuffle problem for HashSet f674e59 [WangTaoTheTonic] add comment and minor fix 2835929 [WangTao] solve the failed test and avoid filtering 2ca3091 [WangTao] fix checkstyle bc91bb1 [WangTao] Avoid shuffle every time we schedule the driver using round robin bbc7087 [WangTaoTheTonic] Optimize the schedule in Master Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/558962a8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/558962a8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/558962a8 Branch: refs/heads/master Commit: 558962a83fb0758ab5c13ff4ea58cc96c29cbbcc Parents: e4f4886 Author: WangTaoTheTonic <barneystin...@aliyun.com> Authored: Wed Sep 10 13:06:47 2014 -0700 Committer: Andrew Or <andrewo...@gmail.com> Committed: Wed Sep 10 13:06:47 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/558962a8/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a3909d6..2a3bd6b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -487,13 +487,25 @@ private[spark] class Master( if (state != RecoveryState.ALIVE) { return } // First schedule drivers, they take strict precedence over applications - val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers - for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { - for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers + // Randomization helps balance drivers + val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) + val aliveWorkerNum = shuffledAliveWorkers.size + var curPos = 0 + for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers + // We assign workers to each waiting driver in a round-robin fashion. For each driver, we + // start from the last worker that was assigned a driver, and continue onwards until we have + // explored all alive workers. + curPos = (curPos + 1) % aliveWorkerNum + val startPos = curPos + var launched = false + while (curPos != startPos && !launched) { + val worker = shuffledAliveWorkers(curPos) if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver + launched = true } + curPos = (curPos + 1) % aliveWorkerNum } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org