Repository: spark
Updated Branches:
  refs/heads/master 9ff203346 -> 1165b17d2


[SPARK-6707] [CORE] [MESOS] Mesos Scheduler should allow the user to specify constraints based on slave attributes

Currently, the Mesos scheduler only looks at the 'cpus' and 'mem' resources when 
determining whether a resource offer from a Mesos slave node is usable. It may be 
preferable for the user to be able to ensure that Spark jobs are started only on a 
certain set of nodes (based on their attributes).

For example, if the user sets `spark.mesos.constraints` to 
`tachyon:true;us-east-1:false`, then resource offers will be checked against both 
of these constraints, and only offers that satisfy them will be accepted to start 
new executors.

Author: Ankur Chauhan <achau...@brightcove.com>

Closes #5563 from ankurcha/mesos_attribs and squashes the following commits:

902535b [Ankur Chauhan] Fix line length
d83801c [Ankur Chauhan] Update code as per code review comments
8b73f2d [Ankur Chauhan] Fix imports
c3523e7 [Ankur Chauhan] Added docs
1a24d0b [Ankur Chauhan] Expand scope of attributes matching to include all data types
482fd71 [Ankur Chauhan] Update access modifier to private[this] for offer constraints
5ccc32d [Ankur Chauhan] Fix nit pick whitespace
1bce782 [Ankur Chauhan] Fix nit pick whitespace
c0cbc75 [Ankur Chauhan] Use offer id value for debug message
7fee0ea [Ankur Chauhan] Add debug statements
fc7eb5b [Ankur Chauhan] Fix import codestyle
00be252 [Ankur Chauhan] Style changes as per code review comments
662535f [Ankur Chauhan] Incorporate code review comments + use SparkFunSuite
fdc0937 [Ankur Chauhan] Decline offers that did not meet criteria
67b58a0 [Ankur Chauhan] Add documentation for spark.mesos.constraints
63f53f4 [Ankur Chauhan] Update codestyle - uniform style for config values
02031e4 [Ankur Chauhan] Fix scalastyle warnings in tests
c09ed84 [Ankur Chauhan] Fixed the access modifier on offerConstraints val to private[mesos]
0c64df6 [Ankur Chauhan] Rename overhead fractions to memory_*, fix spacing
8cc1e8f [Ankur Chauhan] Make exception message more explicit about the source of the error
addedba [Ankur Chauhan] Added test case for malformed constraint string
ec9d9a6 [Ankur Chauhan] Add tests for parse constraint string
72fe88a [Ankur Chauhan] Fix up tests + remove redundant method override, combine utility class into new mesos scheduler util trait
92b47fd [Ankur Chauhan] Add attributes based constraints support to MesosScheduler


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1165b17d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1165b17d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1165b17d

Branch: refs/heads/master
Commit: 1165b17d24cdf1dbebb2faca14308dfe5c2a652c
Parents: 9ff2033
Author: Ankur Chauhan <achau...@brightcove.com>
Authored: Mon Jul 6 16:04:57 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Jul 6 16:04:57 2015 -0700

----------------------------------------------------------------------
 .../mesos/CoarseMesosSchedulerBackend.scala     |  43 +++---
 .../scheduler/cluster/mesos/MemoryUtils.scala   |  31 ----
 .../cluster/mesos/MesosClusterScheduler.scala   |   1 +
 .../cluster/mesos/MesosSchedulerBackend.scala   |  62 +++++---
 .../cluster/mesos/MesosSchedulerUtils.scala     | 153 ++++++++++++++++++-
 .../cluster/mesos/MemoryUtilsSuite.scala        |  46 ------
 .../mesos/MesosSchedulerBackendSuite.scala      |   6 +-
 .../mesos/MesosSchedulerUtilsSuite.scala        | 140 +++++++++++++++++
 docs/running-on-mesos.md                        |  22 +++
 9 files changed, 376 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 6b8edca..b68f8c7 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,18 +18,18 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" 
tasks, where it holds
@@ -66,6 +66,10 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
 
+  // Offer constraints
+  private val slaveOfferConstraints =
+    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
   var nextMesosTaskId = 0
 
   @volatile var appId: String = _
