This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 254dbab5a323 YARN-9013. [BackPort] [GPG] fix order of steps cleaning Registry entries in ApplicationCleaner. (#6147) Contributed by Botong Huang, Shilun Fan. 254dbab5a323 is described below commit 254dbab5a323b29897a4e90f3502dc32ccc30688 Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Tue Oct 31 06:56:00 2023 +0800 YARN-9013. [BackPort] [GPG] fix order of steps cleaning Registry entries in ApplicationCleaner. (#6147) Contributed by Botong Huang, Shilun Fan. Co-authored-by: Botong Huang <bot...@apache.org> Reviewed-by: Inigo Goiri <inigo...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../src/main/resources/yarn-default.xml | 4 +-- .../applicationcleaner/ApplicationCleaner.java | 17 +++------ .../DefaultApplicationCleaner.java | 41 +++++++++++++++------- .../TestDefaultApplicationCleaner.java | 39 ++++++++++++++++++++ 4 files changed, 74 insertions(+), 27 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c2be5d420bfc..0e317712f826 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5132,10 +5132,10 @@ <property> <name>yarn.router.interceptor.user-thread-pool.keep-alive-time</name> - <value>0s</value> + <value>30s</value> <description> This configurable is used to set the keepAliveTime of the thread pool of the interceptor. - Default is 0s. + Default is 30s. </description> </property> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java index af0bd6184b79..76380af8c986 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.commons.lang3.time.DurationFormatUtils; @@ -95,6 +94,10 @@ public abstract class ApplicationCleaner implements Runnable { return this.gpgContext; } + public FederationRegistryClient getRegistryClient() { + return this.registryClient; + } + /** * Query router for applications. * @@ -152,18 +155,6 @@ public abstract class ApplicationCleaner implements Runnable { + " success Router queries after " + totalAttemptCount + " retries"); } - protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) { - List<String> allApps = this.registryClient.getAllApplications(); - LOG.info("Got {} existing apps in registry.", allApps.size()); - for (String app : allApps) { - ApplicationId appId = ApplicationId.fromString(app); - if (!knownApps.contains(appId)) { - LOG.info("removing finished application entry for {}", app); - this.registryClient.removeAppFromRegistry(appId, true); - } - } - } - @Override public abstract void run(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java index 5b2ff26fcfb4..c3f79d0284c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.util.Sets; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -45,33 +46,49 @@ public class DefaultApplicationCleaner extends ApplicationCleaner { LOG.info("Application cleaner run at time {}", now); FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade(); - Set<ApplicationId> candidates = new HashSet<>(); try { + // Get the candidate list from StateStore before calling router + Set<ApplicationId> allStateStoreApps = new HashSet<>(); List<ApplicationHomeSubCluster> response = facade.getApplicationsHomeSubCluster(); for (ApplicationHomeSubCluster app : response) { - candidates.add(app.getApplicationId()); + allStateStoreApps.add(app.getApplicationId()); } - LOG.info("{} app entries in FederationStateStore", candidates.size()); + LOG.info("{} app entries in FederationStateStore", allStateStoreApps.size()); + // Get the candidate list from Registry before calling router + List<String> allRegistryApps = getRegistryClient().getAllApplications(); + LOG.info("{} app entries in FederationRegistry", allStateStoreApps.size()); + + // Get the list of known apps from Router Set<ApplicationId> routerApps = getRouterKnownApplications(); LOG.info("{} known applications from Router", routerApps.size()); - candidates.removeAll(routerApps); - LOG.info("Deleting {} applications from statestore", candidates.size()); - if (LOG.isDebugEnabled()) { - LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString) - .collect(Collectors.joining(","))); - } - for (ApplicationId appId : candidates) { + // Clean up StateStore entries + Set<ApplicationId> toDelete = + Sets.difference(allStateStoreApps, routerApps); + + LOG.info("Deleting {} applications from statestore", toDelete.size()); + LOG.debug("Apps to delete: {}.", + toDelete.stream().map(Object::toString).collect(Collectors.joining(","))); + + for (ApplicationId appId : toDelete) { try { + LOG.debug("Deleting {} from statestore ", appId); facade.deleteApplicationHomeSubCluster(appId); } catch (Exception e) { LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e); } } - // Clean up registry entries - cleanupAppRecordInRegistry(routerApps); + + // Clean up Registry entries + for (String app : allRegistryApps) { + ApplicationId appId = ApplicationId.fromString(app); + if (!routerApps.contains(appId)) { + LOG.debug("removing finished application entry for {}", app); + getRegistryClient().removeAppFromRegistry(appId, true); + } + } } catch (Throwable e) { LOG.error("Application cleaner started at time {} fails. ", now, e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java index 1e703b51960e..c028bbdbe2c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHome import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; @@ -63,6 +64,8 @@ public class TestDefaultApplicationCleaner { // The list of applications returned by mocked router private Set<ApplicationId> routerAppIds; + private ApplicationId appIdToAddConcurrently; + @Before public void setup() throws Exception { conf = new YarnConfiguration(); @@ -111,6 +114,7 @@ public class TestDefaultApplicationCleaner { new Token<AMRMTokenIdentifier>()); } Assert.assertEquals(3, registryClient.getAllApplications().size()); + appIdToAddConcurrently = null; } @After @@ -159,7 +163,42 @@ public class TestDefaultApplicationCleaner { extends DefaultApplicationCleaner { @Override public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException { + if (appIdToAddConcurrently != null) { + SubClusterId scId = SubClusterId.newInstance("MySubClusterId"); + try { + ApplicationHomeSubCluster appHomeSubCluster = + ApplicationHomeSubCluster.newInstance(appIdToAddConcurrently, scId); + AddApplicationHomeSubClusterRequest request = + AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster); + stateStore.addApplicationHomeSubCluster(request); + } catch (YarnException e) { + throw new YarnRuntimeException(e); + } + registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently, scId.toString(), + new Token<>()); + } return routerAppIds; } } + + @Test + public void testConcurrentNewApp() throws YarnException { + appIdToAddConcurrently = ApplicationId.newInstance(1, 1); + + appCleaner.run(); + + // The concurrently added app should be still there + GetApplicationsHomeSubClusterRequest appHomeSubClusterRequest = + GetApplicationsHomeSubClusterRequest.newInstance(); + GetApplicationsHomeSubClusterResponse applicationsHomeSubCluster = + stateStore.getApplicationsHomeSubCluster(appHomeSubClusterRequest); + Assert.assertNotNull(applicationsHomeSubCluster); + List<ApplicationHomeSubCluster> appsHomeSubClusters = + applicationsHomeSubCluster.getAppsHomeSubClusters(); + Assert.assertNotNull(appsHomeSubClusters); + Assert.assertEquals(1, appsHomeSubClusters.size()); + + // The concurrently added app should be still there + Assert.assertEquals(1, registryClient.getAllApplications().size()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org