This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new db9a982  [SPARK-37461][YARN] YARN-CLIENT mode client.appId is always null
db9a982 is described below

commit db9a982a1441810314be07e2c3b7ccffff77d1f1
Author: Angerszhuuuu <angers....@gmail.com>
AuthorDate: Sun Nov 28 08:53:25 2021 -0600

    [SPARK-37461][YARN] YARN-CLIENT mode client.appId is always null
    
    ### What changes were proposed in this pull request?
    In yarn-client mode, the `Client.appId` variable is never assigned, so it is always `null`; in cluster mode, it is assigned the real value. In this patch, we assign the real application id to `appId` in client mode as well (see the minimal sketch at the end of this message).
    
    ### Why are the changes needed?
    1. Refactors the code to avoid defining a separate id in each function; we can just use this variable.
    2. In client mode, users can read this variable to get the application id.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested. We have an internal proxy server that replaces the YARN tracking URL and uses `appId`; with this patch it is no longer null.
    ```
    21/11/26 12:38:44 INFO Client:
    	 client token: N/A
    	 diagnostics: AM container is launched, waiting for AM container to Register with RM
    	 ApplicationMaster host: N/A
    	 ApplicationMaster RPC port: -1
    	 queue: user_queue
    	 start time: 1637901520956
    	 final status: UNDEFINED
    	 tracking URL: http://internal-proxy-server/proxy?applicationId=application_1635856758535_4209064
    	 user: user_name
    ```
    
    Closes #34710 from AngersZhuuuu/SPARK-37461.
    
    Authored-by: Angerszhuuuu <angers....@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7787e2f..e6136fc 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -169,7 +169,6 @@ private[spark] class Client(
 
   def submitApplication(): ApplicationId = {
     ResourceRequestHelper.validateResources(sparkConf)
-    var appId: ApplicationId = null
     try {
       launcherBackend.connect()
       yarnClient.init(hadoopConf)
@@ -181,7 +180,7 @@ private[spark] class Client(
       // Get a new application from our RM
       val newApp = yarnClient.createApplication()
       val newAppResponse = newApp.getNewApplicationResponse()
-      appId = newAppResponse.getApplicationId()
+      this.appId = newAppResponse.getApplicationId()
 
       // The app staging dir based on the STAGING_DIR configuration if configured
       // otherwise based on the users home directory.
@@ -207,8 +206,7 @@ private[spark] class Client(
       yarnClient.submitApplication(appContext)
       launcherBackend.setAppId(appId.toString)
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
-
-      appId
+      this.appId
     } catch {
       case e: Throwable =>
         if (stagingDirPath != null) {
@@ -915,7 +913,6 @@ private[spark] class Client(
   private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
     : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
-    val appId = newAppResponse.getApplicationId
     val pySparkArchives =
       if (sparkConf.get(IS_PYTHON_APP)) {
         findPySparkArchives()
@@ -971,7 +968,7 @@ private[spark] class Client(
     if (isClusterMode) {
       sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
         javaOpts ++= Utils.splitCommandString(opts)
-          .map(Utils.substituteAppId(_, appId.toString))
+          .map(Utils.substituteAppId(_, this.appId.toString))
           .map(YarnSparkHadoopUtil.escapeForShell)
       }
       val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
@@ -996,7 +993,7 @@ private[spark] class Client(
           throw new SparkException(msg)
         }
         javaOpts ++= Utils.splitCommandString(opts)
-          .map(Utils.substituteAppId(_, appId.toString))
+          .map(Utils.substituteAppId(_, this.appId.toString))
           .map(YarnSparkHadoopUtil.escapeForShell)
       }
       sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
@@ -1269,7 +1266,7 @@ private[spark] class Client(
    * throw an appropriate SparkException.
    */
   def run(): Unit = {
-    this.appId = submitApplication()
+    submitApplication()
     if (!launcherBackend.isConnected() && fireAndForget) {
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
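For reference, here is the minimal, self-contained Scala sketch promised above, illustrating the pattern this patch applies. It is not Spark's actual `Client` class: `ClientSketch` and the `String`-typed id are hypothetical stand-ins (the real field is a YARN `ApplicationId` obtained from the ResourceManager). The point is that `submitApplication()` writes the id to the member field rather than a shadowing local variable, so `appId` is populated in client mode too and `run()` no longer needs to assign it.

```scala
// A hedged sketch only: a simplified stand-in for
// org.apache.spark.deploy.yarn.Client, not Spark's real API.
class ClientSketch {

  // Before the patch, the equivalent field stayed null in client mode,
  // because submitApplication() wrote to a local `var appId` that shadowed it.
  private var appId: String = null

  def submitApplication(): String = {
    // Assign the id (here a hard-coded placeholder taken from the test log;
    // in Spark it comes from yarnClient.createApplication()) directly to
    // the member field, so callers can read it after submission.
    this.appId = "application_1635856758535_4209064"
    this.appId
  }

  def run(): Unit = {
    // The field is populated as a side effect of submitApplication(),
    // so the old `this.appId = submitApplication()` assignment is redundant.
    submitApplication()
    assert(appId != null, "appId is now set in client mode as well")
  }
}

object ClientSketch {
  def main(args: Array[String]): Unit = new ClientSketch().run()
}
```

Returning `this.appId` keeps the signature of `submitApplication()` unchanged for existing callers while making the field the single source of truth for the application id.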