Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2350#discussion_r17435604
  
    --- Diff: 
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
    @@ -45,120 +46,97 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
     
       def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
     
    -  val args = clientArgs
    -  val conf = hadoopConf
    -  val sparkConf = spConf
    -  var rpc: YarnRPC = YarnRPC.create(conf)
    -  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
    +  val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)
     
    +  /* 
-------------------------------------------------------------------------------------
 *
    +   | The following methods have much in common in the stable and alpha 
versions of Client, |
    +   | but cannot be implemented in the parent trait due to subtle API 
differences across    |
    +   | hadoop versions.                                                      
                |
    +   * 
-------------------------------------------------------------------------------------
 */
     
    -  // for client user who want to monitor app status by itself.
    -  def runApp() = {
    -    validateArgs()
    -
    +  /** Submit an application running our ApplicationMaster to the 
ResourceManager. */
    +  override def submitApplication(): ApplicationId = {
         init(yarnConf)
         start()
    -    logClusterResourceDetails()
     
    -    val newApp = super.getNewApplication()
    -    val appId = newApp.getApplicationId()
    +    logInfo("Requesting a new application from cluster with %d 
NodeManagers"
    +      .format(getYarnClusterMetrics.getNumNodeManagers))
     
    -    verifyClusterResources(newApp)
    -    val appContext = createApplicationSubmissionContext(appId)
    -    val appStagingDir = getAppStagingDir(appId)
    -    val localResources = prepareLocalResources(appStagingDir)
    -    val env = setupLaunchEnv(localResources, appStagingDir)
    -    val amContainer = createContainerLaunchContext(newApp, localResources, 
env)
    +    // Get a new application from our RM
    +    val newAppResponse = getNewApplication()
    +    val appId = newAppResponse.getApplicationId()
     
    -    val capability = 
Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
    -    // Memory for the ApplicationMaster.
    -    capability.setMemory(args.amMemory + memoryOverhead)
    -    amContainer.setResource(capability)
    +    // Verify whether the cluster has enough resources for our AM
    +    verifyClusterResources(newAppResponse)
     
    -    appContext.setQueue(args.amQueue)
    -    appContext.setAMContainerSpec(amContainer)
    -    
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
    +    // Set up the appropriate contexts to launch our AM
    +    val containerContext = createContainerLaunchContext(newAppResponse)
    +    val appContext = createApplicationSubmissionContext(appId, 
containerContext)
     
    -    submitApp(appContext)
    +    // Finally, submit and monitor the application
    +    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
    +    submitApplication(appContext)
         appId
       }
     
    -  def run() {
    -    val appId = runApp()
    -    monitorApplication(appId)
    +  /**
    +   * Set up a context for launching our ApplicationMaster container.
    +   * In the Yarn alpha API, the memory requirements of this container must 
be set in
    +   * the ContainerLaunchContext instead of the 
ApplicationSubmissionContext.
    +   */
    +  override def createContainerLaunchContext(newAppResponse: 
GetNewApplicationResponse)
    +      : ContainerLaunchContext = {
    +    val containerContext = 
super.createContainerLaunchContext(newAppResponse)
    +    val capability = Records.newRecord(classOf[Resource])
    +    capability.setMemory(getAMMemory(newAppResponse) + amMemoryOverhead)
    +    containerContext.setResource(capability)
    +    containerContext
       }
     
    -  def logClusterResourceDetails() {
    -    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
    -    logInfo("Got cluster metric info from ASM, numNodeManagers = " +
    -      clusterMetrics.getNumNodeManagers)
    -  }
    -
    -
    -  def createApplicationSubmissionContext(appId: ApplicationId): 
ApplicationSubmissionContext = {
    -    logInfo("Setting up application submission context for ASM")
    +  /** Set up the context for submitting our ApplicationMaster. */
    +  def createApplicationSubmissionContext(
    +      appId: ApplicationId,
    +      containerContext: ContainerLaunchContext): 
ApplicationSubmissionContext = {
         val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
         appContext.setApplicationId(appId)
         appContext.setApplicationName(args.appName)
    +    appContext.setQueue(args.amQueue)
    +    appContext.setAMContainerSpec(containerContext)
    +    
appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
         appContext
       }
     
    -  def calculateAMMemory(newApp: GetNewApplicationResponse): Int = {
    -    val minResMemory = newApp.getMinimumResourceCapability().getMemory()
    -    val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
    -          ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
    -          memoryOverhead)
    -    amMemory
    -  }
    -
    -  def setupSecurityToken(amContainer: ContainerLaunchContext) = {
    -    // Setup security tokens.
    +  /**
    +   * Set up security tokens for launching our ApplicationMaster container.
    +   * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in 
the stable API.
    +   */
    +  override def setupSecurityToken(amContainer: ContainerLaunchContext): 
Unit = {
         val dob = new DataOutputBuffer()
         credentials.writeTokenStorageToStream(dob)
         amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
       }
     
    -  def submitApp(appContext: ApplicationSubmissionContext) = {
    -    // Submit the application to the applications manager.
    -    logInfo("Submitting application to ASM")
    -    super.submitApplication(appContext)
    +  /**
    +   * Return the amount of memory for launching the ApplicationMaster 
container (MB).
    +   * GetNewApplicationResponse#getMinimumResourceCapability does not exist 
in the stable API.
    +   */
    +  override def getAMMemory(newAppResponse: GetNewApplicationResponse): Int 
= {
    +    val minResMemory = 
newAppResponse.getMinimumResourceCapability().getMemory()
    +    val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
    +      ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - 
amMemoryOverhead)
    +    amMemory
       }
     
    -  def monitorApplication(appId: ApplicationId): Boolean = {
    -    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
    -
    -    while (true) {
    -      Thread.sleep(interval)
    -      val report = super.getApplicationReport(appId)
    -
    -      logInfo("Application report from ASM: \n" +
    -        "\t application identifier: " + appId.toString() + "\n" +
    -        "\t appId: " + appId.getId() + "\n" +
    -        "\t clientToken: " + report.getClientToken() + "\n" +
    -        "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
    -        "\t appMasterHost: " + report.getHost() + "\n" +
    -        "\t appQueue: " + report.getQueue() + "\n" +
    -        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
    -        "\t appStartTime: " + report.getStartTime() + "\n" +
    -        "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
    -        "\t distributedFinalState: " + report.getFinalApplicationStatus() 
+ "\n" +
    -        "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
    -        "\t appUser: " + report.getUser()
    -      )
    -
    -      val state = report.getYarnApplicationState()
    -      if (state == YarnApplicationState.FINISHED ||
    -        state == YarnApplicationState.FAILED ||
    -        state == YarnApplicationState.KILLED) {
    -        return true
    -      }
    -    }
    -    true
    -  }
    +  /**
    +   * Return the security token used by this client to communicate with the 
ApplicationMaster.
    +   * If no security is enabled, the token returned by the report is null.
    +   * ApplicationReport#getClientToken is renamed `getClientToAMToken` in 
the stable API.
    +   */
    +  override def getClientToken(report: ApplicationReport): String =
    +    Option(report.getClientToken).getOrElse("")
     }
     
    -object Client {
    -
    +private[spark] object Client {
    --- End diff --
    
    Never mind, I just saw your comment elsewhere about people who do this. I 
will expose this again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to