Github user mgummelt commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14167#discussion_r71046997
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
    @@ -353,38 +353,60 @@ private[spark] class MesosClusterScheduler(
         }
       }
     
    -  private def buildDriverCommand(desc: MesosDriverDescription): 
CommandInfo = {
    -    val appJar = CommandInfo.URI.newBuilder()
    -      
.setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
    -    val builder = CommandInfo.newBuilder().addUris(appJar)
    -    val entries = conf.getOption("spark.executor.extraLibraryPath")
    -      .map(path => Seq(path) ++ desc.command.libraryPathEntries)
    -      .getOrElse(desc.command.libraryPathEntries)
    -
    -    val prefixEnv = if (!entries.isEmpty) {
    -      Utils.libraryPathEnvPrefix(entries)
    -    } else {
    -      ""
    +  private def getDriverExecutorURI(desc: MesosDriverDescription) = {
    +    desc.schedulerProperties.get("spark.executor.uri")
    +      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
    +  }
    +
    +  private def getDriverEnvironment(desc: MesosDriverDescription): 
Environment = {
    +    val env = {
    +      val executorOpts = desc.schedulerProperties.map { case (k, v) => 
s"-D$k=$v" }.mkString(" ")
    +      val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
    +
    +      val prefix = "spark.mesos.driverEnv."
    +      val driverEnv = 
desc.schedulerProperties.filterKeys(_.startsWith(prefix))
    +        .map { case (k, v) => (k.substring(prefix.length), v) }
    +
    +      driverEnv ++ executorEnv ++ desc.command.environment
         }
    +
         val envBuilder = Environment.newBuilder()
    -    desc.command.environment.foreach { case (k, v) =>
    -      
envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v).build())
    +    env.foreach { case (k, v) =>
    +      envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
         }
    -    // Pass all spark properties to executor.
    -    val executorOpts = desc.schedulerProperties.map { case (k, v) => 
s"-D$k=$v" }.mkString(" ")
    -    envBuilder.addVariables(
    -      
Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
    +    envBuilder.build()
    +  }
    +
    +  private def getDriverUris(desc: MesosDriverDescription): 
List[CommandInfo.URI] = {
    +    val confUris = List(conf.getOption("spark.mesos.uris"),
    +      desc.schedulerProperties.get("spark.mesos.uris"),
    +      desc.schedulerProperties.get("spark.submit.pyFiles")).flatMap(
    +      _.map(_.split(",").map(_.trim))
    +    ).flatten
    +
    +    val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
    +
    +    ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
    +      CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
    +  }
    +
    +  private def getDriverCommandValue(desc: MesosDriverDescription): String 
= {
         val dockerDefined = 
desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
    -    val executorUri = desc.schedulerProperties.get("spark.executor.uri")
    -      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
    +    val executorUri = getDriverExecutorURI(desc)
         // Gets the path to run spark-submit, and the path to the Mesos 
sandbox.
         val (executable, sandboxPath) = if (dockerDefined) {
           // Application jar is automatically downloaded in the mounted 
sandbox by Mesos,
           // and the path to the mounted volume is stored in $MESOS_SANDBOX 
env variable.
           ("./bin/spark-submit", "$MESOS_SANDBOX")
         } else if (executorUri.isDefined) {
    -      
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
           val folderBasename = executorUri.get.split('/').last.split('.').head
    +
    +      val entries = conf.getOption("spark.executor.extraLibraryPath")
    +        .map(path => Seq(path) ++ desc.command.libraryPathEntries)
    +        .getOrElse(desc.command.libraryPathEntries)
    +
    +      val prefixEnv = if (!entries.isEmpty) 
Utils.libraryPathEnvPrefix(entries) else ""
    --- End diff --
    
    This is existing code that I didn't change.  I just moved some things 
around.  I'd prefer not to change code not related to this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to