Repository: spark
Updated Branches:
  refs/heads/master 69626addd -> 235cb256d


[SPARK-16194] Mesos Driver env vars

## What changes were proposed in this pull request?

Added a new configuration namespace: spark.mesos.driverEnv.*

This allows a user submitting a job in cluster mode to set arbitrary
environment variables on the driver: spark.mesos.driverEnv.KEY=VAL
results in the env var "KEY" being set to "VAL" in the driver process.

I've also refactored the test helpers into a shared Utils object so they
can be re-used by both MesosClusterSchedulerSuite and
MesosCoarseGrainedSchedulerBackendSuite (see the sketch below).
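
For example, a test can now exercise the scheduler with the shared
helpers (a sketch assuming the suite's mocked `driver` and an
initialized `scheduler`, as in the new test added below):

```scala
val offer = Utils.createOffer("o1", "s1", 1000, 1) // (offerId, slaveId, mem, cpus)
scheduler.resourceOffers(driver, List(offer).asJava)
val tasks = Utils.verifyTaskLaunched(driver, "o1") // returns List[TaskInfo]
```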

And I've refactored the command-building logic in `buildDriverCommand`.
The command builder values were tightly intertwined before; now it's
easy to determine exactly how each value is set.
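
The entry point now just composes three single-purpose helpers (taken
from the diff below):

```scala
private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
  val builder = CommandInfo.newBuilder()
  builder.setValue(getDriverCommandValue(desc))      // the spark-submit invocation string
  builder.setEnvironment(getDriverEnvironment(desc)) // driverEnv.* vars plus SPARK_EXECUTOR_OPTS
  builder.addAllUris(getDriverUris(desc).asJava)     // app jar, spark.mesos.uris, pyFiles
  builder.build()
}
```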

## How was this patch tested?

Unit tests, including a new test for `spark.mesos.driverEnv.*` in
MesosClusterSchedulerSuite.

Author: Michael Gummelt <mgumm...@mesosphere.io>

Closes #14167 from mgummelt/driver-env-vars.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/235cb256
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/235cb256
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/235cb256

Branch: refs/heads/master
Commit: 235cb256d06653bcde4c3ed6b081503a94996321
Parents: 69626ad
Author: Michael Gummelt <mgumm...@mesosphere.io>
Authored: Thu Jul 21 18:29:00 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jul 21 18:29:00 2016 +0100

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala   | 86 +++++++++++-------
 .../mesos/MesosClusterSchedulerSuite.scala      | 47 +++++++++-
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 93 +++++---------------
 .../spark/scheduler/cluster/mesos/Utils.scala   | 71 +++++++++++++++
 docs/running-on-mesos.md                        | 10 +++
 5 files changed, 201 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/235cb256/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 73bd4c5..39b0f4d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -353,38 +353,60 @@ private[spark] class MesosClusterScheduler(
     }
   }
 
-  private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
-    val appJar = CommandInfo.URI.newBuilder()
-      .setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
-    val builder = CommandInfo.newBuilder().addUris(appJar)
-    val entries = conf.getOption("spark.executor.extraLibraryPath")
-      .map(path => Seq(path) ++ desc.command.libraryPathEntries)
-      .getOrElse(desc.command.libraryPathEntries)
-
-    val prefixEnv = if (!entries.isEmpty) {
-      Utils.libraryPathEnvPrefix(entries)
-    } else {
-      ""
+  private def getDriverExecutorURI(desc: MesosDriverDescription) = {
+    desc.schedulerProperties.get("spark.executor.uri")
+      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+  }
+
+  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+    val env = {
+      val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+      val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
+
+      val prefix = "spark.mesos.driverEnv."
+      val driverEnv = desc.schedulerProperties.filterKeys(_.startsWith(prefix))
+        .map { case (k, v) => (k.substring(prefix.length), v) }
+
+      driverEnv ++ executorEnv ++ desc.command.environment
     }
+
     val envBuilder = Environment.newBuilder()
-    desc.command.environment.foreach { case (k, v) =>
-      envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v).build())
+    env.foreach { case (k, v) =>
+      envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
     }
-    // Pass all spark properties to executor.
-    val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
-    envBuilder.addVariables(
-      Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
+    envBuilder.build()
+  }
+
+  private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
+    val confUris = List(conf.getOption("spark.mesos.uris"),
+      desc.schedulerProperties.get("spark.mesos.uris"),
+      desc.schedulerProperties.get("spark.submit.pyFiles")).flatMap(
+      _.map(_.split(",").map(_.trim))
+    ).flatten
+
+    val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
+
+    ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
+      CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
+  }
+
+  private def getDriverCommandValue(desc: MesosDriverDescription): String = {
     val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
-    val executorUri = desc.schedulerProperties.get("spark.executor.uri")
-      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+    val executorUri = getDriverExecutorURI(desc)
     // Gets the path to run spark-submit, and the path to the Mesos sandbox.
     val (executable, sandboxPath) = if (dockerDefined) {
       // Application jar is automatically downloaded in the mounted sandbox by Mesos,
       // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
       ("./bin/spark-submit", "$MESOS_SANDBOX")
     } else if (executorUri.isDefined) {
-      builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
       val folderBasename = executorUri.get.split('/').last.split('.').head
+
+      val entries = conf.getOption("spark.executor.extraLibraryPath")
+        .map(path => Seq(path) ++ desc.command.libraryPathEntries)
+        .getOrElse(desc.command.libraryPathEntries)
+
+      val prefixEnv = if (!entries.isEmpty) Utils.libraryPathEnvPrefix(entries) else ""
+
       val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
       // Sandbox path points to the parent folder as we chdir into the folderBasename.
       (cmdExecutable, "..")
@@ -399,20 +421,18 @@ private[spark] class MesosClusterScheduler(
       // Sandbox points to the current directory by default with Mesos.
       (cmdExecutable, ".")
     }
-    val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
     val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
+    val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
     val appArguments = desc.command.arguments.mkString(" ")
-    builder.setValue(s"$executable $cmdOptions $primaryResource $appArguments")
-    builder.setEnvironment(envBuilder.build())
-    conf.getOption("spark.mesos.uris").map { uris =>
-      setupUris(uris, builder)
-    }
-    desc.schedulerProperties.get("spark.mesos.uris").map { uris =>
-      setupUris(uris, builder)
-    }
-    desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
-      setupUris(pyFiles, builder)
-    }
+
+    s"$executable $cmdOptions $primaryResource $appArguments"
+  }
+
+  private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
+    val builder = CommandInfo.newBuilder()
+    builder.setValue(getDriverCommandValue(desc))
+    builder.setEnvironment(getDriverEnvironment(desc))
+    builder.addAllUris(getDriverUris(desc).asJava)
     builder.build()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/235cb256/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index a32423d..0260759 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -31,17 +31,24 @@ import org.scalatest.mock.MockitoSugar
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.Command
 import org.apache.spark.deploy.mesos.MesosDriverDescription
-
+import org.apache.spark.scheduler.cluster.mesos.Utils
 
 class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
 
   private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+  private var driver: SchedulerDriver = _
   private var scheduler: MesosClusterScheduler = _
 
-  override def beforeEach(): Unit = {
+  private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
     val conf = new SparkConf()
     conf.setMaster("mesos://localhost:5050")
     conf.setAppName("spark mesos")
+
+    if (sparkConfVars != null) {
+      conf.setAll(sparkConfVars)
+    }
+
+    driver = mock[SchedulerDriver]
     scheduler = new MesosClusterScheduler(
       new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
       override def start(): Unit = { ready = true }
@@ -50,9 +57,11 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("can queue drivers") {
+    setScheduler()
+
     val response = scheduler.submitDriver(
-        new MesosDriverDescription("d1", "jar", 1000, 1, true,
-          command, Map[String, String](), "s1", new Date()))
+      new MesosDriverDescription("d1", "jar", 1000, 1, true,
+        command, Map[String, String](), "s1", new Date()))
     assert(response.success)
     val response2 =
       scheduler.submitDriver(new MesosDriverDescription(
@@ -65,6 +74,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("can kill queued drivers") {
+    setScheduler()
+
     val response = scheduler.submitDriver(
         new MesosDriverDescription("d1", "jar", 1000, 1, true,
           command, Map[String, String](), "s1", new Date()))
@@ -76,6 +87,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("can handle multiple roles") {
+    setScheduler()
+
     val driver = mock[SchedulerDriver]
     val response = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
@@ -138,6 +151,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("escapes commandline args for the shell") {
+    setScheduler()
+
     val conf = new SparkConf()
     conf.setMaster("mesos://localhost:5050")
     conf.setAppName("spark mesos")
@@ -172,4 +187,28 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this"))
     })
   }
+
+  test("supports spark.mesos.driverEnv.*") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", mem, cpu, true,
+        command,
+        Map("spark.mesos.executor.home" -> "test",
+          "spark.app.name" -> "test",
+          "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
+        "s1",
+        new Date()))
+    assert(response.success)
+
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, List(offer).asJava)
+    val tasks = Utils.verifyTaskLaunched(driver, "o1")
+    val env = tasks.head.getCommand.getEnvironment.getVariablesList.asScala.map(v =>
+      (v.getName, v.getValue)).toMap
+    assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/235cb256/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 7f21d4c..c2779d7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -24,8 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Matchers
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
@@ -35,6 +34,7 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.mesos.Utils._
 
 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     with LocalSparkContext
@@ -59,7 +59,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     // launches a task on a valid offer
     offerResources(offers)
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
 
     // kills executors
     backend.doRequestTotalExecutors(0)
@@ -74,7 +74,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     // Launches a new task when requested executors is positive
     backend.doRequestTotalExecutors(2)
     offerResources(offers, 2)
-    verifyTaskLaunched("o2")
+    verifyTaskLaunched(driver, "o2")
   }
 
   test("mesos supports killing and relaunching tasks with executors") {
@@ -86,7 +86,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val offer1 = (minMem, minCpu)
     val offer2 = (minMem, 1)
     offerResources(List(offer1, offer2))
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
 
     // accounts for a killed task
     val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
@@ -95,7 +95,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     // Launches a new task on a valid offer from the same slave
     offerResources(List(offer2))
-    verifyTaskLaunched("o2")
+    verifyTaskLaunched(driver, "o2")
   }
 
   test("mesos supports spark.executor.cores") {
@@ -106,10 +106,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val offers = List((executorMemory * 2, executorCores + 1))
     offerResources(offers)
 
-    val taskInfos = verifyTaskLaunched("o1")
-    assert(taskInfos.size() == 1)
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
     assert(cpus == executorCores)
   }
 
@@ -120,10 +120,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val offerCores = 10
     offerResources(List((executorMemory * 2, offerCores)))
 
-    val taskInfos = verifyTaskLaunched("o1")
-    assert(taskInfos.size() == 1)
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
     assert(cpus == offerCores)
   }
 
@@ -134,10 +134,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val executorMemory = backend.executorMemory(sc)
     offerResources(List((executorMemory, maxCores + 1)))
 
-    val taskInfos = verifyTaskLaunched("o1")
-    assert(taskInfos.size() == 1)
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
     assert(cpus == maxCores)
   }
 
@@ -156,7 +156,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       (executorMemory, maxCores + 1),
       (executorMemory, maxCores + 1)))
 
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
     verifyDeclinedOffer(driver, createOfferId("o2"), true)
   }
 
@@ -171,8 +171,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       (executorMemory * 2, executorCores * 2),
       (executorMemory * 2, executorCores * 2)))
 
-    verifyTaskLaunched("o1")
-    verifyTaskLaunched("o2")
+    verifyTaskLaunched(driver, "o1")
+    verifyTaskLaunched(driver, "o2")
   }
 
   test("mesos creates multiple executors on a single slave") {
@@ -184,8 +184,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     offerResources(List((executorMemory * 2, executorCores * 2)))
 
     // verify two executors were started on a single offer
-    val taskInfos = verifyTaskLaunched("o1")
-    assert(taskInfos.size() == 2)
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 2)
   }
 
   test("mesos doesn't register twice with the same shuffle service") {
@@ -194,11 +194,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val offer1 = createOffer("o1", "s1", mem, cpu)
     backend.resourceOffers(driver, List(offer1).asJava)
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
 
     val offer2 = createOffer("o2", "s1", mem, cpu)
     backend.resourceOffers(driver, List(offer2).asJava)
-    verifyTaskLaunched("o2")
+    verifyTaskLaunched(driver, "o2")
 
     val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
     backend.statusUpdate(driver, status1)
@@ -216,7 +216,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val offer1 = createOffer("o1", "s1", mem, cpu)
     backend.resourceOffers(driver, List(offer1).asJava)
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
 
     backend.doKillExecutors(List("0"))
     verify(driver, times(1)).killTask(createTaskId("0"))
@@ -269,14 +269,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.resourceOffers(driver, mesosOffers.asJava)
   }
 
-  private def verifyTaskLaunched(offerId: String): java.util.Collection[TaskInfo] = {
-    val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(createOfferId(offerId))),
-      captor.capture())
-    captor.getValue
-  }
-
   private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
     TaskStatus.newBuilder()
       .setTaskId(TaskID.newBuilder().setValue(taskId).build())
@@ -285,41 +277,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       .build
   }
 
-
-  private def createOfferId(offerId: String): OfferID = {
-    OfferID.newBuilder().setValue(offerId).build()
-  }
-
-  private def createSlaveId(slaveId: String): SlaveID = {
-    SlaveID.newBuilder().setValue(slaveId).build()
-  }
-
-  private def createExecutorId(executorId: String): ExecutorID = {
-    ExecutorID.newBuilder().setValue(executorId).build()
-  }
-
-  private def createTaskId(taskId: String): TaskID = {
-    TaskID.newBuilder().setValue(taskId).build()
-  }
-
-  private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
-    val builder = Offer.newBuilder()
-    builder.addResourcesBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(mem))
-    builder.addResourcesBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(cpu))
-    builder.setId(createOfferId(offerId))
-      .setFrameworkId(FrameworkID.newBuilder()
-        .setValue("f1"))
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
-      .setHostname(s"host${slaveId}")
-      .build()
-  }
-
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver,
@@ -364,9 +321,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       .set("spark.mesos.driver.webui.url", "http://webui";)
 
     if (sparkConfVars != null) {
-      for (attr <- sparkConfVars) {
-        sparkConf.set(attr._1, attr._2)
-      }
+      sparkConf.setAll(sparkConfVars)
     }
 
     sc = new SparkContext(sparkConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/235cb256/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
new file mode 100644
index 0000000..ff26d14
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.Collections
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import scala.collection.JavaConverters._
+
+object Utils {
+  def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
+    val builder = Offer.newBuilder()
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(mem))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(cpu))
+    builder.setId(createOfferId(offerId))
+      .setFrameworkId(FrameworkID.newBuilder()
+        .setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+      .setHostname(s"host${slaveId}")
+      .build()
+  }
+
+  def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
+    val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(createOfferId(offerId))),
+      captor.capture())
+    captor.getValue.asScala.toList
+  }
+
+  def createOfferId(offerId: String): OfferID = {
+    OfferID.newBuilder().setValue(offerId).build()
+  }
+
+  def createSlaveId(slaveId: String): SlaveID = {
+    SlaveID.newBuilder().setValue(slaveId).build()
+  }
+
+  def createExecutorId(executorId: String): ExecutorID = {
+    ExecutorID.newBuilder().setValue(executorId).build()
+  }
+
+  def createTaskId(taskId: String): TaskID = {
+    TaskID.newBuilder().setValue(taskId).build()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/235cb256/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 5219e99..10dc9ce 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -429,6 +429,16 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Set the environment variable specified by EnvironmentVariableName in
+    the driver process. The user can specify multiple of these to set
+    multiple environment variables. This only affects drivers submitted
+    in cluster mode.
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.dispatcher.webui.url</code></td>
   <td><code>(none)</code></td>
   <td>

