Repository: spark
Updated Branches:
  refs/heads/master 14f8c3404 -> c4022dd52


http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 82e45e3..0b43e6e 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,11 +21,9 @@ import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SparkConf}
@@ -34,128 +32,98 @@ import org.apache.spark.deploy.SparkHadoopUtil
 /**
  * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
  */
-class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
+private[spark] class Client(
+    val args: ClientArguments,
+    val hadoopConf: Configuration,
+    val sparkConf: SparkConf)
   extends ClientBase with Logging {
 
-  val yarnClient = YarnClient.createYarnClient
-
   def this(clientArgs: ClientArguments, spConf: SparkConf) =
     this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
 
   def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
 
-  val args = clientArgs
-  val conf = hadoopConf
-  val sparkConf = spConf
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  def runApp(): ApplicationId = {
-    validateArgs()
-    // Initialize and start the client service.
+  val yarnClient = YarnClient.createYarnClient
+  val yarnConf = new YarnConfiguration(hadoopConf)
+
+  def stop(): Unit = yarnClient.stop()
+
+  /* ------------------------------------------------------------------------------------- *
+   | The following methods have much in common in the stable and alpha versions of Client, |
+   | but cannot be implemented in the parent trait due to subtle API differences across    |
+   | hadoop versions.                                                                      |
+   * ------------------------------------------------------------------------------------- */
+
+  /**
+   * Submit an application running our ApplicationMaster to the ResourceManager.
+   *
+   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
+   * creating applications and setting up the application submission context. This was not
+   * available in the alpha API.
+   */
+  override def submitApplication(): ApplicationId = {
     yarnClient.init(yarnConf)
     yarnClient.start()
 
-    // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers).
-    logClusterResourceDetails()
-
-    // Prepare to submit a request to the ResourcManager (specifically its ApplicationsManager (ASM)
-    // interface).
+    logInfo("Requesting a new application from cluster with %d NodeManagers"
+      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
 
-    // Get a new client application.
+    // Get a new application from our RM
     val newApp = yarnClient.createApplication()
     val newAppResponse = newApp.getNewApplicationResponse()
     val appId = newAppResponse.getApplicationId()
 
+    // Verify whether the cluster has enough resources for our AM
     verifyClusterResources(newAppResponse)
 
-    // Set up resource and environment variables.
-    val appStagingDir = getAppStagingDir(appId)
-    val localResources = prepareLocalResources(appStagingDir)
-    val launchEnv = setupLaunchEnv(localResources, appStagingDir)
-    val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)
+    // Set up the appropriate contexts to launch our AM
+    val containerContext = createContainerLaunchContext(newAppResponse)
+    val appContext = createApplicationSubmissionContext(newApp, containerContext)
 
-    // Set up an application submission context.
-    val appContext = newApp.getApplicationSubmissionContext()
-    appContext.setApplicationName(args.appName)
-    appContext.setQueue(args.amQueue)
-    appContext.setAMContainerSpec(amContainer)
-    appContext.setApplicationType("SPARK")
-
-    // Memory for the ApplicationMaster.
-    val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    memoryResource.setMemory(args.amMemory + memoryOverhead)
-    appContext.setResource(memoryResource)
-
-    // Finally, submit and monitor the application.
-    submitApp(appContext)
+    // Finally, submit and monitor the application
+    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+    yarnClient.submitApplication(appContext)
     appId
   }
 
-  def run() {
-    val appId = runApp()
-    monitorApplication(appId)
-  }
-
-  def logClusterResourceDetails() {
-    val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
-    logInfo("Got cluster metric info from ResourceManager, number of 
NodeManagers: " +
-      clusterMetrics.getNumNodeManagers)
+  /**
+   * Set up the context for submitting our ApplicationMaster.
+   * This uses the YarnClientApplication not available in the Yarn alpha API.
+   */
+  def createApplicationSubmissionContext(
+      newApp: YarnClientApplication,
+      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
+    val appContext = newApp.getApplicationSubmissionContext
+    appContext.setApplicationName(args.appName)
+    appContext.setQueue(args.amQueue)
+    appContext.setAMContainerSpec(containerContext)
+    appContext.setApplicationType("SPARK")
+    val capability = Records.newRecord(classOf[Resource])
+    capability.setMemory(args.amMemory + amMemoryOverhead)
+    appContext.setResource(capability)
+    appContext
   }
 
-  def setupSecurityToken(amContainer: ContainerLaunchContext) = {
-    // Setup security tokens.
-    val dob = new DataOutputBuffer()
+  /** Set up security tokens for launching our ApplicationMaster container. */
+  override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
+    val dob = new DataOutputBuffer
     credentials.writeTokenStorageToStream(dob)
-    amContainer.setTokens(ByteBuffer.wrap(dob.getData()))
+    amContainer.setTokens(ByteBuffer.wrap(dob.getData))
   }
 
-  def submitApp(appContext: ApplicationSubmissionContext) = {
-    // Submit the application to the applications manager.
-    logInfo("Submitting application to ResourceManager")
-    yarnClient.submitApplication(appContext)
-  }
+  /** Get the application report from the ResourceManager for an application we have submitted. */
+  override def getApplicationReport(appId: ApplicationId): ApplicationReport =
+    yarnClient.getApplicationReport(appId)
 
-  def getApplicationReport(appId: ApplicationId) =
-      yarnClient.getApplicationReport(appId)
-
-  def stop = yarnClient.stop
-
-  def monitorApplication(appId: ApplicationId): Boolean = {
-    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
-    while (true) {
-      Thread.sleep(interval)
-      val report = yarnClient.getApplicationReport(appId)
-
-      logInfo("Application report from ResourceManager: \n" +
-        "\t application identifier: " + appId.toString() + "\n" +
-        "\t appId: " + appId.getId() + "\n" +
-        "\t clientToAMToken: " + report.getClientToAMToken() + "\n" +
-        "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
-        "\t appMasterHost: " + report.getHost() + "\n" +
-        "\t appQueue: " + report.getQueue() + "\n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
-        "\t distributedFinalState: " + report.getFinalApplicationStatus() + 
"\n" +
-        "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
-        "\t appUser: " + report.getUser()
-      )
-
-      val state = report.getYarnApplicationState()
-      if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        return true
-      }
-    }
-    true
-  }
+  /**
+   * Return the security token used by this client to communicate with the ApplicationMaster.
+   * If no security is enabled, the token returned by the report is null.
+   */
+  override def getClientToken(report: ApplicationReport): String =
+    Option(report.getClientToAMToken).map(_.toString).getOrElse("")
 }
 
 object Client {
-
   def main(argStrings: Array[String]) {
     if (!sys.props.contains("SPARK_SUBMIT")) {
       println("WARNING: This client is deprecated and will be removed in a " +
@@ -163,22 +131,19 @@ object Client {
     }
 
     // Set an env variable indicating we are running in YARN mode.
-    // Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
-    // see Client#setupLaunchEnv().
+    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
-    val sparkConf = new SparkConf()
+    val sparkConf = new SparkConf
 
     try {
       val args = new ClientArguments(argStrings, sparkConf)
       new Client(args, sparkConf).run()
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         Console.err.println(e.getMessage)
         System.exit(1)
-      }
     }
 
     System.exit(0)
   }
-
 }
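
----------------------------------------------------------------------
For reference, below is a minimal, self-contained sketch of the stable-API
submission flow that the new submitApplication/createApplicationSubmissionContext
pair implements. It is not Spark code: the object name, the placeholder AM
command, and the name/queue/memory values are hypothetical stand-ins for what
Client derives from ClientArguments.

import java.util.Collections

import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object SubmitSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // Ask the ResourceManager for a new application. The returned
    // YarnClientApplication bundles the new-application response and the
    // submission context (the convenience the stable API adds).
    val newApp = yarnClient.createApplication()
    val appId = newApp.getNewApplicationResponse.getApplicationId

    // Container context: the command that launches the (placeholder) AM.
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(Collections.singletonList(
      "echo placeholder-AM 1>/tmp/stdout 2>/tmp/stderr"))

    // Submission context: name, queue, type, AM container, and AM resources.
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("sketch")  // hypothetical app name
    appContext.setQueue("default")           // hypothetical queue
    appContext.setApplicationType("SPARK")
    appContext.setAMContainerSpec(amContainer)
    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(512)                // AM memory in MB, incl. overhead
    appContext.setResource(capability)

    yarnClient.submitApplication(appContext)
    println(s"Submitted application $appId")
    yarnClient.stop()
  }
}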

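The token-serialization step in setupSecurityToken can also be read in
isolation: the AM container carries the client's delegation tokens as an
opaque ByteBuffer. A rough sketch, assuming the current user's credentials
rather than the `credentials` field that ClientBase provides:

import java.nio.ByteBuffer

import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext

def attachTokens(amContainer: ContainerLaunchContext): Unit = {
  // Serialize the current user's tokens into a buffer and hand it to YARN.
  val credentials = UserGroupInformation.getCurrentUser.getCredentials
  val dob = new DataOutputBuffer
  credentials.writeTokenStorageToStream(dob)
  // dob.getData returns the whole backing array, so bound it by getLength.
  amContainer.setTokens(ByteBuffer.wrap(dob.getData, 0, dob.getLength))
}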

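The removed monitorApplication loop (polling getApplicationReport until a
terminal state) presumably now lives in ClientBase, shared with the alpha
client. A minimal sketch of that polling pattern, with a hypothetical helper
name and a fixed interval in place of spark.yarn.report.interval:

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

def awaitCompletion(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
  while (true) {
    Thread.sleep(1000)  // report interval in ms
    val report = yarnClient.getApplicationReport(appId)
    val state = report.getYarnApplicationState
    println(s"Application $appId is in state $state")
    // Stop once the application reaches a terminal state.
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      return state
    }
  }
  throw new IllegalStateException("unreachable")  // satisfies the return type
}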