Repository: spark
Updated Branches:
  refs/heads/branch-1.6 2459b3432 -> 74f50275e


[SPARK-10471][CORE][MESOS] prevent getting offers for unmet constraints

This change rejects offers for slaves with unmet constraints for 120s (by
default) to mitigate offer starvation.
This prevents Mesos from sending us these offers again and again.
In return, we get more offers for slaves which might meet our constraints,
and it enables Mesos to send the rejected offers to other frameworks.

Author: Felix Bechstein <felix.bechst...@otto.de>

Closes #8639 from felixb/decline_offers_constraint_mismatch.

(cherry picked from commit 5039a49b636325f321daa089971107003fae9d4b)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74f50275
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74f50275
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74f50275

Branch: refs/heads/branch-1.6
Commit: 74f50275e429e649212928a9f36552941b862edc
Parents: 2459b34
Author: Felix Bechstein <felix.bechst...@otto.de>
Authored: Mon Nov 9 13:36:14 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Nov 9 13:36:28 2015 -0800

----------------------------------------------------------------------
 .../mesos/CoarseMesosSchedulerBackend.scala     | 92 +++++++++++---------
 .../cluster/mesos/MesosSchedulerBackend.scala   | 48 +++++++---
 .../cluster/mesos/MesosSchedulerUtils.scala     |  4 +
 3 files changed, 91 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74f50275/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d10a77f..2de9b6a 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  // reject offers with mismatched constraints in seconds
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
   // A client for talking to the external shuffle service, if it is a
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = 
{
     if (shuffleServiceEnabled) {
@@ -249,48 +253,56 @@ private[spark] class CoarseMesosSchedulerBackend(
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         val id = offer.getId.getValue
-        if (taskIdToSlaveId.size < executorLimit &&
-            totalCoresAcquired < maxCores &&
-            meetsConstraints &&
-            mem >= calculateTotalMemory(sc) &&
-            cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
-          // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-          totalCoresAcquired += cpusToUse
-          val taskId = newMesosTaskId()
-          taskIdToSlaveId.put(taskId, slaveId)
-          slaveIdsWithExecutors += slaveId
-          coresByTaskId(taskId) = cpusToUse
-          // Gather cpu resources from the available resources and use them in 
the task.
-          val (remainingResources, cpuResourcesToUse) =
-            partitionResources(offer.getResourcesList, "cpus", cpusToUse)
-          val (_, memResourcesToUse) =
-            partitionResources(remainingResources.asJava, "mem", 
calculateTotalMemory(sc))
-          val taskBuilder = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, 
taskId))
-            .setName("Task " + taskId)
-            .addAllResources(cpuResourcesToUse.asJava)
-            .addAllResources(memResourcesToUse.asJava)
-
-          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { 
image =>
-            MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, 
taskBuilder.getContainerBuilder())
+        if (meetsConstraints) {
+          if (taskIdToSlaveId.size < executorLimit &&
+              totalCoresAcquired < maxCores &&
+              mem >= calculateTotalMemory(sc) &&
+              cpus >= 1 &&
+              failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+              !slaveIdsWithExecutors.contains(slaveId)) {
+            // Launch an executor on the slave
+            val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+            totalCoresAcquired += cpusToUse
+            val taskId = newMesosTaskId()
+            taskIdToSlaveId.put(taskId, slaveId)
+            slaveIdsWithExecutors += slaveId
+            coresByTaskId(taskId) = cpusToUse
+            // Gather cpu resources from the available resources and use them 
in the task.
+            val (remainingResources, cpuResourcesToUse) =
+              partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+            val (_, memResourcesToUse) =
+              partitionResources(remainingResources.asJava, "mem", 
calculateTotalMemory(sc))
+            val taskBuilder = MesosTaskInfo.newBuilder()
+              .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+              .setSlaveId(offer.getSlaveId)
+              .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, 
taskId))
+              .setName("Task " + taskId)
+              .addAllResources(cpuResourcesToUse.asJava)
+              .addAllResources(memResourcesToUse.asJava)
+
+            sc.conf.getOption("spark.mesos.executor.docker.image").foreach { 
image =>
+              MesosSchedulerBackendUtil
+                .setupContainerBuilderDockerInfo(image, sc.conf, 
taskBuilder.getContainerBuilder())
+            }
+
+            // Accept the offer and launch the task
+            logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
+            slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
+            d.launchTasks(
+              Collections.singleton(offer.getId),
+              Collections.singleton(taskBuilder.build()), filters)
+          } else {
+            // Decline the offer
+            logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
+            d.declineOffer(offer.getId)
           }
-
-          // accept the offer and launch the task
-          logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
-          slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
-          d.launchTasks(
-            Collections.singleton(offer.getId),
-            Collections.singleton(taskBuilder.build()), filters)
         } else {
-          // Decline the offer
-          logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
-          d.declineOffer(offer.getId)
+          // This offer does not meet constraints. We don't need to see it 
again.
+          // Decline the offer for a long period of time.
+          logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus"
+              + s" for $rejectOfferDurationForUnmetConstraints seconds")
+          d.declineOffer(offer.getId, Filters.newBuilder()
+            .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/74f50275/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index aaffac6..281965a 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -63,6 +63,10 @@ private[spark] class MesosSchedulerBackend(
   private[this] val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  // reject offers with mismatched constraints in seconds
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
   @volatile var appId: String = _
 
   override def start() {
@@ -212,29 +216,47 @@ private[spark] class MesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
-      // Fail-fast on offers we know will be rejected
-      val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
+      // Fail first on offers with unmet constraints
+      val (offersMatchingConstraints, offersNotMatchingConstraints) =
+        offers.asScala.partition { o =>
+          val offerAttributes = toAttributeMap(o.getAttributesList)
+          val meetsConstraints =
+            matchesAttributeRequirements(slaveOfferConstraints, 
offerAttributes)
+
+          // add some debug messaging
+          if (!meetsConstraints) {
+            val id = o.getId.getValue
+            logDebug(s"Declining offer: $id with attributes: $offerAttributes")
+          }
+
+          meetsConstraints
+        }
+
+      // These offers do not meet constraints. We don't need to see them again.
+      // Decline the offer for a long period of time.
+      offersNotMatchingConstraints.foreach { o =>
+        d.declineOffer(o.getId, Filters.newBuilder()
+          .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
+      }
+
+      // Of the matching constraints, see which ones give us enough memory and 
cores
+      val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition 
{ o =>
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
         val offerAttributes = toAttributeMap(o.getAttributesList)
 
-        // check if all constraints are satisfield
-        //  1. Attribute constraints
-        //  2. Memory requirements
-        //  3. CPU requirements - need at least 1 for executor, 1 for task
-        val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        // check offers for
+        //  1. Memory requirements
+        //  2. CPU requirements - need at least 1 for executor, 1 for task
         val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
         val meetsCPURequirements = cpus >= (mesosExecutorCores + 
scheduler.CPUS_PER_TASK)
-
         val meetsRequirements =
-          (meetsConstraints && meetsMemoryRequirements && 
meetsCPURequirements) ||
+          (meetsMemoryRequirements && meetsCPURequirements) ||
           (slaveIdToExecutorInfo.contains(slaveId) && cpus >= 
scheduler.CPUS_PER_TASK)
-
-        // add some debug messaging
         val debugstr = if (meetsRequirements) "Accepting" else "Declining"
-        val id = o.getId.getValue
-        logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: 
$mem cpu: $cpus")
+        logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+          + s"$offerAttributes mem: $mem cpu: $cpus")
 
         meetsRequirements
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/74f50275/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 860c8e0..721861f 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -336,4 +336,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     }
   }
 
+  protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): 
Long = {
+    
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to