@@ -170,13 +174,16 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     synchronized {
       val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-
       for (offer <- offers) {
+        val offerAttributes = toAttributeMap(offer.getAttributesList)
+        val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
         val slaveId = offer.getSlaveId.toString
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores &&
-            mem >= MemoryUtils.calculateTotalMemory(sc) &&
+        val id = offer.getId.getValue
+        if (meetsConstraints &&
+            totalCoresAcquired < maxCores &&
+            mem >= calculateTotalMemory(sc) &&
             cpus >= 1 &&
             failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
             !slaveIdsWithExecutors.contains(slaveId)) {
@@ -193,33 +200,25 @@ private[spark] class CoarseMesosSchedulerBackend(
             .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
             .setName("Task " + taskId)
             .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem",
-              MemoryUtils.calculateTotalMemory(sc)))
+            .addResources(createResource("mem", calculateTotalMemory(sc)))
 
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { 
image =>
             MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, 
task.getContainerBuilder())
+              .setupContainerBuilderDockerInfo(image, sc.conf, 
task.getContainerBuilder)
           }
 
-          d.launchTasks(
-            Collections.singleton(offer.getId), 
Collections.singletonList(task.build()), filters)
+          // accept the offer and launch the task
+          logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
+          d.launchTasks(List(offer.getId), List(task.build()), filters)
         } else {
-          // Filter it out
-          d.launchTasks(
-            Collections.singleton(offer.getId), 
Collections.emptyList[MesosTaskInfo](), filters)
+          // Decline the offer
+          logDebug(s"Declining offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
+          d.declineOffer(offer.getId)
         }
       }
     }
   }
 
-  /** Build a Mesos resource protobuf object */
-  private def createResource(resourceName: String, quantity: Double): 
Protos.Resource = {
-    Resource.newBuilder()
-      .setName(resourceName)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
-      .build()
-  }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt

http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
deleted file mode 100644
index 8df4f3b..0000000
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.apache.spark.SparkContext
-
-private[spark] object MemoryUtils {
-  // These defaults copied from YARN
-  val OVERHEAD_FRACTION = 0.10
-  val OVERHEAD_MINIMUM = 384
-
-  def calculateTotalMemory(sc: SparkContext): Int = {
-    sc.conf.getInt("spark.mesos.executor.memoryOverhead",
-      math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) 
+ sc.executorMemory
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1067a7f..d3a20f8 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.{Scheduler, SchedulerDriver}
+
 import org.apache.spark.deploy.mesos.MesosDriverDescription
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, 
KillSubmissionResponse, SubmissionStatusResponse}
 import org.apache.spark.metrics.MetricsSystem

http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 49de85e..d72e2af 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -23,14 +23,14 @@ import java.util.{ArrayList => JArrayList, Collections, 
List => JList}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
+import org.apache.mesos.{Scheduler => MScheduler, _}
 import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => 
MesosTaskInfo, _}
 import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.spark.{SparkContext, SparkException, TaskState}
 import org.apache.spark.executor.MesosExecutorBackend
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkException, TaskState}
 
 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task 
