Repository: spark
Updated Branches:
  refs/heads/master e5431f2cf -> 4329eb2e7
[SPARK-16944][MESOS] Improve data locality when launching new executors with dynamic allocation enabled

## What changes were proposed in this pull request?

Improve the Spark-Mesos coarse-grained scheduler to consider the preferred locations when dynamic allocation is enabled.

## How was this patch tested?

Added a unit test, and performed manual testing on AWS.

Author: Gene Pang <gene.p...@gmail.com>

Closes #18098 from gpang/mesos_data_locality.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4329eb2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4329eb2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4329eb2e

Branch: refs/heads/master
Commit: 4329eb2e73181819bb712f57ca9c7feac0d640ea
Parents: e5431f2
Author: Gene Pang <gene.p...@gmail.com>
Authored: Mon Oct 2 15:09:11 2017 -0700
Committer: haoyuan <haoy...@tachyonnexus.com>
Committed: Mon Oct 2 15:09:11 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  4 ++
 .../apache/spark/scheduler/TaskSetManager.scala |  6 +-
 .../MesosCoarseGrainedSchedulerBackend.scala    | 52 ++++++++++++++--
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 62 ++++++++++++++++++++
 .../spark/scheduler/cluster/mesos/Utils.scala   |  6 ++
 5 files changed, 123 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 44a2815..d85b6a0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -72,6 +72,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
+  private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("3s")
+
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
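A quick note on the new config constant: a timeConf(TimeUnit.MILLISECONDS) entry created with the default string "3s" means conf.get(config.LOCALITY_WAIT) now returns a Long in milliseconds (3000 by default). A minimal standalone sketch of the equivalent lookup (this demo object is hypothetical, not part of the patch; it uses the same SparkConf.getTimeAsMs parsing the entry relies on):

    import org.apache.spark.SparkConf

    object LocalityWaitDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
        // Same duration parsing the config entry applies: "3s" -> 3000 ms.
        val waitMs: Long = conf.getTimeAsMs("spark.locality.wait", "3s")
        println(s"locality wait = $waitMs ms") // 3000 unless overridden
      }
    }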
http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index bb86741..3bdede6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap
@@ -980,7 +980,7 @@ private[spark] class TaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = conf.get("spark.locality.wait", "3s")
+    val defaultWait = conf.get(config.LOCALITY_WAIT)
     val localityWaitKey = level match {
       case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
       case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
@@ -989,7 +989,7 @@ private[spark] class TaskSetManager(
     }
 
     if (localityWaitKey != null) {
-      conf.getTimeAsMs(localityWaitKey, defaultWait)
+      conf.getTimeAsMs(localityWaitKey, defaultWait.toString)
     } else {
       0L
     }
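To illustrate the fallback chain in getLocalityWait after this change, a hedged self-contained sketch (object and method names are hypothetical): a level-specific key such as spark.locality.wait.node wins if set, and the .toString bridges the Long now returned by the config entry back into getTimeAsMs, which still takes its default as a string.

    import org.apache.spark.SparkConf

    object LocalityWaitFallback {
      def waitFor(conf: SparkConf, levelKey: String): Long = {
        val defaultWaitMs: Long = conf.getTimeAsMs("spark.locality.wait", "3s")
        // A bare number string is interpreted as milliseconds by getTimeAsMs.
        conf.getTimeAsMs(levelKey, defaultWaitMs.toString)
      }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false).set("spark.locality.wait.node", "10s")
        println(waitFor(conf, "spark.locality.wait.node"))    // 10000
        println(waitFor(conf, "spark.locality.wait.process")) // 3000 (falls back)
      }
    }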
numExecutors: " + numExecutors + + " executorLimit: " + executorLimit) + offers.asScala.map(_.getId).foreach(d.declineOffer) + launchingExecutors = false + return + } else { + if (!launchingExecutors) { + launchingExecutors = true + localityWaitStartTime = System.currentTimeMillis() + } + } + logDebug(s"Received ${offers.size} resource offers.") val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => @@ -413,7 +434,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerId = offer.getId.getValue val resources = remainingResources(offerId) - if (canLaunchTask(slaveId, resources)) { + if (canLaunchTask(slaveId, offer.getHostname, resources)) { // Create a task launchTasks = true val taskId = newMesosTaskId() @@ -477,7 +498,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse) } - private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = { + private def canLaunchTask(slaveId: String, offerHostname: String, + resources: JList[Resource]): Boolean = { val offerMem = getResource(resources, "mem") val offerCPUs = getResource(resources, "cpus").toInt val cpus = executorCores(offerCPUs) @@ -489,9 +511,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus <= offerCPUs && cpus + totalCoresAcquired <= maxCores && mem <= offerMem && - numExecutors() < executorLimit && + numExecutors < executorLimit && slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && - meetsPortRequirements + meetsPortRequirements && + satisfiesLocality(offerHostname) } private def executorCores(offerCPUs: Int): Int = { @@ -500,6 +523,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( ) } + private def satisfiesLocality(offerHostname: String): Boolean = { + if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) { + return true + } + + // Check the locality information + val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet + val allDesiredHosts = hostToLocalTaskCount.keys.toSet + // Try to match locality for hosts which do not have executors yet, to potentially + // increase coverage. + val remainingHosts = allDesiredHosts -- currentHosts + if (!remainingHosts.contains(offerHostname) && + (System.currentTimeMillis() - localityWaitStartTime <= localityWait)) { + logDebug("Skipping host and waiting for locality. host: " + offerHostname) + return false + } + return true + } + override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) { val taskId = status.getTaskId.getValue val slaveId = status.getSlaveId.getValue @@ -646,6 +688,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( // since at coarse grain it depends on the amount of slaves available. logInfo("Capping the total amount of executors to " + requestedTotal) executorLimitOption = Some(requestedTotal) + // Update the locality wait start time to continue trying for locality. 
http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f6bae01..6c40792 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -604,6 +604,55 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(backend.isReady)
   }
 
+  test("supports data locality with dynamic allocation") {
+    setBackend(Map(
+      "spark.dynamicAllocation.enabled" -> "true",
+      "spark.dynamicAllocation.testing" -> "true",
+      "spark.locality.wait" -> "1s"))
+
+    assert(backend.getExecutorIds().isEmpty)
+
+    backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    offerResourcesAndVerify(1, false)
+    offerResourcesAndVerify(2, false)
+
+    // Offer local resource
+    offerResourcesAndVerify(10, true)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(2000)
+
+    // Offer non-local resource, which should be accepted
+    offerResourcesAndVerify(1, true)
+
+    // Update total executors
+    backend.requestTotalExecutors(3, 3, Map("hosts10" -> 1, "hosts11" -> 1, "hosts12" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    offerResourcesAndVerify(3, false)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(2000)
+
+    // Update total executors
+    backend.requestTotalExecutors(4, 4, Map("hosts10" -> 1, "hosts11" -> 1, "hosts12" -> 1,
+      "hosts13" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    offerResourcesAndVerify(3, false)
+
+    // Offer local resource
+    offerResourcesAndVerify(13, true)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(2000)
+
+    // Offer non-local resource, which should be accepted
+    offerResourcesAndVerify(2, true)
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
 
   private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
@@ -631,6 +680,19 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.resourceOffers(driver, mesosOffers.asJava)
   }
 
+  private def offerResourcesAndVerify(id: Int, expectAccept: Boolean): Unit = {
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    if (expectAccept) {
+      val numExecutors = backend.getExecutorIds().size
+      val launchedTasks = verifyTaskLaunched(driver, s"o$id")
+      assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+      registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+      assert(backend.getExecutorIds().size == numExecutors + 1)
+    } else {
+      verifyTaskNotLaunched(driver, s"o$id")
+    }
+  }
+
   private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
     TaskStatus.newBuilder()
       .setTaskId(TaskID.newBuilder().setValue(taskId).build())
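One observation on the test: the Thread.sleep(2000) calls tie it to wall-clock time. Spark already ships org.apache.spark.util.{Clock, ManualClock} (the Clock import is even visible in the TaskSetManager hunk above); a hypothetical rework of the backend to accept a Clock, not part of this patch, would let the suite advance time deterministically, roughly:

    // Hypothetical sketch of a test body. ManualClock is private[spark], so
    // this code would need to live under the org.apache.spark package, as the
    // suite itself does.
    import org.apache.spark.util.ManualClock

    val clock = new ManualClock()
    // offerResourcesAndVerify(1, false)  // non-local offer: declined while waiting
    clock.advance(2000)                   // stands in for Thread.sleep(2000)
    // offerResourcesAndVerify(1, true)   // wait elapsed: accepted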
http://git-wip-us.apache.org/repos/asf/spark/blob/4329eb2e/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 2a67cbc..833db0c 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -84,6 +84,12 @@ object Utils {
     captor.getValue.asScala.toList
   }
 
+  def verifyTaskNotLaunched(driver: SchedulerDriver, offerId: String): Unit = {
+    verify(driver, times(0)).launchTasks(
+      Matchers.eq(Collections.singleton(createOfferId(offerId))),
+      Matchers.any(classOf[java.util.Collection[TaskInfo]]))
+  }
+
   def createOfferId(offerId: String): OfferID = {
     OfferID.newBuilder().setValue(offerId).build()
   }
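For completeness, a hedged usage sketch of the new helper as the suite calls it (a test-body fragment: driver is the mocked SchedulerDriver from the test fixture, and offer id 3 is one the backend is expected to decline):

    offerResources(List(Resources(backend.executorMemory(sc), 1)), 3)
    Utils.verifyTaskNotLaunched(driver, "o3")
    // Mockito's verify(driver, times(0)) fails the test if launchTasks was
    // ever invoked with exactly Collections.singleton(createOfferId("o3")).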