Repository: spark
Updated Branches:
  refs/heads/master 0c33bf817 -> 4eecf550a


[SPARK-7373] [MESOS] Add docker support for launching drivers in mesos cluster 
mode.

Using the existing docker support for mesos, also enabling the mesos cluster 
mode scheduler to launch Spark drivers in docker images as well.

This also allows the executors launched by the drivers to be also in the same 
Docker image by passing  the docker settings.

Author: Timothy Chen <tnac...@gmail.com>

Closes #5917 from tnachen/spark_cluster_docker and squashes the following 
commits:

1e842f5 [Timothy Chen] Add docker support for launching drivers in mesos 
cluster mode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4eecf550
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4eecf550
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4eecf550

Branch: refs/heads/master
Commit: 4eecf550aa7e4fb448baca82281bfd4e8bc4a778
Parents: 0c33bf8
Author: Timothy Chen <tnac...@gmail.com>
Authored: Thu May 7 12:23:16 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu May 7 12:23:16 2015 -0700

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala   | 30 +++++++++++++++-----
 1 file changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4eecf550/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 06f0e28..1067a7f 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -370,16 +370,21 @@ private[spark] class MesosClusterScheduler(
     val executorOpts = desc.schedulerProperties.map { case (k, v) => 
s"-D$k=$v" }.mkString(" ")
     envBuilder.addVariables(
       
Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
-    val cmdOptions = generateCmdOption(desc)
+    val cmdOptions = generateCmdOption(desc).mkString(" ")
+    val dockerDefined = 
desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
     val executorUri = desc.schedulerProperties.get("spark.executor.uri")
       .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
     val appArguments = desc.command.arguments.mkString(" ")
-    val cmd = if (executorUri.isDefined) {
+    val (executable, jar) = if (dockerDefined) {
+      // Application jar is automatically downloaded in the mounted sandbox by 
Mesos,
+      // and the path to the mounted volume is stored in $MESOS_SANDBOX env 
variable.
+      ("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}")
+    } else if (executorUri.isDefined) {
       
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
       val folderBasename = executorUri.get.split('/').last.split('.').head
       val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
       val cmdJar = s"../${desc.jarUrl.split("/").last}"
-      s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
+      (cmdExecutable, cmdJar)
     } else {
       val executorSparkHome = 
desc.schedulerProperties.get("spark.mesos.executor.home")
         .orElse(conf.getOption("spark.home"))
@@ -389,9 +394,9 @@ private[spark] class MesosClusterScheduler(
         }
       val cmdExecutable = new File(executorSparkHome, 
"./bin/spark-submit").getCanonicalPath
       val cmdJar = desc.jarUrl.split("/").last
-      s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
+      (cmdExecutable, cmdJar)
     }
-    builder.setValue(cmd)
+    builder.setValue(s"$executable $cmdOptions $jar $appArguments")
     builder.setEnvironment(envBuilder.build())
     builder.build()
   }
@@ -458,9 +463,20 @@ private[spark] class MesosClusterScheduler(
           .setCommand(commandInfo)
           .addResources(cpuResource)
           .addResources(memResource)
-          .build()
+        
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach 
{ image =>
+          val container = taskInfo.getContainerBuilder()
+          val volumes = submission.schedulerProperties
+            .get("spark.mesos.executor.docker.volumes")
+            .map(MesosSchedulerBackendUtil.parseVolumesSpec)
+          val portmaps = submission.schedulerProperties
+            .get("spark.mesos.executor.docker.portmaps")
+            .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
+          MesosSchedulerBackendUtil.addDockerInfo(
+            container, image, volumes = volumes, portmaps = portmaps)
+          taskInfo.setContainer(container.build())
+        }
         val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new 
ArrayBuffer[TaskInfo])
-        queuedTasks += taskInfo
+        queuedTasks += taskInfo.build()
         logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver 
" +
           submission.submissionId)
         val newState = new MesosClusterSubmissionState(submission, taskId, 
offer.offer.getSlaveId,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to