Repository: spark
Updated Branches:
  refs/heads/master cdce4e62a -> eb019af9a
[SPARK-13001][CORE][MESOS] Prevent getting offers when reached max cores

Similar to https://github.com/apache/spark/pull/8639, this change rejects offers for 120s once `spark.cores.max` has been reached in coarse-grained mode, to mitigate offer starvation. It prevents Mesos from sending us offers again and again and thereby starving other frameworks. This is especially problematic when running many small frameworks on the same Mesos cluster, e.g. many small Spark Streaming jobs, where it can cause the bigger Spark jobs to stop receiving offers. By rejecting the offers for a long period of time, we make them available to those other frameworks.

Author: Sebastien Rainville <sebast...@hopper.com>

Closes #10924 from sebastienrainville/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb019af9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb019af9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb019af9

Branch: refs/heads/master
Commit: eb019af9a9cadb127eab1b6d30312169ed90f808
Parents: cdce4e6
Author: Sebastien Rainville <sebast...@hopper.com>
Authored: Wed May 4 14:32:36 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed May 4 14:32:36 2016 -0700

----------------------------------------------------------------------
 .../mesos/CoarseMesosSchedulerBackend.scala     | 53 +++++++++++++-------
 .../cluster/mesos/MesosSchedulerUtils.scala     |  4 ++
 .../CoarseMesosSchedulerBackendSuite.scala      | 13 +++++
 3 files changed, 53 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
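For context, a minimal user-side sketch of how the new setting is meant to be combined with spark.cores.max. The Mesos master URL, app name, and values below are placeholders; only the two configuration keys are real Spark settings, and the second one is the key introduced by this change:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")  // placeholder Mesos master URL
  .setAppName("offer-friendly-streaming-job")         // placeholder app name
  .set("spark.cores.max", "8")                        // cap this framework at 8 cores
  // Once the cap is reached, refuse further offers for 5 minutes instead of the 120s default
  .set("spark.mesos.rejectOfferDurationForReachedMaxCores", "5m")

val sc = new SparkContext(conf)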

http://git-wip-us.apache.org/repos/asf/spark/blob/eb019af9/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 50b452c..2c5be1f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -109,10 +109,14 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
-  // reject offers with mismatched constraints in seconds
+  // Reject offers with mismatched constraints in seconds
   private val rejectOfferDurationForUnmetConstraints =
     getRejectOfferDurationForUnmetConstraints(sc)
 
+  // Reject offers when we reached the maximum number of cores for this framework
+  private val rejectOfferDurationForReachedMaxCores =
+    getRejectOfferDurationForReachedMaxCores(sc)
+
   // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -279,18 +283,32 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
-    for (offer <- offers) {
-      val id = offer.getId.getValue
-      val offerAttributes = toAttributeMap(offer.getAttributesList)
-      val mem = getResource(offer.getResourcesList, "mem")
-      val cpus = getResource(offer.getResourcesList, "cpus")
-      val filters = Filters.newBuilder()
-        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
-
-      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
-        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+    offers.foreach { offer =>
+      declineOffer(d, offer, Some("unmet constraints"),
+        Some(rejectOfferDurationForUnmetConstraints))
+    }
+  }
 
-      d.declineOffer(offer.getId, filters)
+  private def declineOffer(
+      d: SchedulerDriver,
+      offer: Offer,
+      reason: Option[String] = None,
+      refuseSeconds: Option[Long] = None): Unit = {
+
+    val id = offer.getId.getValue
+    val offerAttributes = toAttributeMap(offer.getAttributesList)
+    val mem = getResource(offer.getResourcesList, "mem")
+    val cpus = getResource(offer.getResourcesList, "cpus")
+
+    logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem"
+      + s" cpu: $cpus for $refuseSeconds seconds"
+      + reason.map(r => s" (reason: $r)").getOrElse(""))
+
+    refuseSeconds match {
+      case Some(seconds) =>
+        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+        d.declineOffer(offer.getId, filters)
+      case _ => d.declineOffer(offer.getId)
     }
   }
 
@@ -326,11 +344,12 @@ private[spark] class CoarseMesosSchedulerBackend(
           d.launchTasks(
             Collections.singleton(offer.getId),
             offerTasks.asJava)
-        } else { // decline
-          logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
-            s"mem: $offerMem cpu: $offerCpus")
-
-          d.declineOffer(offer.getId)
+        } else if (totalCoresAcquired >= maxCores) {
+          // Reject an offer for a configurable amount of time to avoid starving other frameworks
+          declineOffer(d, offer, Some("reached spark.cores.max"),
+            Some(rejectOfferDurationForReachedMaxCores))
+        } else {
+          declineOffer(d, offer)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/eb019af9/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1e322ac..7355ba3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -352,4 +352,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
   }
 
+  protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
+    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+  }
+
 }
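The core of the change is the decline-with-filter pattern: instead of a plain decline, the backend now attaches a refuse duration so Mesos stops re-offering the same resources for a while. A condensed sketch against the Mesos Java API (the helper name is illustrative; Filters.setRefuseSeconds and SchedulerDriver.declineOffer are the actual calls the patch relies on):

import org.apache.mesos.Protos.{Filters, OfferID}
import org.apache.mesos.SchedulerDriver

// Ask the Mesos allocator not to re-offer these resources to this framework
// for `refuseSeconds` seconds, leaving them available to other frameworks.
def declineForAWhile(driver: SchedulerDriver, offerId: OfferID, refuseSeconds: Long): Unit = {
  val filters = Filters.newBuilder().setRefuseSeconds(refuseSeconds).build()
  driver.declineOffer(offerId, filters)
}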

http://git-wip-us.apache.org/repos/asf/spark/blob/eb019af9/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index b18f0eb..15d59e7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -147,6 +147,19 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     verifyDeclinedOffer(driver, createOfferId("o1"), true)
   }
 
+  test("mesos declines offers with a filter when reached spark.cores.max") {
+    val maxCores = 3
+    setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      (executorMemory, maxCores + 1),
+      (executorMemory, maxCores + 1)))
+
+    verifyTaskLaunched("o1")
+    verifyDeclinedOffer(driver, createOfferId("o2"), true)
+  }
+
   test("mesos assigns tasks round-robin on offers") {
     val executorCores = 4
     val maxCores = executorCores * 2
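The new duration is read with SparkConf.getTimeAsSeconds, so it accepts the usual suffixed time strings ("120s", "2m", "1h"). A small, self-contained sketch; the SparkConf below is a throwaway object, not the one owned by a running backend:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.mesos.rejectOfferDurationForReachedMaxCores", "2m")

// Mirrors what getRejectOfferDurationForReachedMaxCores(sc) does with sc.conf:
val refuseSeconds: Long =
  conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
// refuseSeconds == 120, i.e. "2m" expressed in seconds; the default stays "120s" when the key is unset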