Repository: spark
Updated Branches:
  refs/heads/master cdce4e62a -> eb019af9a


[SPARK-13001][CORE][MESOS] Prevent getting offers when reached max cores

Similar to https://github.com/apache/spark/pull/8639

This change rejects offers for 120s once `spark.cores.max` is reached in
coarse-grained mode, to mitigate offer starvation. It prevents Mesos from
sending us offers again and again, which starves other frameworks. This is
especially problematic when running many small frameworks on the same Mesos
cluster, e.g. many small Spark streaming jobs, which can cause the bigger
Spark jobs to stop receiving offers. By rejecting the offers for a long
period of time, they become available to those other frameworks.
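
A minimal usage sketch (not part of this commit): the config key and its 120s
default come from the patch, while the app name, master URL, core count, and
the 300s override below are illustrative placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    // Coarse-grained Mesos app that caps its cores and declines further
    // offers for 5 minutes once that cap is reached (default is 120s).
    val conf = new SparkConf()
      .setAppName("offer-decline-sketch")                  // placeholder name
      .setMaster("mesos://zk://mesos-master:2181/mesos")   // placeholder master
      .set("spark.cores.max", "8")
      .set("spark.mesos.rejectOfferDurationForReachedMaxCores", "300s")

    val sc = new SparkContext(conf)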

Author: Sebastien Rainville <sebast...@hopper.com>

Closes #10924 from sebastienrainville/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb019af9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb019af9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb019af9

Branch: refs/heads/master
Commit: eb019af9a9cadb127eab1b6d30312169ed90f808
Parents: cdce4e6
Author: Sebastien Rainville <sebast...@hopper.com>
Authored: Wed May 4 14:32:36 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed May 4 14:32:36 2016 -0700

----------------------------------------------------------------------
 .../mesos/CoarseMesosSchedulerBackend.scala     | 53 +++++++++++++-------
 .../cluster/mesos/MesosSchedulerUtils.scala     |  4 ++
 .../CoarseMesosSchedulerBackendSuite.scala      | 13 +++++
 3 files changed, 53 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eb019af9/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 50b452c..2c5be1f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -109,10 +109,14 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
-  // reject offers with mismatched constraints in seconds
+  // Reject offers with mismatched constraints in seconds
   private val rejectOfferDurationForUnmetConstraints =
     getRejectOfferDurationForUnmetConstraints(sc)
 
+  // Reject offers when we reached the maximum number of cores for this framework
+  private val rejectOfferDurationForReachedMaxCores =
+    getRejectOfferDurationForReachedMaxCores(sc)
+
   // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -279,18 +283,32 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
-    for (offer <- offers) {
-      val id = offer.getId.getValue
-      val offerAttributes = toAttributeMap(offer.getAttributesList)
-      val mem = getResource(offer.getResourcesList, "mem")
-      val cpus = getResource(offer.getResourcesList, "cpus")
-      val filters = Filters.newBuilder()
-        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
-
-      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: 
$mem cpu: $cpus"
-        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+    offers.foreach { offer =>
+      declineOffer(d, offer, Some("unmet constraints"),
+        Some(rejectOfferDurationForUnmetConstraints))
+    }
+  }
 
-      d.declineOffer(offer.getId, filters)
+  private def declineOffer(
+      d: SchedulerDriver,
+      offer: Offer,
+      reason: Option[String] = None,
+      refuseSeconds: Option[Long] = None): Unit = {
+
+    val id = offer.getId.getValue
+    val offerAttributes = toAttributeMap(offer.getAttributesList)
+    val mem = getResource(offer.getResourcesList, "mem")
+    val cpus = getResource(offer.getResourcesList, "cpus")
+
+    logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: 
$mem" +
+      s" cpu: $cpus for $refuseSeconds seconds" +
+      reason.map(r => s" (reason: $r)").getOrElse(""))
+
+    refuseSeconds match {
+      case Some(seconds) =>
+        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+        d.declineOffer(offer.getId, filters)
+      case _ => d.declineOffer(offer.getId)
     }
   }
 
@@ -326,11 +344,12 @@ private[spark] class CoarseMesosSchedulerBackend(
         d.launchTasks(
           Collections.singleton(offer.getId),
           offerTasks.asJava)
-      } else { // decline
-        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
-          s"mem: $offerMem cpu: $offerCpus")
-
-        d.declineOffer(offer.getId)
+      } else if (totalCoresAcquired >= maxCores) {
+        // Reject an offer for a configurable amount of time to avoid starving other frameworks
+        declineOffer(d, offer, Some("reached spark.cores.max"),
+          Some(rejectOfferDurationForReachedMaxCores))
+      } else {
+        declineOffer(d, offer)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/eb019af9/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1e322ac..7355ba3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -352,4 +352,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
   }
 
+  protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
+    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/eb019af9/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index b18f0eb..15d59e7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -147,6 +147,19 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     verifyDeclinedOffer(driver, createOfferId("o1"), true)
   }
 
+  test("mesos declines offers with a filter when reached spark.cores.max") {
+    val maxCores = 3
+    setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      (executorMemory, maxCores + 1),
+      (executorMemory, maxCores + 1)))
+
+    verifyTaskLaunched("o1")
+    verifyDeclinedOffer(driver, createOfferId("o2"), true)
+  }
+
   test("mesos assigns tasks round-robin on offers") {
     val executorCores = 4
     val maxCores = executorCores * 2

