Repository: spark
Updated Branches:
  refs/heads/master e090f3c0c -> a888fed30


[SPARK-19740][MESOS] Add support in Spark to pass arbitrary parameters into 
docker when running on mesos with docker containerizer

## What changes were proposed in this pull request?

Allow passing in arbitrary parameters into docker when launching spark 
executors on mesos with docker containerizer tnachen

## How was this patch tested?

Manually built and tested with passed-in parameters

Author: Ji Yan <jiyan@Jis-MacBook-Air.local>

Closes #17109 from yanji84/ji/allow_set_docker_user.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a888fed3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a888fed3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a888fed3

Branch: refs/heads/master
Commit: a888fed3099e84c2cf45e9419f684a3658ada19d
Parents: e090f3c
Author: Ji Yan <jiyan@Jis-MacBook-Air.local>
Authored: Sun Apr 16 14:34:12 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Apr 16 14:34:12 2017 +0100

----------------------------------------------------------------------
 docs/running-on-mesos.md                        | 10 ++++
 .../mesos/MesosSchedulerBackendUtil.scala       | 36 +++++++++++--
 .../mesos/MesosSchedulerBackendUtilSuite.scala  | 53 ++++++++++++++++++++
 3 files changed, 96 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a888fed3/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ef01cfe..314a806 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -357,6 +357,16 @@ See the [configuration page](configuration.html) for 
information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.executor.docker.parameters</code></td>
+  <td>(none)</td>
+  <td>
+    Set the list of custom parameters which will be passed into the 
<code>docker run</code> command when launching the Spark executor on Mesos 
using the docker containerizer. The format of this property is a 
comma-separated list of
+    key/value pairs. Example:
+
+    <pre>key1=val1,key2=val2,key3=val3</pre>
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.executor.docker.volumes</code></td>
   <td>(none)</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/a888fed3/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index a2adb22..fbcbc55 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume}
+import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, 
Volume}
 import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -99,6 +99,28 @@ private[mesos] object MesosSchedulerBackendUtil extends 
Logging {
     .toList
   }
 
+  /**
+   * Parse a list of docker parameters, each of which
+   * takes the form key=value
+   */
+  private def parseParamsSpec(params: String): List[Parameter] = {
+    // split with limit of 2 to avoid parsing error when '='
+    // exists in the parameter value
+    params.split(",").map(_.split("=", 2)).flatMap { spec: Array[String] =>
+      val param: Parameter.Builder = Parameter.newBuilder()
+      spec match {
+        case Array(key, value) =>
+          Some(param.setKey(key).setValue(value))
+        case spec =>
+          logWarning(s"Unable to parse arbitrary parameters: $params. "
+            + "Expected form: \"key=value(, ...)\"")
+          None
+      }
+    }
+    .map { _.build() }
+    .toList
+  }
+
   def containerInfo(conf: SparkConf): ContainerInfo = {
     val containerType = if (conf.contains("spark.mesos.executor.docker.image") 
&&
       conf.get("spark.mesos.containerizer", "docker") == "docker") {
@@ -120,8 +142,14 @@ private[mesos] object MesosSchedulerBackendUtil extends 
Logging {
         .map(parsePortMappingsSpec)
         .getOrElse(List.empty)
 
+      val params = conf
+        .getOption("spark.mesos.executor.docker.parameters")
+        .map(parseParamsSpec)
+        .getOrElse(List.empty)
+
       if (containerType == ContainerInfo.Type.DOCKER) {
-        containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps))
+        containerInfo
+          .setDocker(dockerInfo(image, forcePullImage, portMaps, params))
       } else {
         containerInfo.setMesos(mesosInfo(image, forcePullImage))
       }
@@ -144,11 +172,13 @@ private[mesos] object MesosSchedulerBackendUtil extends 
Logging {
   private def dockerInfo(
       image: String,
       forcePullImage: Boolean,
-      portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = {
+      portMaps: List[ContainerInfo.DockerInfo.PortMapping],
+      params: List[Parameter]): DockerInfo = {
     val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
       .setImage(image)
       .setForcePullImage(forcePullImage)
     portMaps.foreach(dockerBuilder.addPortMappings(_))
+    params.foreach(dockerBuilder.addParameters(_))
 
     dockerBuilder.build
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a888fed3/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
new file mode 100644
index 0000000..caf9d89
--- /dev/null
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
+
+  test("ContainerInfo fails to parse invalid docker parameters") {
+    val conf = new SparkConf()
+    conf.set("spark.mesos.executor.docker.parameters", "a,b")
+    conf.set("spark.mesos.executor.docker.image", "test")
+
+    val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+    val params = containerInfo.getDocker.getParametersList
+
+    assert(params.size() == 0)
+  }
+
+  test("ContainerInfo parses docker parameters") {
+    val conf = new SparkConf()
+    conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3")
+    conf.set("spark.mesos.executor.docker.image", "test")
+
+    val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+    val params = containerInfo.getDocker.getParametersList
+    assert(params.size() == 3)
+    assert(params.get(0).getKey == "a")
+    assert(params.get(0).getValue == "1")
+    assert(params.get(1).getKey == "b")
+    assert(params.get(1).getValue == "2")
+    assert(params.get(2).getKey == "c")
+    assert(params.get(2).getValue == "3")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to