Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2350#discussion_r17637499

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---
@@ -36,113 +36,114 @@ private[spark] class YarnClientSchedulerBackend(
 
   var client: Client = null
   var appId: ApplicationId = null
-  var checkerThread: Thread = null
   var stopping: Boolean = false
   var totalExpectedExecutors = 0
 
-  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
-      arrayBuf: ArrayBuffer[String]) {
-    if (System.getenv(envVar) != null) {
-      arrayBuf += (optionName, System.getenv(envVar))
-    } else if (sc.getConf.contains(sysProp)) {
-      arrayBuf += (optionName, sc.getConf.get(sysProp))
-    }
-  }
-
+  /**
+   * Create a Yarn client to submit an application to the ResourceManager.
+   * This waits until the application is running.
+   */
  override def start() {
    super.start()
-
    val driverHost = conf.get("spark.driver.host")
    val driverPort = conf.get("spark.driver.port")
    val hostport = driverHost + ":" + driverPort
    conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
 
    val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += (
-      "--args", hostport
-    )
-
-    // process any optional arguments, given either as environment variables
-    // or system properties. use the defaults already defined in ClientArguments
-    // if things aren't specified. system properties override environment
-    // variables.
-    List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
-      ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
-      ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
-      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
-      ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
-      ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
-      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-
-    logDebug("ClientArguments called with: " + argsArrayBuf)
+    argsArrayBuf += ("--arg", hostport)
+    argsArrayBuf ++= getExtraClientArguments
+
+    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
    val args = new ClientArguments(argsArrayBuf.toArray, conf)
    totalExpectedExecutors = args.numExecutors
    client = new Client(args, conf)
-    appId = client.runApp()
-    waitForApp()
-    checkerThread = yarnApplicationStateCheckerThread()
+    appId = client.submitApplication()
+    waitForApplication()
+    asyncMonitorApplication()
  }
 
-  def waitForApp() {
-
-    // TODO : need a better way to find out whether the executors are ready or not
-    // maybe by resource usage report?
-    while(true) {
-      val report = client.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
+  /**
+   * Return any extra command line arguments to be passed to Client provided in the form of
+   * environment variables or Spark properties.
+   */
+  private def getExtraClientArguments: Seq[String] = {
+    val extraArgs = new ArrayBuffer[String]
+    val optionTuples = // List of (target Client argument, environment variable, Spark property)
+      List(
+        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+        ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
      )
-
-      // Ready to go, or already gone.
-      val state = report.getYarnApplicationState()
-      if (state == YarnApplicationState.RUNNING) {
-        return
-      } else if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        throw new SparkException("Yarn application already ended," +
-          "might be killed or not able to launch application master.")
+    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
+      if (System.getenv(envVar) != null) {
+        extraArgs += (optionName, System.getenv(envVar))
+      } else if (sc.getConf.contains(sparkProp)) {
+        extraArgs += (optionName, sc.getConf.get(sparkProp))
      }
+    }
+    extraArgs
+  }
 
-      Thread.sleep(1000)
+  /**
+   * Report the state of the application until it is running.
+   * If the application has finished, failed or been killed in the process, throw an exception.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def waitForApplication(): Unit = {
+    assert(client != null && appId != null, "Application has not been submitted yet!")
+    val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
+    if (state == YarnApplicationState.FINISHED ||
+      state == YarnApplicationState.FAILED ||
+      state == YarnApplicationState.KILLED) {
+      throw new SparkException("Yarn application has already ended! " +
+        "It might have been killed or unable to launch application master.")
+    }
+    if (state == YarnApplicationState.RUNNING) {
+      logInfo(s"Application ${appId.getId} has started running.")
    }
  }
 
-  private def yarnApplicationStateCheckerThread(): Thread = {
+  /**
+   * Monitor the application state in a separate thread.
+   * If the application has exited for any reason, stop the SparkContext.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def asyncMonitorApplication(): Thread = {
+    assert(client != null && appId != null, "Application has not been submitted yet!")
    val t = new Thread {
      override def run() {
-        while (!stopping) {
--- End diff --

Ah yes good catch.
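Since the hunk is cut off inside `run()`, here is a minimal sketch of how the rest of `asyncMonitorApplication` could continue under this patch's API, going only by the method's scaladoc (monitor until the application exits, then stop the SparkContext). This is an illustration, not the actual code from the PR; it assumes `monitorApplication` blocks until a terminal state when `returnOnRunning` is left at its default of false.

// Hypothetical completion of asyncMonitorApplication, following its scaladoc:
// block until the application exits, then stop the SparkContext.
// Not the actual code from this PR.
private def asyncMonitorApplication(): Thread = {
  assert(client != null && appId != null, "Application has not been submitted yet!")
  val t = new Thread {
    override def run() {
      // Without returnOnRunning, monitorApplication is assumed to block
      // until the application reaches a terminal state.
      val state = client.monitorApplication(appId)
      if (!stopping) {
        // The application exited on its own, so tear down the SparkContext.
        logError(s"Yarn application has exited unexpectedly with state $state!")
        sc.stop()
      }
    }
  }
  t.setName("Yarn application state monitor")
  t.setDaemon(true)
  t.start()
  t
}

Marking the thread as a daemon keeps it from blocking JVM shutdown once the SparkContext itself stops.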
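One behavioral note on `getExtraClientArguments`: the removed comment claimed "system properties override environment variables", but both the old `addArg` and the new loop check the environment variable first, so the env var wins when both are set. A small standalone sketch of that lookup order (the `resolveOption` helper is hypothetical, not part of the patch):

// Hypothetical standalone model of the lookup order in getExtraClientArguments:
// the environment variable is consulted first, the Spark property is the fallback,
// and nothing is emitted when neither is set (ClientArguments defaults then apply).
object ClientArgPrecedence {
  def resolveOption(
      optionName: String,
      envValue: Option[String],       // stand-in for Option(System.getenv(envVar))
      sparkPropValue: Option[String]  // stand-in for sc.getConf.getOption(sparkProp)
    ): Seq[String] = {
    envValue.orElse(sparkPropValue) match {
      case Some(value) => List(optionName, value)
      case None => Nil
    }
  }

  def main(args: Array[String]): Unit = {
    // Both set: the environment variable wins, mirroring the if/else if above.
    println(resolveOption("--executor-memory", Some("2g"), Some("4g")))
    // prints: List(--executor-memory, 2g)
    // Only the Spark property set: it is used as the fallback.
    println(resolveOption("--executor-memory", None, Some("4g")))
    // prints: List(--executor-memory, 4g)
  }
}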