This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new db9a982  [SPARK-37461][YARN] YARN-CLIENT mode client.appId is always null
db9a982 is described below

commit db9a982a1441810314be07e2c3b7ccffff77d1f1
Author: Angerszhuuuu <angers....@gmail.com>
AuthorDate: Sun Nov 28 08:53:25 2021 -0600

    [SPARK-37461][YARN] YARN-CLIENT mode client.appId is always null
    
    ### What changes were proposed in this pull request?
    In yarn-client mode, the `Client.appId` variable is never assigned, so it is always `null`; in cluster mode, it is assigned the real application id. With this patch, we assign the real application id to `appId` in client mode as well.
    
    ### Why are the changes needed?
    
    1. Refactors the code so that each function no longer defines its own local application id; the shared `appId` member is used instead (see the sketch above).
    2. In client mode, users can read this variable to get the application id, as in the sketch below this list.
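    For example, a hypothetical caller in client mode (`Client` is `private[spark]`, so this assumes code inside the `org.apache.spark` namespace; `args`, `sparkConf`, and `rpcEnv` are assumed to be set up elsewhere, and the constructor signature may differ across versions):
    
    ```scala
    // Hypothetical usage sketch, not taken from the Spark test suite.
    val client = new Client(args, sparkConf, rpcEnv)
    client.submitApplication()
    // With this patch, appId is populated in client mode too:
    println(s"Submitted YARN application: ${client.appId}")
    ```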
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested.
    
    We have an internal proxy server that replaces the YARN tracking URL and uses `appId`; with this patch it is no longer null.
    
    ```
    21/11/26 12:38:44 INFO Client:
         client token: N/A
         diagnostics: AM container is launched, waiting for AM container to Register with RM
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: user_queue
         start time: 1637901520956
         final status: UNDEFINED
         tracking URL: http://internal-proxy-server/proxy?applicationId=application_1635856758535_4209064
         user: user_name
    ```
    
    Closes #34710 from AngersZhuuuu/SPARK-37461.
    
    Authored-by: Angerszhuuuu <angers....@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala    | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7787e2f..e6136fc 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -169,7 +169,6 @@ private[spark] class Client(
   def submitApplication(): ApplicationId = {
     ResourceRequestHelper.validateResources(sparkConf)
 
-    var appId: ApplicationId = null
     try {
       launcherBackend.connect()
       yarnClient.init(hadoopConf)
@@ -181,7 +180,7 @@ private[spark] class Client(
       // Get a new application from our RM
       val newApp = yarnClient.createApplication()
       val newAppResponse = newApp.getNewApplicationResponse()
-      appId = newAppResponse.getApplicationId()
+      this.appId = newAppResponse.getApplicationId()
 
       // The app staging dir based on the STAGING_DIR configuration if configured
       // otherwise based on the users home directory.
@@ -207,8 +206,7 @@ private[spark] class Client(
       yarnClient.submitApplication(appContext)
       launcherBackend.setAppId(appId.toString)
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
-
-      appId
+      this.appId
     } catch {
       case e: Throwable =>
         if (stagingDirPath != null) {
@@ -915,7 +913,6 @@ private[spark] class Client(
   private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
     : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
-    val appId = newAppResponse.getApplicationId
     val pySparkArchives =
       if (sparkConf.get(IS_PYTHON_APP)) {
         findPySparkArchives()
@@ -971,7 +968,7 @@ private[spark] class Client(
     if (isClusterMode) {
       sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
         javaOpts ++= Utils.splitCommandString(opts)
-          .map(Utils.substituteAppId(_, appId.toString))
+          .map(Utils.substituteAppId(_, this.appId.toString))
           .map(YarnSparkHadoopUtil.escapeForShell)
       }
       val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
@@ -996,7 +993,7 @@ private[spark] class Client(
           throw new SparkException(msg)
         }
         javaOpts ++= Utils.splitCommandString(opts)
-          .map(Utils.substituteAppId(_, appId.toString))
+          .map(Utils.substituteAppId(_, this.appId.toString))
           .map(YarnSparkHadoopUtil.escapeForShell)
       }
       sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
@@ -1269,7 +1266,7 @@ private[spark] class Client(
    * throw an appropriate SparkException.
    */
   def run(): Unit = {
-    this.appId = submitApplication()
+    submitApplication()
     if (!launcherBackend.isConnected() && fireAndForget) {
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
