This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5056c267e6b YARN-11508. [Minor] Improve UnmanagedAMPoolManager/UnmanagedApplicationManager Code (#5726)
5056c267e6b is described below

commit 5056c267e6bd29468ec4fbd99da079cc552efc46
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Fri Jul 28 20:20:23 2023 +0800

    YARN-11508. [Minor] Improve UnmanagedAMPoolManager/UnmanagedApplicationManager Code (#5726)
---
 .../yarn/server/uam/UnmanagedAMPoolManager.java    | 12 ++--
 .../server/uam/UnmanagedApplicationManager.java    | 78 ++++++++++------------
 .../uam/TestUnmanagedApplicationManager.java       | 62 +++++------------
 3 files changed, 61 insertions(+), 91 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index aa7a84b0c2f..0ff4260c5e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -76,7 +76,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
 
   private ExecutorService threadpool;
 
-  private String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread";
+  private final String dispatcherThreadName = 
"UnmanagedAMPoolManager-Finish-Thread";
 
   private Thread finishApplicationThread;
 
@@ -138,7 +138,7 @@ public class UnmanagedAMPoolManager extends AbstractService 
{
       boolean keepContainersAcrossApplicationAttempts, String rmName,
       ApplicationSubmissionContext originalAppSubmissionContext)
       throws YarnException, IOException {
-    ApplicationId appId = null;
+    ApplicationId appId;
     ApplicationClientProtocol rmClient;
     try {
       UserGroupInformation appSubmitter =
@@ -198,14 +198,16 @@ public class UnmanagedAMPoolManager extends 
AbstractService {
     if (this.unmanagedAppMasterMap.containsKey(uamId)) {
       throw new YarnException("UAM " + uamId + " already exists");
     }
+
     UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
         submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
         rmName, originalAppSubmissionContext);
+
     // Put the UAM into map first before initializing it to avoid additional 
UAM
     // for the same uamId being created concurrently
     this.unmanagedAppMasterMap.put(uamId, uam);
 
-    Token<AMRMTokenIdentifier> amrmToken = null;
+    Token<AMRMTokenIdentifier> amrmToken;
     try {
       LOG.info("Launching UAM id {} for application {}", uamId, appId);
       amrmToken = uam.launchUAM();
@@ -390,7 +392,7 @@ public class UnmanagedAMPoolManager extends AbstractService 
{
   public Set<String> getAllUAMIds() {
     // Return a clone of the current id set for concurrency reasons, so that 
the
     // returned map won't change with the actual map
-    return new HashSet<String>(this.unmanagedAppMasterMap.keySet());
+    return new HashSet<>(this.unmanagedAppMasterMap.keySet());
   }
 
   /**
@@ -439,7 +441,7 @@ public class UnmanagedAMPoolManager extends AbstractService 
{
    *
    * @param request FinishApplicationMasterRequest
    * @param appId application Id
-   * @return Returns the Map map,
+   * @return Returns the Map,
    *         the key is subClusterId, the value is 
FinishApplicationMasterResponse
    */
   public Map<String, FinishApplicationMasterResponse> 
batchFinishApplicationMaster(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 3121e6d44d5..56be136bda7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -42,6 +42,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -147,10 +148,8 @@ public class UnmanagedApplicationManager {
     this.registerRequest = null;
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     this.asyncApiPollIntervalMillis = conf.getLong(
-        YarnConfiguration.
-            YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
-        YarnConfiguration.
-            DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+        
YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+        
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
     this.keepContainersAcrossApplicationAttempts =
         keepContainersAcrossApplicationAttempts;
     this.applicationSubmissionContext = originalApplicationSubmissionContext;
@@ -175,8 +174,7 @@ public class UnmanagedApplicationManager {
     this.connectionInitiated = true;
 
     // Blocking call to RM
-    Token<AMRMTokenIdentifier> amrmToken =
-        initializeUnmanagedAM(this.applicationId);
+    Token<AMRMTokenIdentifier> amrmToken = 
initializeUnmanagedAM(this.applicationId);
 
     // Creates the UAM connection
     createUAMProxy(amrmToken);
@@ -217,8 +215,8 @@ public class UnmanagedApplicationManager {
    * @throws IOException if register fails
    */
   public RegisterApplicationMasterResponse registerApplicationMaster(
-      RegisterApplicationMasterRequest request)
-      throws YarnException, IOException {
+      RegisterApplicationMasterRequest request) throws YarnException, 
IOException {
+
     // Save the register request for re-register later
     this.registerRequest = request;
 
@@ -228,16 +226,17 @@ public class UnmanagedApplicationManager {
         this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
     this.heartbeatHandler.resetLastResponseId();
 
-    for (Container container : response.getContainersFromPreviousAttempts()) {
-      LOG.debug("RegisterUAM returned existing running container {}",
-          container.getId());
-    }
-    for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
-      LOG.debug("RegisterUAM returned existing NM token for node {}",
-          nmToken.getNodeId());
+    if (LOG.isDebugEnabled()) {
+      for (Container container : response.getContainersFromPreviousAttempts()) 
{
+        LOG.debug("RegisterUAM returned existing running container {}", 
container.getId());
+      }
+
+      for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
+        LOG.debug("RegisterUAM returned existing NM token for node {}", 
nmToken.getNodeId());
+      }
     }
-    LOG.info(
-        "RegisterUAM returned {} existing running container and {} NM tokens",
+
+    LOG.info("RegisterUAM returned {} existing running container and {} NM 
tokens",
         response.getContainersFromPreviousAttempts().size(),
         response.getNMTokensFromPreviousAttempts().size());
 
@@ -257,8 +256,8 @@ public class UnmanagedApplicationManager {
    * @throws IOException if finishAM call fails
    */
   public FinishApplicationMasterResponse finishApplicationMaster(
-      FinishApplicationMasterRequest request)
-      throws YarnException, IOException {
+      FinishApplicationMasterRequest request) throws YarnException, 
IOException {
+
     if (this.userUgi == null) {
       if (this.connectionInitiated) {
         // This is possible if the async launchUAM is still
@@ -322,8 +321,7 @@ public class UnmanagedApplicationManager {
         LOG.info("Unmanaged AM still not successfully launched/registered yet."
             + " Saving the allocate request and send later.");
       } else {
-        throw new YarnException(
-            "AllocateAsync should not be called before launchUAM");
+        throw new YarnException("AllocateAsync should not be called before 
launchUAM");
       }
     }
   }
@@ -358,7 +356,7 @@ public class UnmanagedApplicationManager {
    * Returns RM proxy for the specified protocol type. Unit test cases can
    * override this method and return mock proxy instances.
    *
-   * @param protocol protocal of the proxy
+   * @param protocol protocol of the proxy
    * @param config configuration
    * @param user ugi for the proxy connection
    * @param token token for the connection
@@ -411,8 +409,8 @@ public class UnmanagedApplicationManager {
     }
   }
 
-  private void submitUnmanagedApp(ApplicationId appId)
-      throws YarnException, IOException {
+  private void submitUnmanagedApp(ApplicationId appId) throws YarnException, 
IOException {
+
     SubmitApplicationRequest submitRequest =
         this.recordFactory.newRecordInstance(SubmitApplicationRequest.class);
 
@@ -422,8 +420,7 @@ public class UnmanagedApplicationManager {
     context.setApplicationId(appId);
     context.setApplicationName(APP_NAME + "-" + appNameSuffix);
     if (StringUtils.isBlank(this.queueName)) {
-      context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG,
-          YarnConfiguration.DEFAULT_QUEUE_NAME));
+      context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG, 
YarnConfiguration.DEFAULT_QUEUE_NAME));
     } else {
       context.setQueue(this.queueName);
     }
@@ -467,8 +464,7 @@ public class UnmanagedApplicationManager {
    * @throws IOException if getApplicationReport fails
    */
   private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId 
appId,
-      Set<YarnApplicationState> appStates,
-      YarnApplicationAttemptState attemptState)
+      Set<YarnApplicationState> appStates, YarnApplicationAttemptState 
attemptState)
       throws YarnException, IOException {
 
     long startTime = System.currentTimeMillis();
@@ -495,25 +491,26 @@ public class UnmanagedApplicationManager {
       }
 
       if (appAttemptId != null) {
-        GetApplicationAttemptReportRequest req = this.recordFactory
-            .newRecordInstance(GetApplicationAttemptReportRequest.class);
+        GetApplicationAttemptReportRequest req =
+             
this.recordFactory.newRecordInstance(GetApplicationAttemptReportRequest.class);
         req.setApplicationAttemptId(appAttemptId);
-        ApplicationAttemptReport attemptReport = this.rmClient
-            .getApplicationAttemptReport(req).getApplicationAttemptReport();
-        if (attemptState
-            .equals(attemptReport.getYarnApplicationAttemptState())) {
+        GetApplicationAttemptReportResponse appAttemptReport =
+            this.rmClient.getApplicationAttemptReport(req);
+        ApplicationAttemptReport attemptReport = 
appAttemptReport.getApplicationAttemptReport();
+        YarnApplicationAttemptState appAttemptState =
+            attemptReport.getYarnApplicationAttemptState();
+        if (attemptState.equals(appAttemptState)) {
           return attemptReport;
         }
-        LOG.info("Current attempt state of " + appAttemptId + " is "
-            + attemptReport.getYarnApplicationAttemptState()
-            + ", waiting for current attempt to reach " + attemptState);
+        LOG.info("Current attempt state of {} is {}, waiting for current 
attempt to reach {}.",
+            appAttemptId, appAttemptState, attemptState);
       }
 
       try {
         Thread.sleep(this.asyncApiPollIntervalMillis);
       } catch (InterruptedException e) {
-        LOG.warn("Interrupted while waiting for current attempt of " + appId
-            + " to reach " + attemptState);
+        LOG.warn("Interrupted while waiting for current attempt of {} to reach 
{}.",
+            appId, attemptState);
       }
 
       if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) {
@@ -538,8 +535,7 @@ public class UnmanagedApplicationManager {
     if (amrmToken != null) {
       token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
     } else {
-      LOG.warn(
-          "AMRMToken not found in the application report for application: {}",
+      LOG.warn("AMRMToken not found in the application report for application: 
{}",
           this.applicationId);
     }
     return token;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index 65266bf4118..4a57e26cb82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -351,23 +351,15 @@ public class TestUnmanagedApplicationManager {
       ApplicationAttemptId appAttemptId)
       throws IOException, InterruptedException {
     return getUGIWithToken(appAttemptId)
-        .doAs(new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() {
-          @Override
-          public Token<AMRMTokenIdentifier> run() throws Exception {
-            return uam.launchUAM();
-          }
-        });
+        .doAs((PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>) () -> 
uam.launchUAM());
   }
 
   protected void reAttachUAM(final Token<AMRMTokenIdentifier> uamToken,
       ApplicationAttemptId appAttemptId)
       throws IOException, InterruptedException {
-    getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() 
{
-      @Override
-      public Token<AMRMTokenIdentifier> run() throws Exception {
-        uam.reAttachUAM(uamToken);
-        return null;
-      }
+    getUGIWithToken(appAttemptId).doAs((PrivilegedExceptionAction<Object>) () 
-> {
+      uam.reAttachUAM(uamToken);
+      return null;
     });
   }
 
@@ -376,25 +368,16 @@ public class TestUnmanagedApplicationManager {
       ApplicationAttemptId appAttemptId)
       throws YarnException, IOException, InterruptedException {
     return getUGIWithToken(appAttemptId).doAs(
-        new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
-          @Override
-          public RegisterApplicationMasterResponse run()
-              throws YarnException, IOException {
-            return uam.registerApplicationMaster(request);
-          }
-        });
+        (PrivilegedExceptionAction<RegisterApplicationMasterResponse>)
+        () -> uam.registerApplicationMaster(request));
   }
 
   protected void allocateAsync(final AllocateRequest request,
-      final AsyncCallback<AllocateResponse> callBack,
-      ApplicationAttemptId appAttemptId)
+      final AsyncCallback<AllocateResponse> callBack, ApplicationAttemptId 
appAttemptId)
       throws YarnException, IOException, InterruptedException {
-    getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() 
{
-      @Override
-      public Object run() throws YarnException {
-        uam.allocateAsync(request, callBack);
-        return null;
-      }
+    getUGIWithToken(appAttemptId).doAs((PrivilegedExceptionAction<Object>) () 
-> {
+      uam.allocateAsync(request, callBack);
+      return null;
     });
   }
 
@@ -402,16 +385,9 @@ public class TestUnmanagedApplicationManager {
       final FinishApplicationMasterRequest request,
       ApplicationAttemptId appAttemptId)
       throws YarnException, IOException, InterruptedException {
-    return getUGIWithToken(appAttemptId)
-        .doAs(new PrivilegedExceptionAction<FinishApplicationMasterResponse>() 
{
-          @Override
-          public FinishApplicationMasterResponse run()
-              throws YarnException, IOException {
-            FinishApplicationMasterResponse response =
-                uam.finishApplicationMaster(request);
-            return response;
-          }
-        });
+    return getUGIWithToken(appAttemptId).doAs(
+        (PrivilegedExceptionAction<FinishApplicationMasterResponse>) () ->
+        uam.finishApplicationMaster(request));
   }
 
   protected class CountingCallback implements AsyncCallback<AllocateResponse> {
@@ -497,14 +473,10 @@ public class TestUnmanagedApplicationManager {
     @Override
     public void run() {
       try {
-        getUGIWithToken(attemptId)
-            .doAs(new PrivilegedExceptionAction<Object>() {
-              @Override
-              public Object run() {
-                TestableAMRequestHandlerThread.super.run();
-                return null;
-              }
-            });
+        getUGIWithToken(attemptId).doAs((PrivilegedExceptionAction<Object>) () 
-> {
+          TestableAMRequestHandlerThread.super.run();
+          return null;
+        });
       } catch (Exception e) {
         LOG.error("Exception running TestableAMRequestHandlerThread", e);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to