Repository: spark
Updated Branches:
  refs/heads/master e156b5dd3 -> c8fc1f3ba


[SPARK-20085][MESOS] Configurable mesos labels for executors

## What changes were proposed in this pull request?

Add a spark.mesos.task.labels configuration option to add Mesos key:value
labels to the executor.

The format is "k1:v1,k2:v2": a colon separates each key from its value, and
commas separate multiple labels.

Discussion of labels with mgummelt at #17404

## How was this patch tested?

Added unit tests to verify that labels are added correctly and that malformed
labels are ignored, and added a test for the name of the executor.

Tested with: `./build/sbt -Pmesos mesos/test`

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Kalvin Chau <kalvin.c...@viasat.com>

Closes #17413 from kalvinnchau/mesos-labels.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8fc1f3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8fc1f3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8fc1f3b

Branch: refs/heads/master
Commit: c8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1
Parents: e156b5d
Author: Kalvin Chau <kalvin.c...@viasat.com>
Authored: Thu Apr 6 09:14:31 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Apr 6 09:14:31 2017 +0100

----------------------------------------------------------------------
 docs/running-on-mesos.md                        |  9 ++++
 .../MesosCoarseGrainedSchedulerBackend.scala    | 24 ++++++++++
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 46 ++++++++++++++++++++
 3 files changed, 79 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8fc1f3b/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8d5ad12..ef01cfe 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -368,6 +368,15 @@ See the [configuration page](configuration.html) for 
information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.task.labels</code></td>
+  <td>(none)</td>
+  <td>
+    Set the Mesos labels to add to each task. Labels are free-form key-value pairs.
+    Each key is separated from its value by a colon, and multiple labels are separated by commas.
+    Example: <code>key:value,key2:value2</code>.
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.executor.home</code></td>
   <td>driver side <code>SPARK_HOME</code></td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/c8fc1f3b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5bdc2a2..2a36ec4 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -67,6 +67,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
 
+  private val taskLabels = conf.get("spark.mesos.task.labels", "")
+
   private[this] val shutdownTimeoutMS =
     conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
       .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
@@ -408,6 +410,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           taskBuilder.addAllResources(resourcesToUse.asJava)
           
taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
 
+          val labelsBuilder = taskBuilder.getLabelsBuilder
+          val labels = buildMesosLabels().asJava
+
+          labelsBuilder.addAllLabels(labels)
+
+          taskBuilder.setLabels(labelsBuilder)
+
           tasks(offer.getId) ::= taskBuilder.build()
           remainingResources(offerId) = resourcesLeft.asJava
           totalCoresAcquired += taskCPUs
@@ -422,6 +431,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     tasks.toMap
   }
 
+  /** Parses spark.mesos.task.labels ("k1:v1,k2:v2") into Mesos labels for each task. */
+  private def buildMesosLabels(): List[Label] = {
+    // When the option is unset taskLabels is "", and "".split(",") yields one empty
+    // entry; drop empty entries so a warning is not logged on every task launch.
+    taskLabels.split(",").filter(_.nonEmpty).flatMap { label =>
+      label.split(":") match {
+        case Array(key, value) =>
+          Some(Label.newBuilder().setKey(key).setValue(value).build())
+        case _ => // anything other than exactly one key and one value is skipped
+          logWarning(s"Unable to parse $label into a key:value label for the task.")
+          None
+      }
+    }.toList
+  }
+
   /** Extracts task needed resources from a list of available resources. */
   private def partitionTaskResources(
       resources: JList[Resource],

http://git-wip-us.apache.org/repos/asf/spark/blob/c8fc1f3b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index eb83926..c040f05 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -475,6 +475,52 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
     assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0")
   }
 
+  test("mesos sets configurable labels on tasks") {
+    val taskLabelsString = "mesos:test,label:test"
+    setBackend(Map(
+      "spark.mesos.task.labels" -> taskLabelsString
+    ))
+
+    // Labels message the launched task is expected to carry.
+    val taskLabels = Protos.Labels.newBuilder()
+      .addLabels(Protos.Label.newBuilder()
+        .setKey("mesos").setValue("test").build())
+      .addLabels(Protos.Label.newBuilder()
+        .setKey("label").setValue("test").build())
+      .build()
+
+    val offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+
+    // Compare the complete Labels message of the first launched task.
+    val labels = launchedTasks.head.getLabels
+    assert(labels == taskLabels)
+  }
+
+  test("mesos ignored invalid labels and sets configurable labels on tasks") {
+    val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
+    setBackend(Map(
+      "spark.mesos.task.labels" -> taskLabelsString
+    ))
+
+    // Only the two well-formed entries are expected; the third has two colons.
+    val taskLabels = Protos.Labels.newBuilder()
+      .addLabels(Protos.Label.newBuilder()
+        .setKey("mesos").setValue("test").build())
+      .addLabels(Protos.Label.newBuilder()
+        .setKey("label").setValue("test").build())
+      .build()
+
+    val offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+
+    // Compare the complete Labels message of the first launched task.
+    val labels = launchedTasks.head.getLabels
+    assert(labels == taskLabels)
+  }
+
   test("mesos supports spark.mesos.network.name") {
     setBackend(Map(
       "spark.mesos.network.name" -> "test-network-name"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to