YARN-7599. [GPG] ApplicationCleaner in Global Policy Generator. Contributed by Botong Huang.
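Note for readers of this patch: the new cleaner is disabled by default (interval -1) and is pluggable through yarn.federation.gpg.application.cleaner.class. Below is a minimal sketch of enabling it programmatically, using only the configuration constants this commit adds to YarnConfiguration; the class name and the 30-minute interval are illustrative assumptions, and in a real deployment these values would normally be set in yarn-site.xml instead.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class GPGAppCleanerConfigSketch {
  public static YarnConfiguration enableApplicationCleaner() {
    YarnConfiguration conf = new YarnConfiguration();
    // Pick the cleaner implementation (this is already the default value).
    conf.set(YarnConfiguration.GPG_APPCLEANER_CLASS,
        YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS);
    // The default interval is -1, which leaves the cleaner unscheduled;
    // any positive value schedules it. 30 minutes is an arbitrary example.
    conf.setLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS, 30 * 60 * 1000L);
    // "min success retries, max total retries, retry interval (ms)" used
    // when querying the Router for the list of known applications.
    conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "3,10,600000");
    return conf;
  }
}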
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1facc8a2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1facc8a2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1facc8a2 Branch: refs/heads/YARN-7402 Commit: 1facc8a28416efb7424cec101c80e60ac76f25ac Parents: 1e8686b Author: Botong Huang <bot...@apache.org> Authored: Fri Sep 21 17:30:44 2018 -0700 Committer: Botong Huang <bot...@apache.org> Committed: Mon Nov 12 15:09:38 2018 -0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 25 +++ .../src/main/resources/yarn-default.xml | 28 ++++ .../store/impl/MemoryFederationStateStore.java | 2 - .../pb/ApplicationHomeSubClusterPBImpl.java | 3 + .../utils/FederationStateStoreFacade.java | 33 ++++ .../server/globalpolicygenerator/GPGUtils.java | 21 ++- .../GlobalPolicyGenerator.java | 23 ++- .../applicationcleaner/ApplicationCleaner.java | 154 +++++++++++++++++++ .../DefaultApplicationCleaner.java | 82 ++++++++++ .../applicationcleaner/package-info.java | 19 +++ .../TestDefaultApplicationCleaner.java | 130 ++++++++++++++++ 11 files changed, 513 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c42ebb1..aa990d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3445,6 +3445,31 @@ public class YarnConfiguration extends Configuration { FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000; + // The application cleaner class to use + public static final String GPG_APPCLEANER_CLASS = + FEDERATION_GPG_PREFIX + "application.cleaner.class"; + public static final String DEFAULT_GPG_APPCLEANER_CLASS = + "org.apache.hadoop.yarn.server.globalpolicygenerator" + + ".applicationcleaner.DefaultApplicationCleaner"; + + // The interval at which the application cleaner runs, -1 means disabled + public static final String GPG_APPCLEANER_INTERVAL_MS = + FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms"; + public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = -1; + + /** + * Specifications on how (many times) to contact Router for apps. We need to + * do this because Router might return partial application list because some + * sub-cluster RM is not responsive (e.g. failing over). + * + * Should have three values separated by comma: minimal success retries, + * maximum total retry, retry interval (ms). 
+ */ + public static final String GPG_APPCLEANER_CONTACT_ROUTER_SPEC = + FEDERATION_GPG_PREFIX + "application.cleaner.contact.router.spec"; + public static final String DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC = + "3,10,600000"; + public static final String FEDERATION_GPG_POLICY_PREFIX = FEDERATION_GPG_PREFIX + "policy.generator."; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index a08ff96..e496e28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3671,6 +3671,34 @@ <property> <description> + The Application Cleaner implementation class for GPG to use. + </description> + <name>yarn.federation.gpg.application.cleaner.class</name> + <value>org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.DefaultApplicationCleaner</value> + </property> + + <property> + <description> + The interval at which the application cleaner runs, -1 means disabled. + </description> + <name>yarn.federation.gpg.application.cleaner.interval-ms</name> + <value>-1</value> + </property> + + <property> + <description> + Specifications on how (many times) to contact Router for apps. We need to + do this because Router might return partial application list because some + sub-cluster RM is not responsive (e.g. failing over). + Should have three values separated by comma: minimal success retries, + maximum total retry, retry interval (ms). 
+ </description> + <name>yarn.federation.gpg.application.cleaner.contact.router.spec</name> + <value>3,10,600000</value> + </property> + + <property> + <description> The interval at which the policy generator runs, default is one hour </description> <name>yarn.federation.gpg.policy.generator.interval-ms</name> http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index b42fc79..f7cdcd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -259,8 +259,6 @@ public class MemoryFederationStateStore implements FederationStateStore { result .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue())); } - - GetApplicationsHomeSubClusterResponse.newInstance(result); return GetApplicationsHomeSubClusterResponse.newInstance(result); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java index 7e6a564..6bd80fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java @@ -110,6 +110,9 @@ public class ApplicationHomeSubClusterPBImpl extends ApplicationHomeSubCluster { @Override public ApplicationId getApplicationId() { ApplicationHomeSubClusterProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } if (!p.hasApplicationId()) { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index d10e568..df5f50c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -52,8 +52,11 @@ import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateS import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; @@ -433,6 +436,36 @@ public final class FederationStateStoreFacade { } /** + * Get the {@code ApplicationHomeSubCluster} list representing the mapping of + * all submitted applications to it's home sub-cluster. + * + * @return the mapping of all submitted application to it's home sub-cluster + * @throws YarnException if the request is invalid/fails + */ + public List<ApplicationHomeSubCluster> getApplicationsHomeSubCluster() + throws YarnException { + GetApplicationsHomeSubClusterResponse response = + stateStore.getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest.newInstance()); + return response.getAppsHomeSubClusters(); + } + + /** + * Delete the mapping of home {@code SubClusterId} of a previously submitted + * {@code ApplicationId}. Currently response is empty if the operation is + * successful, if not an exception reporting reason for a failure. 
+ * + * @param applicationId the application to delete the home sub-cluster of + * @throws YarnException if the request is invalid/fails + */ + public void deleteApplicationHomeSubCluster(ApplicationId applicationId) + throws YarnException { + stateStore.deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest.newInstance(applicationId)); + return; + } + + /** * Get the singleton instance of SubClusterResolver. * * @return SubClusterResolver instance http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java index 31cee1c..615cf3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; @@ -49,15 +50,19 @@ public final class GPGUtils { * Performs an invocation of the the remote RMWebService. */ public static <T> T invokeRMWebService(Configuration conf, String webAddr, - String path, final Class<T> returnType) { + String path, final Class<T> returnType, String deSelectParam) { Client client = Client.create(); T obj = null; - WebResource webResource = client.resource(webAddr); + WebResource webResource = + client.resource(webAddr).path("ws/v1/cluster").path(path); + if (deSelectParam != null) { + webResource = webResource.queryParam(RMWSConsts.DESELECTS, deSelectParam); + } ClientResponse response = null; try { - response = webResource.path("ws/v1/cluster").path(path) - .accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + response = webResource.accept(MediaType.APPLICATION_XML) + .get(ClientResponse.class); if (response.getStatus() == SC_OK) { obj = response.getEntity(returnType); } else { @@ -74,6 +79,14 @@ public final class GPGUtils { } /** + * Performs an invocation of the the remote RMWebService. + */ + public static <T> T invokeRMWebService(Configuration conf, String webAddr, + String path, final Class<T> returnType) { + return invokeRMWebService(conf, webAddr, path, returnType, null); + } + + /** * Creates a uniform weighting of 1.0 for each sub cluster. 
*/ public static Map<SubClusterIdInfo, Float> createUniformWeights( http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index 1ae07f3..c8ec4cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -31,6 +31,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner; import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator; import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; import org.slf4j.Logger; @@ -63,6 +64,7 @@ public class GlobalPolicyGenerator extends CompositeService { // Scheduler service that runs tasks periodically private ScheduledThreadPoolExecutor scheduledExecutorService; private SubClusterCleaner subClusterCleaner; + private ApplicationCleaner applicationCleaner; private PolicyGenerator policyGenerator; public GlobalPolicyGenerator() { @@ -82,7 +84,15 @@ public class GlobalPolicyGenerator extends CompositeService { this.scheduledExecutorService = new ScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); + this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); + + this.applicationCleaner = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.GPG_APPCLEANER_CLASS, + YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS, + ApplicationCleaner.class); + this.applicationCleaner.init(conf, this.gpgContext); + this.policyGenerator = new PolicyGenerator(conf, this.gpgContext); DefaultMetricsSystem.initialize(METRICS_NAME); @@ -95,7 +105,7 @@ public class GlobalPolicyGenerator extends CompositeService { protected void serviceStart() throws Exception { super.serviceStart(); - // Scheduler SubClusterCleaner service + // Schedule SubClusterCleaner service long scCleanerIntervalMs = getConfig().getLong( YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS); @@ -106,6 +116,17 @@ public class GlobalPolicyGenerator extends CompositeService { DurationFormatUtils.formatDurationISO(scCleanerIntervalMs)); } + // Schedule ApplicationCleaner service + long appCleanerIntervalMs = + getConfig().getLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS, + 
YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS); + if (appCleanerIntervalMs > 0) { + this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner, + 0, appCleanerIntervalMs, TimeUnit.MILLISECONDS); + LOG.info("Scheduled application cleaner with interval: {}", + DurationFormatUtils.formatDurationISO(appCleanerIntervalMs)); + } + // Schedule PolicyGenerator long policyGeneratorIntervalMillis = getConfig().getLong( YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS, http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java new file mode 100644 index 0000000..85047ef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The ApplicationCleaner is a runnable that cleans up old applications from + * table applicationsHomeSubCluster in FederationStateStore. 
+ */ +public abstract class ApplicationCleaner implements Runnable { + private static final Logger LOG = + LoggerFactory.getLogger(ApplicationCleaner.class); + + private Configuration conf; + private GPGContext gpgContext; + + private int minRouterSuccessCount; + private int maxRouterRetry; + private long routerQueryIntevalMillis; + + public void init(Configuration config, GPGContext context) + throws YarnException { + + this.gpgContext = context; + this.conf = config; + + String routerSpecString = + this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, + YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC); + String[] specs = routerSpecString.split(","); + if (specs.length != 3) { + throw new YarnException("Expect three comma separated values in " + + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get " + + routerSpecString); + } + this.minRouterSuccessCount = Integer.parseInt(specs[0]); + this.maxRouterRetry = Integer.parseInt(specs[1]); + this.routerQueryIntevalMillis = Long.parseLong(specs[2]); + + if (this.minRouterSuccessCount > this.maxRouterRetry) { + throw new YarnException("minRouterSuccessCount " + + this.minRouterSuccessCount + + " should not be larger than maxRouterRetry" + this.maxRouterRetry); + } + if (this.minRouterSuccessCount <= 0) { + throw new YarnException("minRouterSuccessCount " + + this.minRouterSuccessCount + " should be positive"); + } + + LOG.info( + "Initialized AppCleaner with Router query with min success {}, " + + "max retry {}, retry interval {}", + this.minRouterSuccessCount, this.maxRouterRetry, + DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis)); + } + + public GPGContext getGPGContext() { + return this.gpgContext; + } + + /** + * Query router for applications. + * + * @return the set of applications + * @throws YarnRuntimeException when router call fails + */ + public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException { + String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf); + + LOG.info(String.format("Contacting router at: %s", webAppAddress)); + AppsInfo appsInfo = (AppsInfo) GPGUtils.invokeRMWebService(conf, + webAppAddress, "apps", AppsInfo.class, + DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString()); + + Set<ApplicationId> appSet = new HashSet<ApplicationId>(); + for (AppInfo appInfo : appsInfo.getApps()) { + appSet.add(ApplicationId.fromString(appInfo.getAppId())); + } + return appSet; + } + + /** + * Get the list of known applications in the cluster from Router. 
+ * + * @return the list of known applications + * @throws YarnException if get app fails + */ + public Set<ApplicationId> getRouterKnownApplications() throws YarnException { + int successCount = 0, totalAttemptCount = 0; + Set<ApplicationId> resultSet = new HashSet<ApplicationId>(); + while (totalAttemptCount < this.maxRouterRetry) { + try { + Set<ApplicationId> routerApps = getAppsFromRouter(); + resultSet.addAll(routerApps); + LOG.info("Attempt {}: {} known apps from Router, {} in total", + totalAttemptCount, routerApps.size(), resultSet.size()); + + successCount++; + if (successCount >= this.minRouterSuccessCount) { + return resultSet; + } + + // Wait for the next attempt + try { + Thread.sleep(this.routerQueryIntevalMillis); + } catch (InterruptedException e) { + LOG.warn("Sleep interrupted after attempt " + totalAttemptCount); + } + } catch (Exception e) { + LOG.warn("Router query attempt " + totalAttemptCount + " failed ", e); + } finally { + totalAttemptCount++; + } + } + throw new YarnException("Only " + successCount + + " success Router queries after " + totalAttemptCount + " retries"); + } + + @Override + public abstract void run(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java new file mode 100644 index 0000000..1ce9840 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; + +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; + +/** + * The default ApplicationCleaner that cleans up old applications from table + * applicationsHomeSubCluster in FederationStateStore. + */ +public class DefaultApplicationCleaner extends ApplicationCleaner { + private static final Logger LOG = + LoggerFactory.getLogger(DefaultApplicationCleaner.class); + + @Override + public void run() { + Date now = new Date(); + LOG.info("Application cleaner run at time {}", now); + + FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade(); + Set<ApplicationId> candidates = new HashSet<ApplicationId>(); + try { + List<ApplicationHomeSubCluster> response = + facade.getApplicationsHomeSubCluster(); + for (ApplicationHomeSubCluster app : response) { + candidates.add(app.getApplicationId()); + } + LOG.info("{} app entries in FederationStateStore", candidates.size()); + + Set<ApplicationId> routerApps = getRouterKnownApplications(); + LOG.info("{} known applications from Router", routerApps.size()); + + candidates = Sets.difference(candidates, routerApps); + LOG.info("Deleting {} applications from statestore", candidates.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("Apps to delete: ", candidates.stream().map(Object::toString) + .collect(Collectors.joining(","))); + } + for (ApplicationId appId : candidates) { + try { + facade.deleteApplicationHomeSubCluster(appId); + } catch (Exception e) { + LOG.error( + "deleteApplicationHomeSubCluster failed at application " + appId, + e); + } + } + + } catch (Throwable e) { + LOG.error("Application cleaner started at time " + now + " fails: ", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java new file mode 100644 index 0000000..dd302c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java new file mode 100644 index 0000000..ec3f64e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for DefaultApplicationCleaner in GPG. + */ +public class TestDefaultApplicationCleaner { + private Configuration conf; + private MemoryFederationStateStore stateStore; + private FederationStateStoreFacade facade; + private ApplicationCleaner appCleaner; + private GPGContext gpgContext; + + private List<ApplicationId> appIds; + // The list of applications returned by mocked router + private Set<ApplicationId> routerAppIds; + + @Before + public void setup() throws Exception { + conf = new YarnConfiguration(); + + // No Router query retry + conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "1,1,0"); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + + facade = FederationStateStoreFacade.getInstance(); + facade.reinitialize(stateStore, conf); + + gpgContext = new GPGContextImpl(); + gpgContext.setStateStoreFacade(facade); + + appCleaner = new TestableDefaultApplicationCleaner(); + appCleaner.init(conf, gpgContext); + + routerAppIds = new HashSet<ApplicationId>(); + + appIds = new ArrayList<ApplicationId>(); + for (int i = 0; i < 3; i++) { + ApplicationId appId = ApplicationId.newInstance(0, i); + appIds.add(appId); + + SubClusterId subClusterId = + SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i)); + + stateStore.addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest.newInstance( + ApplicationHomeSubCluster.newInstance(appId, subClusterId))); + } + } + + @After + public void breakDown() throws Exception { + if (stateStore != null) { + stateStore.close(); + } + } + + @Test + public void testFederationStateStoreAppsCleanUp() throws YarnException { + // Set first app to be still known by Router + ApplicationId appId = appIds.get(0); + routerAppIds.add(appId); + + // Another random app not in stateStore known by Router + appId = ApplicationId.newInstance(100, 200); + routerAppIds.add(appId); + + appCleaner.run(); + + // Only one app should be left + Assert.assertEquals(1, + stateStore + .getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest.newInstance()) + .getAppsHomeSubClusters().size()); + } + + /** + * Testable version of DefaultApplicationCleaner. 
+ */ + public class TestableDefaultApplicationCleaner + extends DefaultApplicationCleaner { + @Override + public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException { + return routerAppIds; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
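For illustration only (not part of this commit): since the cleaner is pluggable via yarn.federation.gpg.application.cleaner.class, an alternative implementation can extend the new abstract ApplicationCleaner and reuse getGPGContext() and getRouterKnownApplications(). The hypothetical dry-run cleaner below only reports the stale entries that DefaultApplicationCleaner would delete; its name and behavior are assumptions, not code in this patch.

package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical dry-run cleaner (illustration only, not in this commit):
 * reports stale FederationStateStore entries instead of deleting them.
 */
public class DryRunApplicationCleaner extends ApplicationCleaner {
  private static final Logger LOG =
      LoggerFactory.getLogger(DryRunApplicationCleaner.class);

  @Override
  public void run() {
    try {
      // All application ids currently recorded in FederationStateStore.
      Set<ApplicationId> stale = new HashSet<>();
      for (ApplicationHomeSubCluster app
          : getGPGContext().getStateStoreFacade().getApplicationsHomeSubCluster()) {
        stale.add(app.getApplicationId());
      }
      // Keep everything the Router still knows about.
      stale.removeAll(getRouterKnownApplications());
      // DefaultApplicationCleaner would call deleteApplicationHomeSubCluster
      // for each remaining id; this sketch only logs what it would remove.
      LOG.info("Dry run: {} stale application entries found", stale.size());
    } catch (Exception e) {
      LOG.error("Dry-run application cleaner failed", e);
    }
  }
}

Such a class would be selected by pointing yarn.federation.gpg.application.cleaner.class at its fully qualified name.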