Repository: spark Updated Branches: refs/heads/master c564b2744 -> ef062c159
[SPARK-9731] Standalone scheduling incorrect cores if spark.executor.cores is not set The issue only happens if `spark.executor.cores` is not set and executor memory is set to a high value. For example, if we have a worker with 4G and 10 cores and we set `spark.executor.memory` to 3G, then only 1 core is assigned to the executor. The correct number should be 10 cores. I've added a unit test to illustrate the issue. Author: Carson Wang <carson.w...@intel.com> Closes #8017 from carsonwang/SPARK-9731 and squashes the following commits: d09ec48 [Carson Wang] Fix code style 86b651f [Carson Wang] Simplify the code 943cc4c [Carson Wang] fix scheduling correct cores to executors Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef062c15 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef062c15 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef062c15 Branch: refs/heads/master Commit: ef062c15992b0d08554495b8ea837bef3fabf6e9 Parents: c564b27 Author: Carson Wang <carson.w...@intel.com> Authored: Fri Aug 7 23:36:26 2015 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Fri Aug 7 23:36:26 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 26 +++++++++++--------- .../spark/deploy/master/MasterSuite.scala | 15 +++++++++++ 2 files changed, 29 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ef062c15/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index e38e437..9217202 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -581,20 +581,22 @@ private[deploy] class Master( /** Return whether the specified worker can launch an executor for this app. */ def canLaunchExecutor(pos: Int): Boolean = { + val keepScheduling = coresToAssign >= minCoresPerExecutor + val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor + // If we allow multiple executors per worker, then we can always launch new executors. - // Otherwise, we may have already started assigning cores to the executor on this worker. + // Otherwise, if there is already an executor on this worker, just give it more cores. val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0 - val underLimit = - if (launchingNewExecutor) { - assignedExecutors.sum + app.executors.size < app.executorLimit - } else { - true - } - val assignedMemory = assignedExecutors(pos) * memoryPerExecutor - usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor && - usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor && - coresToAssign >= minCoresPerExecutor && - underLimit + if (launchingNewExecutor) { + val assignedMemory = assignedExecutors(pos) * memoryPerExecutor + val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor + val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit + keepScheduling && enoughCores && enoughMemory && underLimit + } else { + // We're adding cores to an existing executor, so no need + // to check memory and executor limits + keepScheduling && enoughCores + } } // Keep launching executors until no more workers can accommodate any http://git-wip-us.apache.org/repos/asf/spark/blob/ef062c15/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index ae0e037..20d0201 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -151,6 +151,14 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva basicScheduling(spreadOut = false) } + test("basic scheduling with more memory - spread out") { + basicSchedulingWithMoreMemory(spreadOut = true) + } + + test("basic scheduling with more memory - no spread out") { + basicSchedulingWithMoreMemory(spreadOut = false) + } + test("scheduling with max cores - spread out") { schedulingWithMaxCores(spreadOut = true) } @@ -214,6 +222,13 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva assert(scheduledCores === Array(10, 10, 10)) } + private def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo = makeAppInfo(3072) + val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + assert(scheduledCores === Array(10, 10, 10)) + } + private def schedulingWithMaxCores(spreadOut: Boolean): Unit = { val master = makeMaster() val appInfo1 = makeAppInfo(1024, maxCores = Some(8)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org