Repository: spark
Updated Branches:
  refs/heads/master 43cbfad99 -> ce0d3bb37


[SPARK-21694][MESOS] Support Mesos CNI network labels

JIRA ticket: https://issues.apache.org/jira/browse/SPARK-21694

## What changes were proposed in this pull request?

Spark already supports launching containers attached to a given CNI network by 
specifying it via the config `spark.mesos.network.name`.

This PR adds support to pass in network labels to CNI plugins via a new config 
option `spark.mesos.network.labels`. These network labels are key-value pairs 
that are set in the `NetworkInfo` of both the driver and executor tasks. More 
details in the related Mesos documentation:  
http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins

## How was this patch tested?

Unit tests, for both driver and executor tasks.
Manual integration test to submit a job with the `spark.mesos.network.labels` 
option, hit the mesos/state.json endpoint, and check that the labels are set in 
the driver and executor tasks.

ArtRand skonto

Author: Susan X. Huynh <xhu...@mesosphere.com>

Closes #18910 from susanxhuynh/sh-mesos-cni-labels.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce0d3bb3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce0d3bb3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce0d3bb3

Branch: refs/heads/master
Commit: ce0d3bb377766bdf4df7852272557ae846408877
Parents: 43cbfad
Author: Susan X. Huynh <xhu...@mesosphere.com>
Authored: Thu Aug 24 10:05:38 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Aug 24 10:05:38 2017 +0100

----------------------------------------------------------------------
 docs/running-on-mesos.md                         | 14 ++++++++++++++
 .../org/apache/spark/deploy/mesos/config.scala   | 19 +++++++++++++++++--
 .../MesosCoarseGrainedSchedulerBackend.scala     |  2 +-
 .../mesos/MesosSchedulerBackendUtil.scala        |  9 +++++++--
 .../mesos/MesosClusterSchedulerSuite.scala       |  9 +++++++--
 ...MesosCoarseGrainedSchedulerBackendSuite.scala |  9 +++++++--
 6 files changed, 53 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce0d3bb3/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ae38550..0e5a20c 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -538,6 +538,20 @@ See the [configuration page](configuration.html) for 
information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.network.labels</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Pass network labels to CNI plugins.  This is a comma-separated list
+    of key-value pairs, where each key-value pair has the format key:value.
+    Example:
+
+    <pre>key1:val1,key2:val2</pre>
+    See
    <a 
href="http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins">the
 Mesos CNI docs</a>
    for more details.
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.fetcherCache.enable</code></td>
   <td><code>false</code></td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/ce0d3bb3/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 6c8619e..a5015b9 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -56,7 +56,7 @@ package object config {
       .stringConf
       .createOptional
 
-  private [spark] val DRIVER_LABELS =
+  private[spark] val DRIVER_LABELS =
     ConfigBuilder("spark.mesos.driver.labels")
       .doc("Mesos labels to add to the driver.  Labels are free-form key-value 
pairs.  Key-value " +
         "pairs should be separated by a colon, and commas used to list more 
than one." +
@@ -64,10 +64,25 @@ package object config {
       .stringConf
       .createOptional
 
-  private [spark] val DRIVER_FAILOVER_TIMEOUT =
+  private[spark] val DRIVER_FAILOVER_TIMEOUT =
     ConfigBuilder("spark.mesos.driver.failoverTimeout")
       .doc("Amount of time in seconds that the master will wait to hear from 
the driver, " +
           "during a temporary disconnection, before tearing down all the 
executors.")
       .doubleConf
       .createWithDefault(0.0)
+
+  private[spark] val NETWORK_NAME =
+    ConfigBuilder("spark.mesos.network.name")
+      .doc("Attach containers to the given named network. If this job is 
launched " +
+        "in cluster mode, also launch the driver in the given named network.")
+      .stringConf
+      .createOptional
+
+  private[spark] val NETWORK_LABELS =
+    ConfigBuilder("spark.mesos.network.labels")
+      .doc("Network labels to pass to CNI plugins.  This is a comma-separated 
list " +
+        "of key-value pairs, where each key-value pair has the format 
key:value. " +
+        "Example: key1:val1,key2:val2")
+      .stringConf
+      .createOptional
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce0d3bb3/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5ecd466..2669987 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -670,7 +670,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   private def executorHostname(offer: Offer): String = {
-    if (sc.conf.getOption("spark.mesos.network.name").isDefined) {
+    if (sc.conf.get(NETWORK_NAME).isDefined) {
       // The agent's IP is not visible in a CNI container, so we bind to 
0.0.0.0
       "0.0.0.0"
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/ce0d3bb3/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index fbcbc55..e5c1e80 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -21,6 +21,7 @@ import org.apache.mesos.Protos.{ContainerInfo, Image, 
NetworkInfo, Parameter, Vo
 import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME}
 import org.apache.spark.internal.Logging
 
 /**
@@ -161,8 +162,12 @@ private[mesos] object MesosSchedulerBackendUtil extends 
Logging {
       volumes.foreach(_.foreach(containerInfo.addVolumes(_)))
     }
 
-    conf.getOption("spark.mesos.network.name").map { name =>
-      val info = NetworkInfo.newBuilder().setName(name).build()
+    conf.get(NETWORK_NAME).map { name =>
+      val networkLabels = 
MesosProtoUtils.mesosLabels(conf.get(NETWORK_LABELS).getOrElse(""))
+      val info = NetworkInfo.newBuilder()
+        .setName(name)
+        .setLabels(networkLabels)
+        .build()
       containerInfo.addNetworkInfos(info)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ce0d3bb3/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 0bb4790..50bb501 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -222,7 +222,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
     assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
   }
 
-  test("supports spark.mesos.network.name") {
+  test("supports spark.mesos.network.name and spark.mesos.network.labels") {
     setScheduler()
 
     val mem = 1000
@@ -233,7 +233,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
         command,
         Map("spark.mesos.executor.home" -> "test",
           "spark.app.name" -> "test",
-          "spark.mesos.network.name" -> "test-network-name"),
+          "spark.mesos.network.name" -> "test-network-name",
+          "spark.mesos.network.labels" -> "key1:val1,key2:val2"),
         "s1",
         new Date()))
 
@@ -246,6 +247,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite 
with LocalSparkContext wi
     val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
     assert(networkInfos.size == 1)
     assert(networkInfos.get(0).getName == "test-network-name")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
   test("supports spark.mesos.driver.labels") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ce0d3bb3/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a8175e2..ab29c29 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -568,9 +568,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
     assert(launchedTasks.head.getLabels.equals(taskLabels))
   }
 
-  test("mesos supports spark.mesos.network.name") {
+  test("mesos supports spark.mesos.network.name and 
spark.mesos.network.labels") {
     setBackend(Map(
-      "spark.mesos.network.name" -> "test-network-name"
+      "spark.mesos.network.name" -> "test-network-name",
+      "spark.mesos.network.labels" -> "key1:val1,key2:val2"
     ))
 
     val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -582,6 +583,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
     val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
     assert(networkInfos.size == 1)
     assert(networkInfos.get(0).getName == "test-network-name")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
   test("supports spark.scheduler.minRegisteredResourcesRatio") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to