Repository: spark
Updated Branches:
  refs/heads/master dd784a882 -> 978cd5f12


[SPARK-15271][MESOS] Allow force pulling executor docker images

## What changes were proposed in this pull request?

By default, Mesos agents will not pull Docker images that are already cached
locally. In order to run Spark executors from mutable tags like `:latest`,
this commit introduces a Spark setting,
`spark.mesos.executor.docker.forcePullImage`. Setting this flag to `true`
tells the Mesos agent to force pull the Docker image (the default is `false`,
which is consistent with the previous implementation and Mesos' default
behaviour).
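
For illustration, a minimal sketch of how an application could opt in to
force pulling; the app name, master URL, and image name below are
placeholders, not part of this change:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("force-pull-example")         // placeholder app name
  .setMaster("mesos://zk://zk1:2181/mesos") // placeholder master URL
  .set("spark.mesos.executor.docker.image", "example/spark-executor:latest")
  // New setting introduced by this commit; defaults to "false"
  .set("spark.mesos.executor.docker.forcePullImage", "true")

val sc = new SparkContext(conf)
```

The same setting can equally be supplied at submit time, e.g.
`--conf spark.mesos.executor.docker.forcePullImage=true` with `spark-submit`.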

## How was this patch tested?

I ran a sample application including this change on a Mesos cluster and
verified the correct behaviour both with and without force pulling the
executor image. As expected, the image is force pulled when the flag is set.

Author: Philipp Hoffmann <m...@philipphoffmann.de>

Closes #13051 from philipphoffmann/force-pull-image.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/978cd5f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/978cd5f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/978cd5f1

Branch: refs/heads/master
Commit: 978cd5f125eb5a410bad2e60bf8385b11cf1b978
Parents: dd784a8
Author: Philipp Hoffmann <m...@philipphoffmann.de>
Authored: Mon Jul 25 20:14:47 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jul 25 20:14:47 2016 +0100

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala   | 14 ++---
 .../MesosCoarseGrainedSchedulerBackend.scala    |  7 ++-
 .../MesosFineGrainedSchedulerBackend.scala      |  7 ++-
 .../mesos/MesosSchedulerBackendUtil.scala       | 20 ++++---
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 63 ++++++++++++++++++++
 .../MesosFineGrainedSchedulerBackendSuite.scala |  2 +
 dev/deps/spark-deps-hadoop-2.2                  |  2 +-
 dev/deps/spark-deps-hadoop-2.3                  |  2 +-
 dev/deps/spark-deps-hadoop-2.4                  |  2 +-
 dev/deps/spark-deps-hadoop-2.6                  |  2 +-
 dev/deps/spark-deps-hadoop-2.7                  |  2 +-
 docs/_config.yml                                |  2 +-
 docs/running-on-mesos.md                        | 12 ++++
 pom.xml                                         |  2 +-
 14 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 39b0f4d..1e9644d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
           .addAllResources(memResourcesToUse.asJava)
        offer.resources = finalResources.asJava
        submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
-          val container = taskInfo.getContainerBuilder()
-          val volumes = submission.schedulerProperties
-            .get("spark.mesos.executor.docker.volumes")
-            .map(MesosSchedulerBackendUtil.parseVolumesSpec)
-          val portmaps = submission.schedulerProperties
-            .get("spark.mesos.executor.docker.portmaps")
-            .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
-          MesosSchedulerBackendUtil.addDockerInfo(
-            container, image, volumes = volumes, portmaps = portmaps)
-          taskInfo.setContainer(container.build())
+          MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+            image,
+            submission.schedulerProperties.get,
+            taskInfo.getContainerBuilder())
         }
        val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
         queuedTasks += taskInfo.build()

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 99e6d39..52993ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             .addAllResources(memResourcesToUse.asJava)
 
          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+            MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+              image,
+              sc.conf.getOption,
+              taskBuilder.getContainerBuilder
+            )
           }
 
           tasks(offer.getId) ::= taskBuilder.build()

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e08dc3b..8d4fc9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setData(ByteString.copyFrom(createExecArg()))
 
     sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil
-        .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+        image,
+        sc.conf.getOption,
+        executorInfo.getContainerBuilder()
+      )
     }
 
     (executorInfo.build(), resourcesAfterMem.asJava)

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 05b2b08..aa669f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
 import org.apache.mesos.Protos.{ContainerInfo, Volume}
 import org.apache.mesos.Protos.ContainerInfo.DockerInfo
 
-import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 
 /**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   def addDockerInfo(
       container: ContainerInfo.Builder,
       image: String,
+      forcePullImage: Boolean = false,
       volumes: Option[List[Volume]] = None,
       network: Option[ContainerInfo.DockerInfo.Network] = None,
      portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
 
-    val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+    val docker = ContainerInfo.DockerInfo.newBuilder()
+      .setImage(image)
+      .setForcePullImage(forcePullImage)
 
     network.foreach(docker.setNetwork)
     portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   }
 
   /**
-   * Setup a docker containerizer
+   * Setup a docker containerizer from MesosDriverDescription scheduler properties
    */
   def setupContainerBuilderDockerInfo(
     imageName: String,
-    conf: SparkConf,
+    conf: String => Option[String],
     builder: ContainerInfo.Builder): Unit = {
-    val volumes = conf
-      .getOption("spark.mesos.executor.docker.volumes")
+    val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+      .exists(_.equals("true"))
+    val volumes = conf("spark.mesos.executor.docker.volumes")
       .map(parseVolumesSpec)
-    val portmaps = conf
-      .getOption("spark.mesos.executor.docker.portmaps")
+    val portmaps = conf("spark.mesos.executor.docker.portmaps")
       .map(parsePortMappingsSpec)
+
     addDockerInfo(
       builder,
       imageName,
+      forcePullImage = forcePullImage,
       volumes = volumes,
       portmaps = portmaps)
     logDebug("setupContainerDockerInfo: using docker image: " + imageName)

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c2779d7..d3a85c6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.start()
   }
 
+  test("docker settings are reflected in created tasks") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image",
+      "spark.mesos.executor.docker.forcePullImage" -> "true",
+      "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+      "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched("o1").asScala
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val volumes = containerInfo.getVolumesList.asScala
+    assert(volumes.size == 1)
+
+    val volume = volumes.head
+    assert(volume.getHostPath == "/host_vol")
+    assert(volume.getContainerPath == "/container_vol")
+    assert(volume.getMode == Volume.Mode.RO)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(dockerInfo.getForcePullImage)
+
+    val portMappings = dockerInfo.getPortMappingsList.asScala
+    assert(portMappings.size == 1)
+
+    val portMapping = portMappings.head
+    assert(portMapping.getHostPort == 8080)
+    assert(portMapping.getContainerPort == 80)
+    assert(portMapping.getProtocol == "tcp")
+  }
+
+  test("force-pull-image option is disabled by default") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched("o1").asScala
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(!dockerInfo.getForcePullImage)
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 41693b1..fcf39f6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val conf = new SparkConf()
       .set("spark.mesos.executor.docker.image", "spark/mock")
+      .set("spark.mesos.executor.docker.forcePullImage", "true")
       .set("spark.mesos.executor.docker.volumes", 
"/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
       .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
 
@@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
     val (execInfo, _) = backend.createExecutorInfo(
       Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
     assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+    assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
     val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
     assert(portmaps.get(0).getHostPort.equals(80))
     assert(portmaps.get(0).getContainerPort.equals(8080))

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.2
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 5d536b7..ff15873 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
 libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.3
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index d16f42a..2b5764f 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.4
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 2e261cb..3f53fdb 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 67f38f4..d3a7ab8 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 0758396..05317a0 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index be3d8a2..bbb576e 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
 SPARK_VERSION_SHORT: 2.1.0
 SCALA_BINARY_VERSION: "2.11"
 SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.21.0
+MESOS_VERSION: 0.22.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 10dc9ce..ce888b5 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods.
 
 Requires Mesos version 0.20.1 or later.
 
+Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
+tags you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
+image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
+
 # Running Alongside Hadoop
 
 You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -335,6 +339,14 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.executor.docker.forcePullImage</code></td>
+  <td>false</td>
+  <td>
+    Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
+    By default Mesos agents will not pull images they already have cached.
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.executor.docker.volumes</code></td>
   <td>(none)</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d064cb5..b69292d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>0.21.1</mesos.version>
+    <mesos.version>0.22.2</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>

