This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 254dbab5a323 YARN-9013. [BackPort] [GPG] fix order of steps cleaning 
Registry entries in ApplicationCleaner. (#6147) Contributed by Botong Huang, 
Shilun Fan.
254dbab5a323 is described below

commit 254dbab5a323b29897a4e90f3502dc32ccc30688
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Tue Oct 31 06:56:00 2023 +0800

    YARN-9013. [BackPort] [GPG] fix order of steps cleaning Registry entries in 
ApplicationCleaner. (#6147) Contributed by Botong Huang, Shilun Fan.
    
    Co-authored-by: Botong Huang <bot...@apache.org>
    Reviewed-by: Inigo Goiri <inigo...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../src/main/resources/yarn-default.xml            |  4 +--
 .../applicationcleaner/ApplicationCleaner.java     | 17 +++------
 .../DefaultApplicationCleaner.java                 | 41 +++++++++++++++-------
 .../TestDefaultApplicationCleaner.java             | 39 ++++++++++++++++++++
 4 files changed, 74 insertions(+), 27 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c2be5d420bfc..0e317712f826 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5132,10 +5132,10 @@
 
   <property>
     <name>yarn.router.interceptor.user-thread-pool.keep-alive-time</name>
-    <value>0s</value>
+    <value>30s</value>
     <description>
       This configurable is used to set the keepAliveTime of the thread pool of 
the interceptor.
-      Default is 0s.
+      Default is 30s.
     </description>
   </property>
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
index af0bd6184b79..76380af8c986 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
 
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -95,6 +94,10 @@ public abstract class ApplicationCleaner implements Runnable 
{
     return this.gpgContext;
   }
 
+  public FederationRegistryClient getRegistryClient() {
+    return this.registryClient;
+  }
+
   /**
    * Query router for applications.
    *
@@ -152,18 +155,6 @@ public abstract class ApplicationCleaner implements 
Runnable {
         + " success Router queries after " + totalAttemptCount + " retries");
   }
 
-  protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
-    List<String> allApps = this.registryClient.getAllApplications();
-    LOG.info("Got {} existing apps in registry.", allApps.size());
-    for (String app : allApps) {
-      ApplicationId appId = ApplicationId.fromString(app);
-      if (!knownApps.contains(appId)) {
-        LOG.info("removing finished application entry for {}", app);
-        this.registryClient.removeAppFromRegistry(appId, true);
-      }
-    }
-  }
-
   @Override
   public abstract void run();
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
index 5b2ff26fcfb4..c3f79d0284c5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -45,33 +46,49 @@ public class DefaultApplicationCleaner extends 
ApplicationCleaner {
     LOG.info("Application cleaner run at time {}", now);
 
     FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
-    Set<ApplicationId> candidates = new HashSet<>();
     try {
+      // Get the candidate list from StateStore before calling router
+      Set<ApplicationId> allStateStoreApps = new HashSet<>();
       List<ApplicationHomeSubCluster> response =
           facade.getApplicationsHomeSubCluster();
       for (ApplicationHomeSubCluster app : response) {
-        candidates.add(app.getApplicationId());
+        allStateStoreApps.add(app.getApplicationId());
       }
-      LOG.info("{} app entries in FederationStateStore", candidates.size());
+      LOG.info("{} app entries in FederationStateStore", 
allStateStoreApps.size());
 
+      // Get the candidate list from Registry before calling router
+      List<String> allRegistryApps = getRegistryClient().getAllApplications();
+      LOG.info("{} app entries in FederationRegistry", 
allRegistryApps.size());
+
+      // Get the list of known apps from Router
       Set<ApplicationId> routerApps = getRouterKnownApplications();
       LOG.info("{} known applications from Router", routerApps.size());
 
-      candidates.removeAll(routerApps);
-      LOG.info("Deleting {} applications from statestore", candidates.size());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Apps to delete: {}.", 
candidates.stream().map(Object::toString)
-            .collect(Collectors.joining(",")));
-      }
-      for (ApplicationId appId : candidates) {
+      // Clean up StateStore entries
+      Set<ApplicationId> toDelete =
+          Sets.difference(allStateStoreApps, routerApps);
+
+      LOG.info("Deleting {} applications from statestore", toDelete.size());
+      LOG.debug("Apps to delete: {}.",
+          
toDelete.stream().map(Object::toString).collect(Collectors.joining(",")));
+
+      for (ApplicationId appId : toDelete) {
         try {
+          LOG.debug("Deleting {} from statestore ", appId);
           facade.deleteApplicationHomeSubCluster(appId);
         } catch (Exception e) {
           LOG.error("deleteApplicationHomeSubCluster failed at application 
{}.", appId, e);
         }
       }
-      // Clean up registry entries
-      cleanupAppRecordInRegistry(routerApps);
+
+      // Clean up Registry entries
+      for (String app : allRegistryApps) {
+        ApplicationId appId = ApplicationId.fromString(app);
+        if (!routerApps.contains(appId)) {
+          LOG.debug("removing finished application entry for {}", app);
+          getRegistryClient().removeAppFromRegistry(appId, true);
+        }
+      }
     } catch (Throwable e) {
       LOG.error("Application cleaner started at time {} fails. ", now, e);
     }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
index 1e703b51960e..c028bbdbe2c1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -38,6 +38,7 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHome
 import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
 import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
@@ -63,6 +64,8 @@ public class TestDefaultApplicationCleaner {
   // The list of applications returned by mocked router
   private Set<ApplicationId> routerAppIds;
 
+  private ApplicationId appIdToAddConcurrently;
+
   @Before
   public void setup() throws Exception {
     conf = new YarnConfiguration();
@@ -111,6 +114,7 @@ public class TestDefaultApplicationCleaner {
           new Token<AMRMTokenIdentifier>());
     }
     Assert.assertEquals(3, registryClient.getAllApplications().size());
+    appIdToAddConcurrently = null;
   }
 
   @After
@@ -159,7 +163,42 @@ public class TestDefaultApplicationCleaner {
       extends DefaultApplicationCleaner {
     @Override
     public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+      if (appIdToAddConcurrently != null) {
+        SubClusterId scId = SubClusterId.newInstance("MySubClusterId");
+        try {
+          ApplicationHomeSubCluster appHomeSubCluster =
+              ApplicationHomeSubCluster.newInstance(appIdToAddConcurrently, 
scId);
+          AddApplicationHomeSubClusterRequest request =
+              
AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster);
+          stateStore.addApplicationHomeSubCluster(request);
+        } catch (YarnException e) {
+          throw new YarnRuntimeException(e);
+        }
+        registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently, 
scId.toString(),
+            new Token<>());
+      }
       return routerAppIds;
     }
   }
+
+  @Test
+  public void testConcurrentNewApp() throws YarnException {
+    appIdToAddConcurrently = ApplicationId.newInstance(1, 1);
+
+    appCleaner.run();
+
+    // The concurrently added app should still be present in the StateStore
+    GetApplicationsHomeSubClusterRequest appHomeSubClusterRequest =
+         GetApplicationsHomeSubClusterRequest.newInstance();
+    GetApplicationsHomeSubClusterResponse applicationsHomeSubCluster =
+        stateStore.getApplicationsHomeSubCluster(appHomeSubClusterRequest);
+    Assert.assertNotNull(applicationsHomeSubCluster);
+    List<ApplicationHomeSubCluster> appsHomeSubClusters =
+        applicationsHomeSubCluster.getAppsHomeSubClusters();
+    Assert.assertNotNull(appsHomeSubClusters);
+    Assert.assertEquals(1, appsHomeSubClusters.size());
+
+    // The concurrently added app should still be present in the Registry
+    Assert.assertEquals(1, registryClient.getAllApplications().size());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to