Repository: spark Updated Branches: refs/heads/master b1f4b4abf -> 41a7cdf85
[SPARK-8881] [SPARK-9260] Fix algorithm for scheduling executors on workers Current scheduling algorithm allocates one core at a time and in doing so ends up ignoring spark.executor.cores. As a result, when spark.cores.max/spark.executor.cores (i.e, num_executors) < num_workers, executors are not launched and the app hangs. This PR fixes and refactors the scheduling algorithm. andrewor14 Author: Nishkam Ravi <nr...@cloudera.com> Author: nishkamravi2 <nishkamr...@gmail.com> Closes #7274 from nishkamravi2/master_scheduler and squashes the following commits: b998097 [nishkamravi2] Update Master.scala da0f491 [Nishkam Ravi] Update Master.scala 79084e8 [Nishkam Ravi] Update Master.scala 1daf25f [Nishkam Ravi] Update Master.scala f279cdf [Nishkam Ravi] Update Master.scala adec84b [Nishkam Ravi] Update Master.scala a06da76 [nishkamravi2] Update Master.scala 40c8f9f [nishkamravi2] Update Master.scala (to trigger retest) c11c689 [nishkamravi2] Update EventLoggingListenerSuite.scala 5d6a19c [nishkamravi2] Update Master.scala (for the purpose of issuing a retest) 2d6371c [Nishkam Ravi] Update Master.scala 66362d5 [nishkamravi2] Update Master.scala ee7cf0e [Nishkam Ravi] Improved scheduling algorithm for executors Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41a7cdf8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41a7cdf8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41a7cdf8 Branch: refs/heads/master Commit: 41a7cdf85de2d583d8b8759941a9d6c6e98cae4d Parents: b1f4b4a Author: Nishkam Ravi <nr...@cloudera.com> Authored: Sat Jul 25 22:56:25 2015 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Sat Jul 25 22:56:25 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 112 +++++++++++++------ 1 file changed, 75 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/41a7cdf8/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4615feb..029f94d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -541,6 +541,7 @@ private[master] class Master( /** * Schedule executors to be launched on the workers. + * Returns an array containing number of cores assigned to each worker. * * There are two modes of launching executors. The first attempts to spread out an application's * executors on as many workers as possible, while the second does the opposite (i.e. launch them @@ -551,39 +552,73 @@ private[master] class Master( * multiple executors from the same application may be launched on the same worker if the worker * has enough cores and memory. Otherwise, each executor grabs all the cores available on the * worker by default, in which case only one executor may be launched on each worker. + * + * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core + * at a time). Consider the following example: cluster has 4 workers with 16 cores each. + * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is + * allocated at a time, 12 cores from each worker would be assigned to each executor. + * Since 12 < 16, no executors would launch [SPARK-8881]. */ - private def startExecutorsOnWorkers(): Unit = { - // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app - // in the queue, then the second app, etc. - if (spreadOutApps) { - // Try to spread out each app among all the workers, until it has all its cores - for (app <- waitingApps if app.coresLeft > 0) { - val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) - .sortBy(_.coresFree).reverse - val numUsable = usableWorkers.length - val assigned = new Array[Int](numUsable) // Number of cores to give on each node - var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) - var pos = 0 - while (toAssign > 0) { - if (usableWorkers(pos).coresFree - assigned(pos) > 0) { - toAssign -= 1 - assigned(pos) += 1 + private[master] def scheduleExecutorsOnWorkers( + app: ApplicationInfo, + usableWorkers: Array[WorkerInfo], + spreadOutApps: Boolean): Array[Int] = { + // If the number of cores per executor is not specified, then we can just schedule + // 1 core at a time since we expect a single executor to be launched on each worker + val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) + val memoryPerExecutor = app.desc.memoryPerExecutorMB + val numUsable = usableWorkers.length + val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker + val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker + var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + var freeWorkers = (0 until numUsable).toIndexedSeq + + def canLaunchExecutor(pos: Int): Boolean = { + usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor && + usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor + } + + while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) { + freeWorkers = freeWorkers.filter(canLaunchExecutor) + freeWorkers.foreach { pos => + var keepScheduling = true + while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) { + coresToAssign -= coresPerExecutor + assignedCores(pos) += coresPerExecutor + assignedMemory(pos) += memoryPerExecutor + + // Spreading out an application means spreading out its executors across as + // many workers as possible. If we are not spreading out, then we should keep + // scheduling executors on this worker until we use all of its resources. + // Otherwise, just move on to the next worker. + if (spreadOutApps) { + keepScheduling = false } - pos = (pos + 1) % numUsable - } - // Now that we've decided how many cores to give on each node, let's actually give them - for (pos <- 0 until numUsable if assigned(pos) > 0) { - allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos)) } } - } else { - // Pack each app into as few workers as possible until we've assigned all its cores - for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (app <- waitingApps if app.coresLeft > 0) { - allocateWorkerResourceToExecutors(app, app.coresLeft, worker) - } + } + assignedCores + } + + /** + * Schedule and launch executors on workers + */ + private def startExecutorsOnWorkers(): Unit = { + // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app + // in the queue, then the second app, etc. + for (app <- waitingApps if app.coresLeft > 0) { + val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor + // Filter out workers that don't have enough resources to launch an executor + val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) + .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && + worker.coresFree >= coresPerExecutor.getOrElse(1)) + .sortBy(_.coresFree).reverse + val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) + + // Now that we've decided how many cores to allocate on each worker, let's allocate them + for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { + allocateWorkerResourceToExecutors( + app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) } } } @@ -591,19 +626,22 @@ private[master] class Master( /** * Allocate a worker's resources to one or more executors. * @param app the info of the application which the executors belong to - * @param coresToAllocate cores on this worker to be allocated to this application + * @param assignedCores number of cores on this worker for this application + * @param coresPerExecutor number of cores per executor * @param worker the worker info */ private def allocateWorkerResourceToExecutors( app: ApplicationInfo, - coresToAllocate: Int, + assignedCores: Int, + coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = { - val memoryPerExecutor = app.desc.memoryPerExecutorMB - val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate) - var coresLeft = coresToAllocate - while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { - val exec = app.addExecutor(worker, coresPerExecutor) - coresLeft -= coresPerExecutor + // If the number of cores per executor is specified, we divide the cores assigned + // to this worker evenly among the executors with no remainder. + // Otherwise, we launch a single executor that grabs all the assignedCores on this worker. + val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) + val coresToAssign = coresPerExecutor.getOrElse(assignedCores) + for (i <- 1 to numExecutors) { + val exec = app.addExecutor(worker, coresToAssign) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org