This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 1d2afc5cf6f8 YARN-8862. [BackPort] [GPG] Add Yarn Registry cleanup in ApplicationCleaner. (#6083) Contributed by Shilun Fan. 1d2afc5cf6f8 is described below commit 1d2afc5cf6f816461b76ec2bdbab8209052cd129 Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Fri Sep 29 07:15:53 2023 +0800 YARN-8862. [BackPort] [GPG] Add Yarn Registry cleanup in ApplicationCleaner. (#6083) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri <inigo...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../federation/utils/FederationRegistryClient.java | 60 +++++++++++++--------- .../utils/TestFederationRegistryClient.java | 27 ++++++++++ .../server/globalpolicygenerator/GPGContext.java | 5 ++ .../globalpolicygenerator/GPGContextImpl.java | 12 +++++ .../GlobalPolicyGenerator.java | 21 ++++++++ .../applicationcleaner/ApplicationCleaner.java | 30 ++++++++--- .../DefaultApplicationCleaner.java | 2 + .../TestDefaultApplicationCleaner.java | 34 ++++++++++++ 8 files changed, 159 insertions(+), 32 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java index fa64188a608b..9e4d1e6ed0e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import 
org.apache.commons.collections.MapUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.registry.client.api.BindFlags; @@ -142,9 +143,7 @@ public class FederationRegistryClient { // Then update the subClusterTokenMap subClusterTokenMap.put(subClusterId, token); } catch (YarnException | IOException e) { - LOG.error( - "Failed writing AMRMToken to registry for subcluster " + subClusterId, - e); + LOG.error("Failed writing AMRMToken to registry for subcluster {}.", subClusterId, e); } return update; } @@ -189,8 +188,7 @@ public class FederationRegistryClient { retMap.put(scId, amrmToken); } catch (Exception e) { - LOG.error("Failed reading registry key " + key - + ", skipping subcluster " + scId, e); + LOG.error("Failed reading registry key {}, skipping subcluster {}.", key, scId, e); } } @@ -202,24 +200,39 @@ public class FederationRegistryClient { /** * Remove an application from registry. * - * @param appId application id + * @param appId application id. */ public synchronized void removeAppFromRegistry(ApplicationId appId) { + removeAppFromRegistry(appId, false); + } + + /** + * Remove an application from registry. 
+ * + * @param appId application id + * @param ignoreMemoryState whether to ignore the memory data in terms of + * known application + */ + public synchronized void removeAppFromRegistry(ApplicationId appId, + boolean ignoreMemoryState) { Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap = this.appSubClusterTokenMap.get(appId); - LOG.info("Removing all registry entries for {}", appId); - - if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { - return; + if (!ignoreMemoryState) { + if (MapUtils.isEmpty(subClusterTokenMap)) { + return; + } } + LOG.info("Removing all registry entries for {}.", appId); // Lastly remove the application directory String key = getRegistryKey(appId, null); try { removeKeyRegistry(this.registry, this.user, key, true, true); - subClusterTokenMap.clear(); + if (subClusterTokenMap != null) { + subClusterTokenMap.clear(); + } } catch (YarnException e) { - LOG.error("Failed removing registry directory key " + key, e); + LOG.error("Failed removing registry directory key {}.", key, e); } } @@ -247,7 +260,7 @@ public class FederationRegistryClient { } } catch (Throwable e) { if (throwIfFails) { - LOG.error("Registry resolve key " + key + " failed", e); + LOG.error("Registry resolve key {} failed.", key, e); } } return null; @@ -271,7 +284,7 @@ public class FederationRegistryClient { return true; } catch (Throwable e) { if (throwIfFails) { - LOG.error("Registry remove key " + key + " failed", e); + LOG.error("Registry remove key {} failed.", key, e); } } return false; @@ -300,7 +313,7 @@ public class FederationRegistryClient { return true; } catch (Throwable e) { if (throwIfFails) { - LOG.error("Registry write key " + key + " failed", e); + LOG.error("Registry write key {} failed.", key, e); } } return false; @@ -317,18 +330,15 @@ public class FederationRegistryClient { private List<String> listDirRegistry(final RegistryOperations registryImpl, UserGroupInformation ugi, final String key, final boolean throwIfFails) throws 
YarnException { - List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() { - @Override - public List<String> run() { - try { - return registryImpl.list(key); - } catch (Throwable e) { - if (throwIfFails) { - LOG.error("Registry list key " + key + " failed", e); - } + List<String> result = ugi.doAs((PrivilegedAction<List<String>>) () -> { + try { + return registryImpl.list(key); + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry list key {} failed.", key, e); } - return null; } + return null; }); if (result == null && throwIfFails) { throw new YarnException("Registry list key " + key + " failed"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java index 42be851512af..cccfbb4613c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java @@ -87,4 +87,31 @@ public class TestFederationRegistryClient { this.registryClient.loadStateFromRegistry(appId).size()); } + @Test + public void testRemoveWithMemoryState() { + ApplicationId appId1 = ApplicationId.newInstance(0, 0); + ApplicationId appId2 = ApplicationId.newInstance(0, 1); + String scId0 = "subcluster0"; + + this.registryClient.writeAMRMTokenForUAM(appId1, scId0, new Token<>()); + this.registryClient.writeAMRMTokenForUAM(appId2, scId0, new Token<>()); + Assert.assertEquals(2, this.registryClient.getAllApplications().size()); + + // Create a new client instance + this.registryClient = 
+ new FederationRegistryClient(this.conf, this.registry, this.user); + + this.registryClient.loadStateFromRegistry(appId2); + // Should remove app2 + this.registryClient.removeAppFromRegistry(appId2, false); + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + + // Should not remove app1 since memory state doesn't have it + this.registryClient.removeAppFromRegistry(appId1, false); + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + + // Should remove app1 + this.registryClient.removeAppFromRegistry(appId1, true); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java index 6b0a5a43112b..e54244d7133d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; /** @@ -32,4 +33,8 @@ public interface GPGContext { GPGPolicyFacade getPolicyFacade(); void setPolicyFacade(GPGPolicyFacade facade); + + FederationRegistryClient getRegistryClient(); + + void setRegistryClient(FederationRegistryClient client); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java index bb498448fae8..b14f50299018 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; /** @@ -27,6 +28,7 @@ public class GPGContextImpl implements GPGContext { private FederationStateStoreFacade facade; private GPGPolicyFacade policyFacade; + private FederationRegistryClient registryClient; @Override public FederationStateStoreFacade getStateStoreFacade() { @@ -48,4 +50,14 @@ public class GPGContextImpl implements GPGContext { public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){ policyFacade = gpgPolicyfacade; } + + @Override + public FederationRegistryClient getRegistryClient() { + return registryClient; + } + + @Override + public void setRegistryClient(FederationRegistryClient client) { + registryClient = client; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index ba8ce856cdaa..7ea2f5f27277 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -37,6 +37,7 @@ import org.apache.hadoop.security.AuthenticationFilterInitializer; import org.apache.hadoop.security.HttpCrossOriginFilterInitializer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.JvmPauseMonitor; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner; import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator; import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; @@ -81,6 +83,7 @@ public class GlobalPolicyGenerator extends CompositeService { // Federation Variables private GPGContext gpgContext; + private RegistryOperations registry; // Scheduler service that runs tasks periodically private ScheduledThreadPoolExecutor 
scheduledExecutorService; @@ -123,6 +126,17 @@ public class GlobalPolicyGenerator extends CompositeService { new GPGPolicyFacade(this.gpgContext.getStateStoreFacade(), conf); this.gpgContext.setPolicyFacade(gpgPolicyFacade); + this.registry = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.YARN_REGISTRY_CLASS, + YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, + RegistryOperations.class); + this.registry.init(conf); + + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + FederationRegistryClient registryClient = + new FederationRegistryClient(conf, this.registry, user); + this.gpgContext.setRegistryClient(registryClient); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); @@ -157,6 +171,8 @@ public class GlobalPolicyGenerator extends CompositeService { super.serviceStart(); + this.registry.start(); + // Schedule SubClusterCleaner service Configuration config = getConfig(); long scCleanerIntervalMs = config.getTimeDuration( @@ -214,6 +230,11 @@ public class GlobalPolicyGenerator extends CompositeService { @Override protected void serviceStop() throws Exception { + if (this.registry != null) { + this.registry.stop(); + this.registry = null; + } + try { if (this.scheduledExecutorService != null && !this.scheduledExecutorService.isShutdown()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java index cd3f7618558e..af0bd6184b79 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.commons.lang3.time.DurationFormatUtils; @@ -27,9 +28,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -46,6 +49,7 @@ public abstract class ApplicationCleaner implements Runnable { private Configuration conf; private GPGContext gpgContext; + private FederationRegistryClient registryClient; private int minRouterSuccessCount; private int maxRouterRetry; @@ -56,6 +60,7 @@ public abstract class ApplicationCleaner implements Runnable { this.gpgContext = context; this.conf = config; + this.registryClient = context.getRegistryClient(); String routerSpecString = this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, @@ -80,10 
+85,9 @@ public abstract class ApplicationCleaner implements Runnable { + this.minRouterSuccessCount + " should be positive"); } - LOG.info( - "Initialized AppCleaner with Router query with min success {}, " - + "max retry {}, retry interval {}", - this.minRouterSuccessCount, this.maxRouterRetry, + LOG.info("Initialized AppCleaner with Router query with min success {}, " + + "max retry {}, retry interval {}.", this.minRouterSuccessCount, + this.maxRouterRetry, DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis)); } @@ -100,9 +104,9 @@ public abstract class ApplicationCleaner implements Runnable { public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException { String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf); - LOG.info(String.format("Contacting router at: %s", webAppAddress)); - AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf, - DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString()); + LOG.info("Contacting router at: {}.", webAppAddress); + AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.APPS, + AppsInfo.class, conf, DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString()); Set<ApplicationId> appSet = new HashSet<>(); for (AppInfo appInfo : appsInfo.getApps()) { @@ -148,6 +152,18 @@ public abstract class ApplicationCleaner implements Runnable { + " success Router queries after " + totalAttemptCount + " retries"); } + protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) { + List<String> allApps = this.registryClient.getAllApplications(); + LOG.info("Got {} existing apps in registry.", allApps.size()); + for (String app : allApps) { + ApplicationId appId = ApplicationId.fromString(app); + if (!knownApps.contains(appId)) { + LOG.info("removing finished application entry for {}", app); + this.registryClient.removeAppFromRegistry(appId, true); + } + } + } + @Override public abstract void run(); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java index 857d2e645d4c..5b2ff26fcfb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java @@ -70,6 +70,8 @@ public class DefaultApplicationCleaner extends ApplicationCleaner { LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e); } } + // Clean up registry entries + cleanupAppRecordInRegistry(routerApps); } catch (Throwable e) { LOG.error("Application cleaner started at time {} fails. 
", now, e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java index 2d63c48236fb..1e703b51960e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java @@ -24,15 +24,21 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl; @@ -50,6 +56,8 @@ public class TestDefaultApplicationCleaner { private FederationStateStoreFacade facade; private ApplicationCleaner appCleaner; private GPGContext gpgContext; + private RegistryOperations registry; + private FederationRegistryClient registryClient; private List<ApplicationId> appIds; // The list of applications returned by mocked router @@ -68,8 +76,18 @@ public class TestDefaultApplicationCleaner { facade = FederationStateStoreFacade.getInstance(); facade.reinitialize(stateStore, conf); + registry = new FSRegistryOperationsService(); + registry.init(conf); + registry.start(); + + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + registryClient = new FederationRegistryClient(conf, registry, user); + registryClient.cleanAllApplications(); + Assert.assertEquals(0, registryClient.getAllApplications().size()); + gpgContext = new GPGContextImpl(); gpgContext.setStateStoreFacade(facade); + gpgContext.setRegistryClient(registryClient); appCleaner = new TestableDefaultApplicationCleaner(); appCleaner.init(conf, gpgContext); @@ -87,7 +105,12 @@ public class TestDefaultApplicationCleaner { stateStore.addApplicationHomeSubCluster( AddApplicationHomeSubClusterRequest.newInstance( ApplicationHomeSubCluster.newInstance(appId, subClusterId))); + + // Write some registry entries for the app + registryClient.writeAMRMTokenForUAM(appId, subClusterId.toString(), + new Token<AMRMTokenIdentifier>()); } + Assert.assertEquals(3, registryClient.getAllApplications().size()); } @After @@ -96,6 +119,14 @@ public class TestDefaultApplicationCleaner { stateStore.close(); stateStore = 
null; } + if (registryClient != null) { + registryClient.cleanAllApplications(); + registryClient = null; + } + if (registry != null) { + registry.stop(); + registry = null; + } } @Test @@ -116,6 +147,9 @@ public class TestDefaultApplicationCleaner { .getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest.newInstance()) .getAppsHomeSubClusters().size()); + + // The known app should not be cleaned in registry + Assert.assertEquals(1, registryClient.getAllApplications().size()); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org