Repository: spark
Updated Branches:
  refs/heads/master c564b2744 -> ef062c159


[SPARK-9731] Standalone scheduling incorrect cores if spark.executor.cores is not set

The issue only happens if `spark.executor.cores` is not set and the executor memory is set to a high value.
For example, if we have a worker with 4 GB of memory and 10 cores and we set `spark.executor.memory` to 3G, then only 1 core is assigned to the executor. The correct number is 10 cores.
I've added a unit test to illustrate the issue.
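
To make the arithmetic concrete, below is a minimal, self-contained Scala sketch (not Spark code; the object name, the simplified loop, and the hard-coded numbers are illustrative, and the cluster-wide coresToAssign and executor-limit checks are omitted) that replays the per-worker decision for a 10-core, 4 GB worker with `spark.executor.memory` at 3 GB and `spark.executor.cores` unset:

// Illustrative sketch only: replays the core-assignment check for one
// worker with 10 cores and 4096 MB when oneExecutorPerWorker is true.
object Spark9731Sketch {
  val coresFree = 10
  val memoryFreeMb = 4096
  val memoryPerExecutorMb = 3072
  val minCoresPerExecutor = 1

  def assignCores(oldBehavior: Boolean): Int = {
    var assignedCores = 0
    var assignedExecutors = 0
    var keepGoing = true
    while (keepGoing) {
      val enoughCores = coresFree - assignedCores >= minCoresPerExecutor
      val launchingNewExecutor = assignedExecutors == 0 // one executor per worker
      val assignedMemory = assignedExecutors * memoryPerExecutorMb
      val enoughMemory = memoryFreeMb - assignedMemory >= memoryPerExecutorMb
      val canLaunch =
        if (oldBehavior) {
          // Old check: memory is re-verified for every core handed out, even
          // when only adding cores to the executor already counted, so it
          // fails after the first core (4096 - 3072 < 3072).
          enoughCores && enoughMemory
        } else if (launchingNewExecutor) {
          enoughCores && enoughMemory
        } else {
          // Fixed check: topping up an existing executor needs no new memory.
          enoughCores
        }
      if (canLaunch) {
        if (launchingNewExecutor) assignedExecutors += 1
        assignedCores += minCoresPerExecutor
      } else {
        keepGoing = false
      }
    }
    assignedCores
  }

  def main(args: Array[String]): Unit = {
    println(s"before fix: ${assignCores(oldBehavior = true)} core(s)")  // 1
    println(s"after fix:  ${assignCores(oldBehavior = false)} core(s)") // 10
  }
}

In other words, the old check re-verified memory on every core it handed out; once the first (and only) executor had claimed its 3 GB, the remaining 1 GB failed the memory test and scheduling stopped at 1 core. The fix only checks memory and the executor limit when a new executor is being launched, not when cores are added to an executor that already exists.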

Author: Carson Wang <carson.w...@intel.com>

Closes #8017 from carsonwang/SPARK-9731 and squashes the following commits:

d09ec48 [Carson Wang] Fix code style
86b651f [Carson Wang] Simplify the code
943cc4c [Carson Wang] fix scheduling correct cores to executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef062c15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef062c15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef062c15

Branch: refs/heads/master
Commit: ef062c15992b0d08554495b8ea837bef3fabf6e9
Parents: c564b27
Author: Carson Wang <carson.w...@intel.com>
Authored: Fri Aug 7 23:36:26 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Aug 7 23:36:26 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 26 +++++++++++---------
 .../spark/deploy/master/MasterSuite.scala       | 15 +++++++++++
 2 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ef062c15/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e38e437..9217202 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -581,20 +581,22 @@ private[deploy] class Master(
 
     /** Return whether the specified worker can launch an executor for this app. */
     def canLaunchExecutor(pos: Int): Boolean = {
+      val keepScheduling = coresToAssign >= minCoresPerExecutor
+      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
+
       // If we allow multiple executors per worker, then we can always launch new executors.
-      // Otherwise, we may have already started assigning cores to the executor on this worker.
+      // Otherwise, if there is already an executor on this worker, just give it more cores.
       val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
-      val underLimit =
-        if (launchingNewExecutor) {
-          assignedExecutors.sum + app.executors.size < app.executorLimit
-        } else {
-          true
-        }
-      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
-      usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
-      usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
-      coresToAssign >= minCoresPerExecutor &&
-      underLimit
+      if (launchingNewExecutor) {
+        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
+        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
+        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
+        keepScheduling && enoughCores && enoughMemory && underLimit
+      } else {
+        // We're adding cores to an existing executor, so no need
+        // to check memory and executor limits
+        keepScheduling && enoughCores
+      }
     }
 
     // Keep launching executors until no more workers can accommodate any

http://git-wip-us.apache.org/repos/asf/spark/blob/ef062c15/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index ae0e037..20d0201 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -151,6 +151,14 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     basicScheduling(spreadOut = false)
   }
 
+  test("basic scheduling with more memory - spread out") {
+    basicSchedulingWithMoreMemory(spreadOut = true)
+  }
+
+  test("basic scheduling with more memory - no spread out") {
+    basicSchedulingWithMoreMemory(spreadOut = false)
+  }
+
   test("scheduling with max cores - spread out") {
     schedulingWithMaxCores(spreadOut = true)
   }
@@ -214,6 +222,13 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     assert(scheduledCores === Array(10, 10, 10))
   }
 
+  private def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(3072)
+    val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores === Array(10, 10, 10))
+  }
+
   private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
     val master = makeMaster()
     val appInfo1 = makeAppInfo(1024, maxCores = Some(8))

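For completeness, a hedged sketch of the application-side configuration that exercises this code path against a standalone cluster whose workers advertise 10 cores and 4 GB each (the master URL and app name below are placeholders, not part of this patch):

import org.apache.spark.{SparkConf, SparkContext}

object Spark9731Repro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://master:7077")   // placeholder standalone master URL
      .setAppName("spark-9731-repro")
      .set("spark.executor.memory", "3g") // large relative to the 4 GB workers
      // spark.executor.cores deliberately left unset -> one executor per worker
    val sc = new SparkContext(conf)
    // Before the fix the executor would be granted only 1 core; after the fix
    // it gets all 10, visible in the master UI and, roughly, here:
    println(s"default parallelism: ${sc.defaultParallelism}")
    sc.stop()
  }
}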
