Repository: spark
Updated Branches:
  refs/heads/master e5431f2cf -> 4329eb2e7


[SPARK-16944][Mesos] Improve data locality when launching new executors when dynamic allocation is enabled

## What changes were proposed in this pull request?

Improve the Spark-Mesos coarse-grained scheduler to consider the preferred locations of pending tasks when launching new executors while dynamic allocation is enabled.
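
The gist of the new behavior, as a minimal sketch (simplified from the patch below; the parameter names here are illustrative and are not the actual fields of `MesosCoarseGrainedSchedulerBackend`):

```scala
// Hedged sketch of the locality check this patch introduces: while executors are
// still being launched, an offer from a host that is not among the preferred hosts
// of pending tasks is declined until spark.locality.wait has elapsed.
def acceptsOffer(
    offerHostname: String,
    preferredHosts: Set[String],      // hosts wanted by pending tasks
    hostsWithExecutors: Set[String],  // hosts that already run an executor
    waitStartMs: Long,                // when the current launch round started
    localityWaitMs: Long): Boolean = {
  if (preferredHosts.isEmpty) {
    true // no locality information available, accept any offer
  } else {
    // only preferred hosts that do not have an executor yet still need coverage
    val remainingHosts = preferredHosts -- hostsWithExecutors
    remainingHosts.contains(offerHostname) ||
      System.currentTimeMillis() - waitStartMs > localityWaitMs
  }
}
```

Offers from non-preferred hosts are declined only while the wait is running; once it expires, any host is accepted so the requested executor count can still be reached.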

## How was this patch tested?

Added a unit test and performed manual testing on AWS.
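
For context, a hedged sketch of the kind of configuration that exercises this code path (the Mesos master URL is a placeholder; the keys are standard Spark settings referenced in the diff):

```scala
import org.apache.spark.SparkConf

// Illustrative only: roughly how a job opts in to this behavior on Mesos
// coarse-grained mode.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")        // placeholder Mesos master URL
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")    // external shuffle service, needed for dynamic allocation
  .set("spark.locality.wait", "3s")                // how long to hold out for preferred hosts
```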

Author: Gene Pang <gene.p...@gmail.com>

Closes #18098 from gpang/mesos_data_locality.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4329eb2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4329eb2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4329eb2e

Branch: refs/heads/master
Commit: 4329eb2e73181819bb712f57ca9c7feac0d640ea
Parents: e5431f2
Author: Gene Pang <gene.p...@gmail.com>
Authored: Mon Oct 2 15:09:11 2017 -0700
Committer: haoyuan <haoy...@tachyonnexus.com>
Committed: Mon Oct 2 15:09:11 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  4 ++
 .../apache/spark/scheduler/TaskSetManager.scala |  6 +-
 .../MesosCoarseGrainedSchedulerBackend.scala    | 52 ++++++++++++++--
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 62 ++++++++++++++++++++
 .../spark/scheduler/cluster/mesos/Utils.scala   |  6 ++
 5 files changed, 123 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 44a2815..d85b6a0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -72,6 +72,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
+  private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("3s")
+
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index bb86741..3bdede6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap
@@ -980,7 +980,7 @@ private[spark] class TaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = conf.get("spark.locality.wait", "3s")
+    val defaultWait = conf.get(config.LOCALITY_WAIT)
     val localityWaitKey = level match {
       case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
       case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
@@ -989,7 +989,7 @@ private[spark] class TaskSetManager(
     }
 
     if (localityWaitKey != null) {
-      conf.getTimeAsMs(localityWaitKey, defaultWait)
+      conf.getTimeAsMs(localityWaitKey, defaultWait.toString)
     } else {
       0L
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 2669987..80c0a04 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -99,6 +99,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private var totalCoresAcquired = 0
   private var totalGpusAcquired = 0
 
+  // The amount of time to wait for locality scheduling
+  private val localityWait = conf.get(config.LOCALITY_WAIT)
+  // The start of the waiting, for data local scheduling
+  private var localityWaitStartTime = System.currentTimeMillis()
+  // If true, the scheduler is in the process of launching executors to reach the requested
+  // executor limit
+  private var launchingExecutors = false
+
   // SlaveID -> Slave
   // This map accumulates entries for the duration of the job.  Slaves are never deleted, because
   // we need to maintain e.g. failure state and connection state.
@@ -311,6 +319,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         return
       }
 
+      if (numExecutors >= executorLimit) {
+        logDebug("Executor limit reached. numExecutors: " + numExecutors +
+          " executorLimit: " + executorLimit)
+        offers.asScala.map(_.getId).foreach(d.declineOffer)
+        launchingExecutors = false
+        return
+      } else {
+        if (!launchingExecutors) {
+          launchingExecutors = true
+          localityWaitStartTime = System.currentTimeMillis()
+        }
+      }
+
       logDebug(s"Received ${offers.size} resource offers.")
 
       val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
@@ -413,7 +434,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         val offerId = offer.getId.getValue
         val resources = remainingResources(offerId)
 
-        if (canLaunchTask(slaveId, resources)) {
+        if (canLaunchTask(slaveId, offer.getHostname, resources)) {
           // Create a task
           launchTasks = true
           val taskId = newMesosTaskId()
@@ -477,7 +498,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
   }
 
-  private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
+  private def canLaunchTask(slaveId: String, offerHostname: String,
+                            resources: JList[Resource]): Boolean = {
     val offerMem = getResource(resources, "mem")
     val offerCPUs = getResource(resources, "cpus").toInt
     val cpus = executorCores(offerCPUs)
@@ -489,9 +511,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       cpus <= offerCPUs &&
       cpus + totalCoresAcquired <= maxCores &&
       mem <= offerMem &&
-      numExecutors() < executorLimit &&
+      numExecutors < executorLimit &&
       slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
-      meetsPortRequirements
+      meetsPortRequirements &&
+      satisfiesLocality(offerHostname)
   }
 
   private def executorCores(offerCPUs: Int): Int = {
@@ -500,6 +523,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    // Try to match locality for hosts which do not have executors yet, to potentially
+    // increase coverage.
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!remainingHosts.contains(offerHostname) &&
+      (System.currentTimeMillis() - localityWaitStartTime <= localityWait)) {
+      logDebug("Skipping host and waiting for locality. host: " + 
offerHostname)
+      return false
+    }
+    return true
+  }
+
   override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue
     val slaveId = status.getSlaveId.getValue
@@ -646,6 +688,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     // since at coarse grain it depends on the amount of slaves available.
     logInfo("Capping the total amount of executors to " + requestedTotal)
     executorLimitOption = Some(requestedTotal)
+    // Update the locality wait start time to continue trying for locality.
+    localityWaitStartTime = System.currentTimeMillis()
     true
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f6bae01..6c40792 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -604,6 +604,55 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(backend.isReady)
   }
 
+  test("supports data locality with dynamic allocation") {
+    setBackend(Map(
+      "spark.dynamicAllocation.enabled" -> "true",
+      "spark.dynamicAllocation.testing" -> "true",
+      "spark.locality.wait" -> "1s"))
+
+    assert(backend.getExecutorIds().isEmpty)
+
+    backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    offerResourcesAndVerify(1, false)
+    offerResourcesAndVerify(2, false)
+
+    // Offer local resource
+    offerResourcesAndVerify(10, true)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(2000)
+
+    // Offer non-local resource, which should be accepted
+    offerResourcesAndVerify(1, true)
+
+    // Update total executors
+    backend.requestTotalExecutors(3, 3, Map("hosts10" -> 1, "hosts11" -> 1, "hosts12" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    offerResourcesAndVerify(3, false)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(2000)
+
+    // Update total executors
+    backend.requestTotalExecutors(4, 4, Map("hosts10" -> 1, "hosts11" -> 1, "hosts12" -> 1,
+      "hosts13" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    offerResourcesAndVerify(3, false)
+
+    // Offer local resource
+    offerResourcesAndVerify(13, true)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(2000)
+
+    // Offer non-local resource, which should be accepted
+    offerResourcesAndVerify(2, true)
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
 
   private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
@@ -631,6 +680,19 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.resourceOffers(driver, mesosOffers.asJava)
   }
 
+  private def offerResourcesAndVerify(id: Int, expectAccept: Boolean): Unit = {
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    if (expectAccept) {
+      val numExecutors = backend.getExecutorIds().size
+      val launchedTasks = verifyTaskLaunched(driver, s"o$id")
+      assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+      registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+      assert(backend.getExecutorIds().size == numExecutors + 1)
+    } else {
+      verifyTaskNotLaunched(driver, s"o$id")
+    }
+  }
+
   private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
     TaskStatus.newBuilder()
       .setTaskId(TaskID.newBuilder().setValue(taskId).build())

http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 2a67cbc..833db0c 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -84,6 +84,12 @@ object Utils {
     captor.getValue.asScala.toList
   }
 
+  def verifyTaskNotLaunched(driver: SchedulerDriver, offerId: String): Unit = {
+    verify(driver, times(0)).launchTasks(
+      Matchers.eq(Collections.singleton(createOfferId(offerId))),
+      Matchers.any(classOf[java.util.Collection[TaskInfo]]))
+  }
+
   def createOfferId(offerId: String): OfferID = {
     OfferID.newBuilder().setValue(offerId).build()
   }

