Repository: spark
Updated Branches:
  refs/heads/master a2abb583c -> 0869b3a5f


[SPARK-15271][MESOS] Allow force pulling executor docker images

## What changes were proposed in this pull request?

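Adds a `spark.mesos.executor.docker.forcePullImage` setting (see the description
below). It also refactors `MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo`
to take a `String => Option[String]` config lookup instead of a `SparkConf`. As a
result, the cluster scheduler and both executor backends share one code path for
building the executor's Docker container info.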

## How was this patch tested?

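Unit tests:

- `MesosCoarseGrainedSchedulerBackendSuite` gains two tests:
  - the Docker image, force-pull flag, volumes and port mappings are reflected in the launched tasks;
  - force pulling is disabled by default.
- `MesosFineGrainedSchedulerBackendSuite` now also asserts that the flag reaches the executor's `DockerInfo`.

This patch involves no UI changes.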

Mesos agents by default will not pull Docker images that are already cached
locally. In order to run Spark executors from mutable tags like
`:latest`, this commit introduces a Spark setting
(`spark.mesos.executor.docker.forcePullImage`). Setting this flag to
`true` tells the Mesos agent to force pull the Docker image. The default is
`false`, which is consistent with the previous implementation and with Mesos'
default behaviour.

Force pulling images is only supported by Mesos 0.22 and above. This commit
therefore also bumps the Mesos dependency from 0.21.1 to 0.22.2.

Author: Philipp Hoffmann <m...@philipphoffmann.de>

Closes #14348 from philipphoffmann/force-pull-image.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0869b3a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0869b3a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0869b3a5

Branch: refs/heads/master
Commit: 0869b3a5f028b64c2da511e70b02ab42f65fc949
Parents: a2abb58
Author: Philipp Hoffmann <m...@philipphoffmann.de>
Authored: Tue Jul 26 16:09:10 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 26 16:09:10 2016 +0100

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala   | 14 ++---
 .../MesosCoarseGrainedSchedulerBackend.scala    |  7 ++-
 .../MesosFineGrainedSchedulerBackend.scala      |  7 ++-
 .../mesos/MesosSchedulerBackendUtil.scala       | 20 ++++---
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 63 ++++++++++++++++++++
 .../MesosFineGrainedSchedulerBackendSuite.scala |  2 +
 dev/deps/spark-deps-hadoop-2.2                  |  2 +-
 dev/deps/spark-deps-hadoop-2.3                  |  2 +-
 dev/deps/spark-deps-hadoop-2.4                  |  2 +-
 dev/deps/spark-deps-hadoop-2.6                  |  2 +-
 dev/deps/spark-deps-hadoop-2.7                  |  2 +-
 docs/_config.yml                                |  2 +-
 docs/running-on-mesos.md                        | 12 ++++
 pom.xml                                         |  2 +-
 14 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 39b0f4d..1e9644d 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
           .addAllResources(memResourcesToUse.asJava)
         offer.resources = finalResources.asJava
         
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach 
{ image =>
-          val container = taskInfo.getContainerBuilder()
-          val volumes = submission.schedulerProperties
-            .get("spark.mesos.executor.docker.volumes")
-            .map(MesosSchedulerBackendUtil.parseVolumesSpec)
-          val portmaps = submission.schedulerProperties
-            .get("spark.mesos.executor.docker.portmaps")
-            .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
-          MesosSchedulerBackendUtil.addDockerInfo(
-            container, image, volumes = volumes, portmaps = portmaps)
-          taskInfo.setContainer(container.build())
+          MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+            image,
+            submission.schedulerProperties.get,
+            taskInfo.getContainerBuilder())
         }
         val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new 
ArrayBuffer[TaskInfo])
         queuedTasks += taskInfo.build()

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 99e6d39..52993ca 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             .addAllResources(memResourcesToUse.asJava)
 
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { 
image =>
-            MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, 
taskBuilder.getContainerBuilder)
+            MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+              image,
+              sc.conf.getOption,
+              taskBuilder.getContainerBuilder
+            )
           }
 
           tasks(offer.getId) ::= taskBuilder.build()

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e08dc3b..8d4fc9e 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setData(ByteString.copyFrom(createExecArg()))
 
     sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil
-        .setupContainerBuilderDockerInfo(image, sc.conf, 
executorInfo.getContainerBuilder())
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+        image,
+        sc.conf.getOption,
+        executorInfo.getContainerBuilder()
+      )
     }
 
     (executorInfo.build(), resourcesAfterMem.asJava)

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 05b2b08..aa669f0 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
 import org.apache.mesos.Protos.{ContainerInfo, Volume}
 import org.apache.mesos.Protos.ContainerInfo.DockerInfo
 
-import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 
 /**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends 
Logging {
   def addDockerInfo(
       container: ContainerInfo.Builder,
       image: String,
+      forcePullImage: Boolean = false,
       volumes: Option[List[Volume]] = None,
       network: Option[ContainerInfo.DockerInfo.Network] = None,
       portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): 
Unit = {
 
-    val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+    val docker = ContainerInfo.DockerInfo.newBuilder()
+      .setImage(image)
+      .setForcePullImage(forcePullImage)
 
     network.foreach(docker.setNetwork)
     portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends 
Logging {
   }
 
   /**
-   * Setup a docker containerizer
+   * Setup a docker containerizer from MesosDriverDescription scheduler 
properties
    */
   def setupContainerBuilderDockerInfo(
     imageName: String,
-    conf: SparkConf,
+    conf: String => Option[String],
     builder: ContainerInfo.Builder): Unit = {
-    val volumes = conf
-      .getOption("spark.mesos.executor.docker.volumes")
+    val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+      .exists(_.equals("true"))
+    val volumes = conf("spark.mesos.executor.docker.volumes")
       .map(parseVolumesSpec)
-    val portmaps = conf
-      .getOption("spark.mesos.executor.docker.portmaps")
+    val portmaps = conf("spark.mesos.executor.docker.portmaps")
       .map(parsePortMappingsSpec)
+
     addDockerInfo(
       builder,
       imageName,
+      forcePullImage = forcePullImage,
       volumes = volumes,
       portmaps = portmaps)
     logDebug("setupContainerDockerInfo: using docker image: " + imageName)

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c2779d7..51d262e 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
     backend.start()
   }
 
+  test("docker settings are reflected in created tasks") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image",
+      "spark.mesos.executor.docker.forcePullImage" -> "true",
+      "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+      "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val volumes = containerInfo.getVolumesList.asScala
+    assert(volumes.size == 1)
+
+    val volume = volumes.head
+    assert(volume.getHostPath == "/host_vol")
+    assert(volume.getContainerPath == "/container_vol")
+    assert(volume.getMode == Volume.Mode.RO)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(dockerInfo.getForcePullImage)
+
+    val portMappings = dockerInfo.getPortMappingsList.asScala
+    assert(portMappings.size == 1)
+
+    val portMapping = portMappings.head
+    assert(portMapping.getHostPort == 8080)
+    assert(portMapping.getContainerPort == 80)
+    assert(portMapping.getProtocol == "tcp")
+  }
+
+  test("force-pull-image option is disabled by default") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(!dockerInfo.getForcePullImage)
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 41693b1..fcf39f6 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val conf = new SparkConf()
       .set("spark.mesos.executor.docker.image", "spark/mock")
+      .set("spark.mesos.executor.docker.forcePullImage", "true")
       .set("spark.mesos.executor.docker.volumes", 
"/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
       .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
 
@@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
     val (execInfo, _) = backend.createExecutorInfo(
       Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
     assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+    assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
     val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
     assert(portmaps.get(0).getHostPort.equals(80))
     assert(portmaps.get(0).getContainerPort.equals(8080))

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/dev/deps/spark-deps-hadoop-2.2
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 5d536b7..ff15873 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
 libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/dev/deps/spark-deps-hadoop-2.3
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index d16f42a..2b5764f 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/dev/deps/spark-deps-hadoop-2.4
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 2e261cb..3f53fdb 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 67f38f4..d3a7ab8 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 0758396..05317a0 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index be3d8a2..bbb576e 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
 SPARK_VERSION_SHORT: 2.1.0
 SCALA_BINARY_VERSION: "2.11"
 SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.21.0
+MESOS_VERSION: 0.22.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 10dc9ce..ce888b5 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods.
 
 Requires Mesos version 0.20.1 or later.
 
+Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
+tags, you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
+image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
+
 # Running Alongside Hadoop
 
 You can run Spark and Mesos alongside your existing Hadoop cluster by just 
launching them as a
@@ -335,6 +339,14 @@ See the [configuration page](configuration.html) for 
information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.executor.docker.forcePullImage</code></td>
+  <td>false</td>
+  <td>
+    Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
+    By default Mesos agents will not pull images they already have cached.
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.executor.docker.volumes</code></td>
   <td>(none)</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/0869b3a5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d064cb5..b69292d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>0.21.1</mesos.version>
+    <mesos.version>0.22.2</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to