Repository: spark Updated Branches: refs/heads/master 14f8c3404 -> c4022dd52
http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 82e45e3..0b43e6e 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,11 +21,9 @@ import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SparkConf} @@ -34,128 +32,98 @@ import org.apache.spark.deploy.SparkHadoopUtil /** * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API. */ -class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf) +private[spark] class Client( + val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf) extends ClientBase with Logging { - val yarnClient = YarnClient.createYarnClient - def this(clientArgs: ClientArguments, spConf: SparkConf) = this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf()) - val args = clientArgs - val conf = hadoopConf - val sparkConf = spConf - var rpc: YarnRPC = YarnRPC.create(conf) - val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - - def runApp(): ApplicationId = { - validateArgs() - // Initialize and start the client service. + val yarnClient = YarnClient.createYarnClient + val yarnConf = new YarnConfiguration(hadoopConf) + + def stop(): Unit = yarnClient.stop() + + /* ------------------------------------------------------------------------------------- * + | The following methods have much in common in the stable and alpha versions of Client, | + | but cannot be implemented in the parent trait due to subtle API differences across | + | hadoop versions. | + * ------------------------------------------------------------------------------------- */ + + /** + * Submit an application running our ApplicationMaster to the ResourceManager. + * + * The stable Yarn API provides a convenience method (YarnClient#createApplication) for + * creating applications and setting up the application submission context. This was not + * available in the alpha API. + */ + override def submitApplication(): ApplicationId = { yarnClient.init(yarnConf) yarnClient.start() - // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers). - logClusterResourceDetails() - - // Prepare to submit a request to the ResourcManager (specifically its ApplicationsManager (ASM) - // interface). + logInfo("Requesting a new application from cluster with %d NodeManagers" + .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) - // Get a new client application. + // Get a new application from our RM val newApp = yarnClient.createApplication() val newAppResponse = newApp.getNewApplicationResponse() val appId = newAppResponse.getApplicationId() + // Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse) - // Set up resource and environment variables. - val appStagingDir = getAppStagingDir(appId) - val localResources = prepareLocalResources(appStagingDir) - val launchEnv = setupLaunchEnv(localResources, appStagingDir) - val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv) + // Set up the appropriate contexts to launch our AM + val containerContext = createContainerLaunchContext(newAppResponse) + val appContext = createApplicationSubmissionContext(newApp, containerContext) - // Set up an application submission context. - val appContext = newApp.getApplicationSubmissionContext() - appContext.setApplicationName(args.appName) - appContext.setQueue(args.amQueue) - appContext.setAMContainerSpec(amContainer) - appContext.setApplicationType("SPARK") - - // Memory for the ApplicationMaster. - val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] - memoryResource.setMemory(args.amMemory + memoryOverhead) - appContext.setResource(memoryResource) - - // Finally, submit and monitor the application. - submitApp(appContext) + // Finally, submit and monitor the application + logInfo(s"Submitting application ${appId.getId} to ResourceManager") + yarnClient.submitApplication(appContext) appId } - def run() { - val appId = runApp() - monitorApplication(appId) - } - - def logClusterResourceDetails() { - val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics - logInfo("Got cluster metric info from ResourceManager, number of NodeManagers: " + - clusterMetrics.getNumNodeManagers) + /** + * Set up the context for submitting our ApplicationMaster. + * This uses the YarnClientApplication not available in the Yarn alpha API. + */ + def createApplicationSubmissionContext( + newApp: YarnClientApplication, + containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { + val appContext = newApp.getApplicationSubmissionContext + appContext.setApplicationName(args.appName) + appContext.setQueue(args.amQueue) + appContext.setAMContainerSpec(containerContext) + appContext.setApplicationType("SPARK") + val capability = Records.newRecord(classOf[Resource]) + capability.setMemory(args.amMemory + amMemoryOverhead) + appContext.setResource(capability) + appContext } - def setupSecurityToken(amContainer: ContainerLaunchContext) = { - // Setup security tokens. - val dob = new DataOutputBuffer() + /** Set up security tokens for launching our ApplicationMaster container. */ + override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { + val dob = new DataOutputBuffer credentials.writeTokenStorageToStream(dob) - amContainer.setTokens(ByteBuffer.wrap(dob.getData())) + amContainer.setTokens(ByteBuffer.wrap(dob.getData)) } - def submitApp(appContext: ApplicationSubmissionContext) = { - // Submit the application to the applications manager. - logInfo("Submitting application to ResourceManager") - yarnClient.submitApplication(appContext) - } + /** Get the application report from the ResourceManager for an application we have submitted. */ + override def getApplicationReport(appId: ApplicationId): ApplicationReport = + yarnClient.getApplicationReport(appId) - def getApplicationReport(appId: ApplicationId) = - yarnClient.getApplicationReport(appId) - - def stop = yarnClient.stop - - def monitorApplication(appId: ApplicationId): Boolean = { - val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) - - while (true) { - Thread.sleep(interval) - val report = yarnClient.getApplicationReport(appId) - - logInfo("Application report from ResourceManager: \n" + - "\t application identifier: " + appId.toString() + "\n" + - "\t appId: " + appId.getId() + "\n" + - "\t clientToAMToken: " + report.getClientToAMToken() + "\n" + - "\t appDiagnostics: " + report.getDiagnostics() + "\n" + - "\t appMasterHost: " + report.getHost() + "\n" + - "\t appQueue: " + report.getQueue() + "\n" + - "\t appMasterRpcPort: " + report.getRpcPort() + "\n" + - "\t appStartTime: " + report.getStartTime() + "\n" + - "\t yarnAppState: " + report.getYarnApplicationState() + "\n" + - "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" + - "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" + - "\t appUser: " + report.getUser() - ) - - val state = report.getYarnApplicationState() - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - return true - } - } - true - } + /** + * Return the security token used by this client to communicate with the ApplicationMaster. + * If no security is enabled, the token returned by the report is null. + */ + override def getClientToken(report: ApplicationReport): String = + Option(report.getClientToAMToken).map(_.toString).getOrElse("") } object Client { - def main(argStrings: Array[String]) { if (!sys.props.contains("SPARK_SUBMIT")) { println("WARNING: This client is deprecated and will be removed in a " + @@ -163,22 +131,19 @@ object Client { } // Set an env variable indicating we are running in YARN mode. - // Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes - - // see Client#setupLaunchEnv(). + // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes System.setProperty("SPARK_YARN_MODE", "true") - val sparkConf = new SparkConf() + val sparkConf = new SparkConf try { val args = new ClientArguments(argStrings, sparkConf) new Client(args, sparkConf).run() } catch { - case e: Exception => { + case e: Exception => Console.err.println(e.getMessage) System.exit(1) - } } System.exit(0) } - } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org