Repository: spark
Updated Branches:
  refs/heads/master af3bc59d1 -> f0562e8cd


[SPARK-6350] [MESOS] Fine-grained mode scheduler respects mesosExecutor.cores

This is a regression introduced in #4960; this commit fixes it and adds a test.

tnachen andrewor14 please review, this should be an easy one.

Author: Iulian Dragos <jagua...@gmail.com>

Closes #8653 from dragos/issue/mesos/fine-grained-maxExecutorCores.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0562e8c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0562e8c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0562e8c

Branch: refs/heads/master
Commit: f0562e8cdbab7ce40f3186da98595312252f8b5c
Parents: af3bc59
Author: Iulian Dragos <jagua...@gmail.com>
Authored: Thu Sep 10 12:00:21 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Sep 10 12:00:21 2015 -0700

----------------------------------------------------------------------
 .../cluster/mesos/MesosSchedulerBackend.scala   |  3 +-
 .../mesos/MesosSchedulerBackendSuite.scala      | 33 +++++++++++++++++++-
 2 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0562e8c/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 18da6d2..8edf700 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -32,7 +32,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils
 
-
 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task 
is mapped to a
  * separate Mesos task, allowing multiple applications to share cluster nodes 
both in space (tasks
@@ -127,7 +126,7 @@ private[spark] class MesosSchedulerBackend(
     }
     val builder = MesosExecutorInfo.newBuilder()
     val (resourcesAfterCpu, usedCpuResources) =
-      partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
+      partitionResources(availableResources, "cpus", mesosExecutorCores)
     val (resourcesAfterMem, usedMemResources) =
       partitionResources(resourcesAfterCpu.asJava, "mem", 
calculateTotalMemory(sc))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f0562e8c/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 319b317..c4dc560 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -42,6 +42,38 @@ import org.apache.spark.{LocalSparkContext, SparkConf, 
SparkContext, SparkFunSui
 
 class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext 
with MockitoSugar {
 
+  test("Use configured mesosExecutor.cores for ExecutorInfo") {
+    val mesosExecutorCores = 3
+    val conf = new SparkConf
+    conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 
Map.empty)))
+
+    val sc = mock[SparkContext]
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+    when(sc.conf).thenReturn(conf)
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.listenerBus).thenReturn(listenerBus)
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, 
"master")
+
+    val resources = Arrays.asList(
+      mesosSchedulerBackend.createResource("cpus", 4),
+      mesosSchedulerBackend.createResource("mem", 1024))
+    // uri is null.
+    val (executorInfo, _) = 
mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+    val executorResources = executorInfo.getResourcesList
+    val cpus = 
executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
+
+    assert(cpus === mesosExecutorCores)
+  }
+
   test("check spark-class location correctly") {
     val conf = new SparkConf
     conf.set("spark.mesos.executor.home" , "/mesos-home")
@@ -263,7 +295,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext wi
       .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
       .setHostname(s"host${id.toString}").build()
 
-
     val mesosOffers = new java.util.ArrayList[Offer]
     mesosOffers.add(offer)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to