is mapped to a
@@ -59,6 +59,10 @@ private[spark] class MesosSchedulerBackend(
 
   private[mesos] val mesosExecutorCores = 
sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
 
+  // Offer constraints
+  private[this] val slaveOfferConstraints =
+    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
   @volatile var appId: String = _
 
   override def start() {
@@ -71,8 +75,8 @@ private[spark] class MesosSchedulerBackend(
     val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
       .orElse(sc.getSparkHome()) // Fall back to driver Spark home for 
backward compatibility
       .getOrElse {
-        throw new SparkException("Executor Spark home 
`spark.mesos.executor.home` is not set!")
-      }
+      throw new SparkException("Executor Spark home 
`spark.mesos.executor.home` is not set!")
+    }
     val environment = Environment.newBuilder()
     sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
       environment.addVariables(
@@ -115,14 +119,14 @@ private[spark] class MesosSchedulerBackend(
       .setName("cpus")
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder()
-        .setValue(mesosExecutorCores).build())
+      .setValue(mesosExecutorCores).build())
       .build()
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
       .setScalar(
         Value.Scalar.newBuilder()
-          .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
+          .setValue(calculateTotalMemory(sc)).build())
       .build()
     val executorInfo = MesosExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -191,13 +195,31 @@ private[spark] class MesosSchedulerBackend(
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
-        (mem >= MemoryUtils.calculateTotalMemory(sc) &&
-          // need at least 1 for executor, 1 for task
-          cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
-          (slaveIdsWithExecutors.contains(slaveId) &&
-            cpus >= scheduler.CPUS_PER_TASK)
+        val offerAttributes = toAttributeMap(o.getAttributesList)
+
+        // check if all constraints are satisfied
+        //  1. Attribute constraints
+        //  2. Memory requirements
+        //  3. CPU requirements - need at least 1 for executor, 1 for task
+        val meetsConstraints = 
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+        val meetsCPURequirements = cpus >= (mesosExecutorCores + 
scheduler.CPUS_PER_TASK)
+
+        val meetsRequirements =
+          (meetsConstraints && meetsMemoryRequirements && 
meetsCPURequirements) ||
+          (slaveIdsWithExecutors.contains(slaveId) && cpus >= 
scheduler.CPUS_PER_TASK)
+
+        // add some debug messaging
+        val debugstr = if (meetsRequirements) "Accepting" else "Declining"
+        val id = o.getId.getValue
+        logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: 
$mem cpu: $cpus")
+
+        meetsRequirements
       }
 
+      // Decline offers we ruled out immediately
+      unUsableOffers.foreach(o => d.declineOffer(o.getId))
+
       val workerOffers = usableOffers.map { o =>
         val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
           getResource(o.getResourcesList, "cpus").toInt
@@ -223,15 +245,15 @@ private[spark] class MesosSchedulerBackend(
       val acceptedOffers = 
scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
       acceptedOffers
         .foreach { offer =>
-          offer.foreach { taskDesc =>
-            val slaveId = taskDesc.executorId
-            slaveIdsWithExecutors += slaveId
-            slavesIdsOfAcceptedOffers += slaveId
-            taskIdToSlaveId(taskDesc.taskId) = slaveId
-            mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
-              .add(createMesosTask(taskDesc, slaveId))
-          }
+        offer.foreach { taskDesc =>
+          val slaveId = taskDesc.executorId
+          slaveIdsWithExecutors += slaveId
+          slavesIdsOfAcceptedOffers += slaveId
+          taskIdToSlaveId(taskDesc.taskId) = slaveId
+          mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+            .add(createMesosTask(taskDesc, slaveId))
         }
+      }
 
       // Reply to the offers
       val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: 
lower timeout?
@@ -251,8 +273,6 @@ private[spark] class MesosSchedulerBackend(
         d.declineOffer(o.getId)
       }
 
-      // Decline offers we ruled out immediately
-      unUsableOffers.foreach(o => d.declineOffer(o.getId))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index d11228f..d8a8c84 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import java.util.List
+import java.util.{List => JList}
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConversions._
+import scala.util.control.NonFatal
 
-import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
-import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
-import org.apache.spark.Logging
+import com.google.common.base.Splitter
+import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
+import org.apache.mesos.Protos._
+import org.apache.mesos.protobuf.GeneratedMessage
+import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.util.Utils
 
 /**
@@ -86,10 +89,150 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   /**
    * Get the amount of resources for the specified type from the resource list
    */
-  protected def getResource(res: List[Resource], name: String): Double = {
+  protected def getResource(res: JList[Resource], name: String): Double = {
     for (r <- res if r.getName == name) {
       return r.getScalar.getValue
     }
     0.0
   }
+
+  /** Helper method to get the key,value-set pair for a Mesos Attribute 
protobuf */
+  protected def getAttribute(attr: Attribute): (String, Set[String]) = {
+    (attr.getName, attr.getText.getValue.split(',').toSet)
+  }
+
+
+  /** Build a Mesos resource protobuf object */
+  protected def createResource(resourceName: String, quantity: Double): 
Protos.Resource = {
+    Resource.newBuilder()
+      .setName(resourceName)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+      .build()
+  }
+
+  /**
+   * Converts the attributes from the resource offer into a Map of name -> Attribute Value
+   * The attribute values are the Mesos attribute types (scalar, ranges, set or text).
+   * @param offerAttributes the attributes from the resource offer
+   * @return a Map of attribute name to the attribute's protobuf value
+   */
+  protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, 
GeneratedMessage] = {
+    offerAttributes.map(attr => {
+      val attrValue = attr.getType match {
+        case Value.Type.SCALAR => attr.getScalar
+        case Value.Type.RANGES => attr.getRanges
+        case Value.Type.SET => attr.getSet
+        case Value.Type.TEXT => attr.getText
+      }
+      (attr.getName, attrValue)
+    }).toMap
+  }
+
+
+  /**
+   * Match the requirements (if any) to the offer attributes.
+   * If no attribute requirements are specified, return true.
+   * Else, if an attribute is specified without values, a simple attribute presence check
+   * is performed; if both an attribute name and values are specified, a subset match is
+   * performed against the slave attributes.
+   */
+  def matchesAttributeRequirements(
+      slaveOfferConstraints: Map[String, Set[String]],
+      offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+    slaveOfferConstraints.forall {
+      // offer has the required attribute and subsumes the required values for 
that attribute
+      case (name, requiredValues) =>
+        offerAttributes.get(name) match {
+          case None => false
+          case Some(_) if requiredValues.isEmpty => true // empty value 
matches presence
+          case Some(scalarValue: Value.Scalar) =>
+            // check if any provided value is less than or equal to the offered value
+            requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue)
+          case Some(rangeValue: Value.Range) =>
+            val offerRange = rangeValue.getBegin to rangeValue.getEnd
+            // Check if there is some required value that is between the 
ranges specified
+            // Note: We only support the ability to specify discrete values, 
in the future
+            // we may expand it to subsume ranges specified with a XX..YY 
value or something
+            // similar to that.
+            requiredValues.map(_.toLong).exists(offerRange.contains(_))
+          case Some(offeredValue: Value.Set) =>
+            // check if the specified required values are a subset of the offered set
+            requiredValues.subsetOf(offeredValue.getItemList.toSet)
+          case Some(textValue: Value.Text) =>
+            // check if the specified value is equal; if multiple values are
+            // specified, we succeed if any of them match.
+            requiredValues.contains(textValue.getValue)
+        }
+    }
+  }
+
+  /**
+   * Parses the attribute constraints provided to Spark and builds a matching data structure:
+   *  Map[<attribute-name>, Set[values-to-match]]
+   *  The constraints are specified as ';' separated key-value pairs where 
keys and values
+   *  are separated by ':'. The ':' implies equality (for singular values) and 
"is one of" for
+   *  multiple values (comma separated). For example:
+   *  {{{
+   *  parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
+   *  // would result in
+   *  <code>
+   *  Map(
+   *    "tachyon" -> Set("true"),
+   *    "zone":   -> Set("us-east-1a", "us-east-1b")
+   *  )
+   *  }}}
+   *
+   *  Mesos documentation: 
http://mesos.apache.org/documentation/attributes-resources/
+   *                       
https://github.com/apache/mesos/blob/master/src/common/values.cpp
+   *                       
https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
+   *
+   * @param constraintsVal constraints string consisting of ';' separated key-value pairs
+   *                       (separated by ':')
+   * @return  Map of constraints to match resources offers.
+   */
+  def parseConstraintString(constraintsVal: String): Map[String, Set[String]] 
= {
+    /*
+      Based on mesos docs:
+      attributes : attribute ( ";" attribute )*
+      attribute : labelString ":" ( labelString | "," )+
+      labelString : [a-zA-Z0-9_/.-]
+    */
+    val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
+    // kv splitter
+    if (constraintsVal.isEmpty) {
+      Map()
+    } else {
+      try {
+        Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
+          case (k, v) =>
+            if (v == null || v.isEmpty) {
+              (k, Set[String]())
+            } else {
+              (k, v.split(',').toSet)
+            }
+        }
+      } catch {
+        case NonFatal(e) =>
+          throw new IllegalArgumentException(s"Bad constraint string: 
$constraintsVal", e)
+      }
+    }
+  }
+
+  // These defaults copied from YARN
+  private val MEMORY_OVERHEAD_FRACTION = 0.10
+  private val MEMORY_OVERHEAD_MINIMUM = 384
+
+  /**
+   * Return the amount of memory to allocate to each executor, taking into account
+   * container overheads.
+   * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
+   * @return total memory requirement: executor memory plus the larger of
+   *         (MEMORY_OVERHEAD_FRACTION * executorMemory) and MEMORY_OVERHEAD_MINIMUM,
+   *         unless `spark.mesos.executor.memoryOverhead` is set, in which case that
+   *         value is used as the overhead
+   */
+  def calculateTotalMemory(sc: SparkContext): Int = {
+    sc.conf.getInt("spark.mesos.executor.memoryOverhead",
+      math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, 
MEMORY_OVERHEAD_MINIMUM).toInt) +
+      sc.executorMemory
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
deleted file mode 100644
index e72285d..0000000
--- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-
-class MemoryUtilsSuite extends SparkFunSuite with MockitoSugar {
-  test("MesosMemoryUtils should always override memoryOverhead when it's set") 
{
-    val sparkConf = new SparkConf
-
-    val sc = mock[SparkContext]
-    when(sc.conf).thenReturn(sparkConf)
-
-    // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
-    when(sc.executorMemory).thenReturn(512)
-    assert(MemoryUtils.calculateTotalMemory(sc) === 896)
-
-    // 384 < sc.executorMemory * 0.1 => 4096 + (4096 * 0.1) = 4505.6
-    when(sc.executorMemory).thenReturn(4096)
-    assert(MemoryUtils.calculateTotalMemory(sc) === 4505)
-
-    // set memoryOverhead
-    sparkConf.set("spark.mesos.executor.memoryOverhead", "100")
-    assert(MemoryUtils.calculateTotalMemory(sc) === 4196)
-    sparkConf.set("spark.mesos.executor.memoryOverhead", "400")
-    assert(MemoryUtils.calculateTotalMemory(sc) === 4496)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 68df46a..d01837f 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -149,7 +149,9 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext wi
     when(sc.conf).thenReturn(new SparkConf)
     when(sc.listenerBus).thenReturn(listenerBus)
 
-    val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
+    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    val minMem = backend.calculateTotalMemory(sc)
     val minCpu = 4
 
     val mesosOffers = new java.util.ArrayList[Offer]
@@ -157,8 +159,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext wi
     mesosOffers.add(createOffer(2, minMem - 1, minCpu))
     mesosOffers.add(createOffer(3, minMem, minCpu))
 
-    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-
     val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
     expectedWorkerOffers.append(new WorkerOffer(
       mesosOffers.get(0).getSlaveId.getValue,

http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
new file mode 100644
index 0000000..b354914
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.Value
+import org.mockito.Mockito._
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with 
MockitoSugar {
+
+  // scalastyle:off structural.type
+  // this is the documented way of generating fixtures in scalatest
+  def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
+    val sparkConf = new SparkConf
+    val sc = mock[SparkContext]
+    when(sc.conf).thenReturn(sparkConf)
+  }
+  val utils = new MesosSchedulerUtils { }
+  // scalastyle:on structural.type
+
+  test("use at-least minimum overhead") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(512)
+    utils.calculateTotalMemory(f.sc) shouldBe 896
+  }
+
+  test("use overhead if it is greater than minimum value") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(4096)
+    utils.calculateTotalMemory(f.sc) shouldBe 4505
+  }
+
+  test("use spark.mesos.executor.memoryOverhead (if set)") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(1024)
+    f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
+    utils.calculateTotalMemory(f.sc) shouldBe 1536
+  }
+
+  test("parse a non-empty constraint string correctly") {
+    val expectedMap = Map(
+      "tachyon" -> Set("true"),
+      "zone" -> Set("us-east-1a", "us-east-1b")
+    )
+    utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") 
should be (expectedMap)
+  }
+
+  test("parse an empty constraint string correctly") {
+    utils.parseConstraintString("") shouldBe Map()
+  }
+
+  test("throw an exception when the input is malformed") {
+    an[IllegalArgumentException] should be thrownBy
+      utils.parseConstraintString("tachyon;zone:us-east")
+  }
+
+  test("empty values for attributes' constraints matches all values") {
+    val constraintsStr = "tachyon:"
+    val parsedConstraints = utils.parseConstraintString(constraintsStr)
+
+    parsedConstraints shouldBe Map("tachyon" -> Set())
+
+    val zoneSet = 
Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
+    val noTachyonOffer = Map("zone" -> zoneSet)
+    val tachyonTrueOffer = Map("tachyon" -> 
Value.Text.newBuilder().setValue("true").build())
+    val tachyonFalseOffer = Map("tachyon" -> 
Value.Text.newBuilder().setValue("false").build())
+
+    utils.matchesAttributeRequirements(parsedConstraints, noTachyonOffer) 
shouldBe false
+    utils.matchesAttributeRequirements(parsedConstraints, tachyonTrueOffer) 
shouldBe true
+    utils.matchesAttributeRequirements(parsedConstraints, tachyonFalseOffer) 
shouldBe true
+  }
+
+  test("subset match is performed for set attributes") {
+    val supersetConstraint = Map(
+      "tachyon" -> Value.Text.newBuilder().setValue("true").build(),
+      "zone" -> Value.Set.newBuilder()
+        .addItem("us-east-1a")
+        .addItem("us-east-1b")
+        .addItem("us-east-1c")
+        .build())
+
+    val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c"
+    val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
+
+    utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) 
shouldBe true
+  }
+
+  test("less than equal match is performed on scalar attributes") {
+    val offerAttribs = Map("gpus" -> 
Value.Scalar.newBuilder().setValue(3).build())
+
+    val ltConstraint = utils.parseConstraintString("gpus:2")
+    val eqConstraint = utils.parseConstraintString("gpus:3")
+    val gtConstraint = utils.parseConstraintString("gpus:4")
+
+    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe 
true
+    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe 
true
+    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe 
false
+  }
+
+  test("contains match is performed for range attributes") {
+    val offerAttribs = Map("ports" -> 
Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
+    val ltConstraint = utils.parseConstraintString("ports:6000")
+    val eqConstraint = utils.parseConstraintString("ports:7500")
+    val gtConstraint = utils.parseConstraintString("ports:8002")
+    val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
+
+    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe 
false
+    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe 
true
+    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe 
false
+    utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe 
true
+  }
+
+  test("equality match is performed for text attributes") {
+    val offerAttribs = Map("tachyon" -> 
Value.Text.newBuilder().setValue("true").build())
+
+    val trueConstraint = utils.parseConstraintString("tachyon:true")
+    val falseConstraint = utils.parseConstraintString("tachyon:false")
+
+    utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe 
true
+    utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe 
false
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1165b17d/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 5f1d6da..1f915d8 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -184,6 +184,14 @@ acquire. By default, it will acquire *all* cores in the 
cluster (that get offere
 only makes sense if you run just one application at a time. You can cap the 
maximum number of cores
 using `conf.set("spark.cores.max", "10")` (for example).
 
+You may also make use of `spark.mesos.constraints` to set attribute-based constraints on Mesos resource offers. By default, all resource offers will be accepted.
+
+{% highlight scala %}
+conf.set("spark.mesos.constraints", "tachyon:true;us-east-1:false")
+{% endhighlight %}
+
+For example, if `spark.mesos.constraints` is set to `tachyon:true;us-east-1:false`, then resource offers will be checked against both of these constraints, and only offers that satisfy them will be accepted to start new executors.
+
 # Mesos Docker Support
 
 Spark can make use of a Mesos Docker containerizer by setting the property 
`spark.mesos.executor.docker.image`
@@ -298,6 +306,20 @@ See the [configuration page](configuration.html) for 
information on Spark config
     the final overhead will be this value.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.constraints</code></td>
+  <td>(none)</td>
+  <td>
+    Attribute-based constraints on Mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
+    <ul>
+      <li>Scalar constraints are matched with "less than or equal" semantics, i.e. the value in the constraint must be less than or equal to the value in the resource offer.</li>
+      <li>Range constraints are matched with "contains" semantics, i.e. the value in the constraint must be within the resource offer's range.</li>
+      <li>Set constraints are matched with "subset of" semantics, i.e. the value in the constraint must be a subset of the resource offer's value.</li>
+      <li>Text constraints are matched with "equality" semantics, i.e. the value in the constraint must be exactly equal to the resource offer's value.</li>
+      <li>In case there is no value present as a part of the constraint, any offer with the corresponding attribute will be accepted (without a value check).</li>
+    </ul>
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
