Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2350#discussion_r17400747
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---
    @@ -36,113 +36,114 @@ private[spark] class YarnClientSchedulerBackend(
     
       var client: Client = null
       var appId: ApplicationId = null
    -  var checkerThread: Thread = null
       var stopping: Boolean = false
       var totalExpectedExecutors = 0
     
    -  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    -      arrayBuf: ArrayBuffer[String]) {
    -    if (System.getenv(envVar) != null) {
    -      arrayBuf += (optionName, System.getenv(envVar))
    -    } else if (sc.getConf.contains(sysProp)) {
    -      arrayBuf += (optionName, sc.getConf.get(sysProp))
    -    }
    -  }
    -
    +  /**
    +   * Create a Yarn client to submit an application to the ResourceManager.
    +   * This waits until the application is running.
    +   */
       override def start() {
         super.start()
    -
         val driverHost = conf.get("spark.driver.host")
         val driverPort = conf.get("spark.driver.port")
         val hostport = driverHost + ":" + driverPort
         conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
     
         val argsArrayBuf = new ArrayBuffer[String]()
    -    argsArrayBuf += (
    -      "--args", hostport
    -    )
    -
    -    // process any optional arguments, given either as environment variables
    -    // or system properties. use the defaults already defined in ClientArguments
    -    // if things aren't specified. system properties override environment
    -    // variables.
    -    List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
    -      ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
    -      ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.executor.instances"),
    -      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
    -      ("--executor-memory", "SPARK_WORKER_MEMORY", 
"spark.executor.memory"),
    -      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", 
"spark.executor.memory"),
    -      ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
    -      ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
    -      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
    -      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
    -    .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    -
    -    logDebug("ClientArguments called with: " + argsArrayBuf)
    +    argsArrayBuf += ("--arg", hostport)
    +    argsArrayBuf ++= getExtraClientArguments
    +
    +    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
         val args = new ClientArguments(argsArrayBuf.toArray, conf)
         totalExpectedExecutors = args.numExecutors
         client = new Client(args, conf)
    -    appId = client.runApp()
    -    waitForApp()
    -    checkerThread = yarnApplicationStateCheckerThread()
    +    appId = client.submitApplication()
    +    waitForApplication()
    +    asyncMonitorApplication()
       }
     
    -  def waitForApp() {
    -
    -    // TODO : need a better way to find out whether the executors are ready or not
    -    // maybe by resource usage report?
    -    while(true) {
    -      val report = client.getApplicationReport(appId)
    -
    -      logInfo("Application report from ASM: \n" +
    -        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
    -        "\t appStartTime: " + report.getStartTime() + "\n" +
    -        "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
    +  /**
    +   * Return any extra command line arguments to be passed to Client provided in the form of
    +   * environment variables or Spark properties.
    +   */
    +  private def getExtraClientArguments: Seq[String] = {
    +    val extraArgs = new ArrayBuffer[String]
    +    val optionTuples = // List of (target Client argument, environment variable, Spark property)
    +      List(
    +        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
    +        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
    +        ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.executor.instances"),
    +        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
    +        ("--executor-memory", "SPARK_WORKER_MEMORY", 
"spark.executor.memory"),
    +        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", 
"spark.executor.memory"),
    +        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
    +        ("--executor-cores", "SPARK_EXECUTOR_CORES", 
"spark.executor.cores"),
    +        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
    +        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
           )
    -
    -      // Ready to go, or already gone.
    -      val state = report.getYarnApplicationState()
    -      if (state == YarnApplicationState.RUNNING) {
    -        return
    -      } else if (state == YarnApplicationState.FINISHED ||
    -        state == YarnApplicationState.FAILED ||
    -        state == YarnApplicationState.KILLED) {
    -        throw new SparkException("Yarn application already ended," +
    -          "might be killed or not able to launch application master.")
    +    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
    +      if (System.getenv(envVar) != null) {
    +        extraArgs += (optionName, System.getenv(envVar))
    +      } else if (sc.getConf.contains(sparkProp)) {
    +        extraArgs += (optionName, sc.getConf.get(sparkProp))
    --- End diff --
    
    We need to test it and see if that is still true. Someone else has a pull request up that makes it consistent with the comment, but I haven't had time to test it. I thought there were other changes to the app name elsewhere that were supposed to make it work, but I haven't had time to follow up on that either.
    
    It would be nice to make things consistent.
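    
    Purely as an illustration of what "consistent" could look like (this is not what the patch does, and the helper name and precedence choice below are made up for the sketch): if we wanted the behavior to match the old comment, i.e. the Spark property overriding the environment variable, each option could be resolved with something like
    
        import org.apache.spark.SparkConf
        
        // Hypothetical sketch: Spark property wins over the environment variable.
        // The patch as written checks the environment variable first.
        def resolveClientArg(
            optionName: String,
            envVar: String,
            sparkProp: String,
            conf: SparkConf): Seq[String] = {
          conf.getOption(sparkProp)                  // Spark property, if set
            .orElse(Option(System.getenv(envVar)))   // otherwise fall back to the env var
            .map(value => Seq(optionName, value))
            .getOrElse(Seq.empty)
        }
        
        // e.g. extraArgs ++= resolveClientArg("--name", "SPARK_YARN_APP_NAME", "spark.app.name", sc.getConf)
    
    Whichever precedence we settle on, it should be spelled out in the docs.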
    
    Note that I would love to see these configs cleaned up in terms of how they are handled, with backwards compatibility documented and the handling commonized where we can. I filed a JIRA for that a while back, and we can do it separately.

