YARN-6648. [GPG] Add SubClusterCleaner in Global Policy Generator. (botong)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/33c2916d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/33c2916d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/33c2916d Branch: refs/heads/YARN-7402 Commit: 33c2916d6360e5abf4efaf924612c4daa1061aff Parents: 6bea5ee Author: Botong Huang <bot...@apache.org> Authored: Thu Feb 1 14:43:48 2018 -0800 Committer: Botong Huang <bot...@apache.org> Committed: Wed Sep 19 13:47:32 2018 -0700 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 5 + .../hadoop/yarn/conf/YarnConfiguration.java | 18 +++ .../src/main/resources/yarn-default.xml | 24 ++++ .../store/impl/MemoryFederationStateStore.java | 13 ++ .../utils/FederationStateStoreFacade.java | 41 ++++++- .../GlobalPolicyGenerator.java | 92 ++++++++++----- .../subclustercleaner/SubClusterCleaner.java | 109 +++++++++++++++++ .../subclustercleaner/package-info.java | 19 +++ .../TestSubClusterCleaner.java | 118 +++++++++++++++++++ 9 files changed, 409 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 216c3bd..9fcafad 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -387,6 +387,11 @@ <Method name="initAndStartNodeManager" /> <Bug pattern="DM_EXIT" /> </Match> + <Match> + <Class name="org.apache.hadoop.yarn.server.globalpolicygenerator.GlobalPolicyGenerator" /> + <Method name="startGPG" /> + <Bug pattern="DM_EXIT" /> + </Match> <!-- Ignore 
heartbeat exception when killing localizer --> <Match> http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a82801d..5c7bf26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3365,6 +3365,24 @@ public class YarnConfiguration extends Configuration { public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; + private static final String FEDERATION_GPG_PREFIX = + FEDERATION_PREFIX + "gpg."; + + // The number of threads to use for the GPG scheduled executor service + public static final String GPG_SCHEDULED_EXECUTOR_THREADS = + FEDERATION_GPG_PREFIX + "scheduled.executor.threads"; + public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10; + + // The interval at which the subcluster cleaner runs, -1 means disabled + public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = + FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms"; + public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = -1; + + // The expiration time for a subcluster heartbeat, default is 30 minutes + public static final String GPG_SUBCLUSTER_EXPIRATION_MS = + FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; + public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000; + //////////////////////////////// // Other Configs //////////////////////////////// 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 0700902..7df0a67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3611,6 +3611,30 @@ <property> <description> + The number of threads to use for the GPG scheduled executor service. + </description> + <name>yarn.federation.gpg.scheduled.executor.threads</name> + <value>10</value> + </property> + + <property> + <description> + The interval at which the subcluster cleaner runs, -1 means disabled. + </description> + <name>yarn.federation.gpg.subcluster.cleaner.interval-ms</name> + <value>-1</value> + </property> + + <property> + <description> + The expiration time for a subcluster heartbeat, default is 30 minutes. 
+ </description> + <name>yarn.federation.gpg.subcluster.heartbeat.expiration-ms</name> + <value>1800000</value> + </property> + + <property> + <description> It is TimelineClient 1.5 configuration whether to store active applicationâs timeline data with in user directory i.e ${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name} http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 7c06256..b42fc79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.util.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * In-memory implementation of {@link FederationStateStore}. 
*/ @@ -158,6 +160,17 @@ public class MemoryFederationStateStore implements FederationStateStore { return SubClusterHeartbeatResponse.newInstance(); } + @VisibleForTesting + public void setSubClusterLastHeartbeat(SubClusterId subClusterId, + long lastHeartbeat) throws YarnException { + SubClusterInfo subClusterInfo = membership.get(subClusterId); + if (subClusterInfo == null) { + throw new YarnException( + "Subcluster " + subClusterId.toString() + " does not exist"); + } + subClusterInfo.setLastHeartBeat(lastHeartbeat); + } + @Override public GetSubClusterInfoResponse getSubCluster( GetSubClusterInfoRequest request) throws YarnException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 5d9702f..0761773 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -62,9 +62,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -221,6 +223,22 @@ public final class FederationStateStoreFacade { } /** + * Deregister a <em>subcluster</em> identified by {@code SubClusterId} to + * change state in federation. This can be done to mark the sub cluster lost, + * deregistered, or decommissioned. + * + * @param subClusterId the target subclusterId + * @param subClusterState the state to update it to + * @throws YarnException if the request is invalid/fails + */ + public void deregisterSubCluster(SubClusterId subClusterId, + SubClusterState subClusterState) throws YarnException { + stateStore.deregisterSubCluster( + SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState)); + return; + } + + /** * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}. 
* * @param subClusterId the identifier of the sub-cluster @@ -255,8 +273,7 @@ public final class FederationStateStoreFacade { public SubClusterInfo getSubCluster(final SubClusterId subClusterId, final boolean flushCache) throws YarnException { if (flushCache && isCachingEnabled()) { - LOG.info("Flushing subClusters from cache and rehydrating from store," - + " most likely on account of RM failover."); + LOG.info("Flushing subClusters from cache and rehydrating from store."); cache.remove(buildGetSubClustersCacheRequest(false)); } return getSubCluster(subClusterId); @@ -287,6 +304,26 @@ public final class FederationStateStoreFacade { } /** + * Updates the cache with the central {@link FederationStateStore} and returns + * the {@link SubClusterInfo} of all active sub cluster(s). + * + * @param filterInactiveSubClusters whether to filter out inactive + * sub-clusters + * @param flushCache flag to indicate if the cache should be flushed or not + * @return the sub cluster information + * @throws YarnException if the call to the state store is unsuccessful + */ + public Map<SubClusterId, SubClusterInfo> getSubClusters( + final boolean filterInactiveSubClusters, final boolean flushCache) + throws YarnException { + if (flushCache && isCachingEnabled()) { + LOG.info("Flushing subClusters from cache and rehydrating from store."); + cache.remove(buildGetSubClustersCacheRequest(filterInactiveSubClusters)); + } + return getSubClusters(filterInactiveSubClusters); + } + + /** * Returns the {@link SubClusterPolicyConfiguration} for the specified queue. 
* * @param queue the queue whose policy is required http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index c1f7460..f6cfba0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -18,8 +18,11 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang.time.DurationFormatUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.CompositeService; @@ -28,6 +31,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -55,36 +59,26 @@ public class GlobalPolicyGenerator extends CompositeService { // Federation Variables private GPGContext gpgContext; + // Scheduler service that runs tasks periodically + private ScheduledThreadPoolExecutor scheduledExecutorService; + private SubClusterCleaner subClusterCleaner; + public GlobalPolicyGenerator() { super(GlobalPolicyGenerator.class.getName()); this.gpgContext = new GPGContextImpl(); } - protected void initAndStart(Configuration conf, boolean hasToReboot) { - try { - // Remove the old hook if we are rebooting. - if (hasToReboot && null != gpgShutdownHook) { - ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook); - } - - gpgShutdownHook = new CompositeServiceShutdownHook(this); - ShutdownHookManager.get().addShutdownHook(gpgShutdownHook, - SHUTDOWN_HOOK_PRIORITY); - - this.init(conf); - this.start(); - } catch (Throwable t) { - LOG.error("Error starting globalpolicygenerator", t); - System.exit(-1); - } - } - @Override protected void serviceInit(Configuration conf) throws Exception { // Set up the context this.gpgContext .setStateStoreFacade(FederationStateStoreFacade.getInstance()); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor( + conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, + YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); + this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); + DefaultMetricsSystem.initialize(METRICS_NAME); // super.serviceInit after all services are added @@ -94,10 +88,32 @@ public class GlobalPolicyGenerator extends CompositeService { @Override protected void serviceStart() throws Exception { super.serviceStart(); + + // Scheduler SubClusterCleaner service + long scCleanerIntervalMs = getConfig().getLong( + YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, + YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS); + if (scCleanerIntervalMs > 0) { + 
this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner, + 0, scCleanerIntervalMs, TimeUnit.MILLISECONDS); + LOG.info("Scheduled sub-cluster cleaner with interval: {}", + DurationFormatUtils.formatDurationISO(scCleanerIntervalMs)); + } } @Override protected void serviceStop() throws Exception { + try { + if (this.scheduledExecutorService != null + && !this.scheduledExecutorService.isShutdown()) { + this.scheduledExecutorService.shutdown(); + LOG.info("Stopped ScheduledExecutorService"); + } + } catch (Exception e) { + LOG.error("Failed to shutdown ScheduledExecutorService", e); + throw e; + } + if (this.isStopping.getAndSet(true)) { return; } @@ -113,20 +129,40 @@ public class GlobalPolicyGenerator extends CompositeService { return this.gpgContext; } + private void initAndStart(Configuration conf, boolean hasToReboot) { + // Remove the old hook if we are rebooting. + if (hasToReboot && null != gpgShutdownHook) { + ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook); + } + + gpgShutdownHook = new CompositeServiceShutdownHook(this); + ShutdownHookManager.get().addShutdownHook(gpgShutdownHook, + SHUTDOWN_HOOK_PRIORITY); + + this.init(conf); + this.start(); + } + @SuppressWarnings("resource") public static void startGPG(String[] argv, Configuration conf) { boolean federationEnabled = conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED, YarnConfiguration.DEFAULT_FEDERATION_ENABLED); - if (federationEnabled) { - Thread.setDefaultUncaughtExceptionHandler( - new YarnUncaughtExceptionHandler()); - StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv, - LOG); - GlobalPolicyGenerator globalPolicyGenerator = new GlobalPolicyGenerator(); - globalPolicyGenerator.initAndStart(conf, false); - } else { - LOG.warn("Federation is not enabled. 
The gpg cannot start."); + try { + if (federationEnabled) { + Thread.setDefaultUncaughtExceptionHandler( + new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv, + LOG); + GlobalPolicyGenerator globalPolicyGenerator = + new GlobalPolicyGenerator(); + globalPolicyGenerator.initAndStart(conf, false); + } else { + LOG.warn("Federation is not enabled. The gpg cannot start."); + } + } catch (Throwable t) { + LOG.error("Error starting globalpolicygenerator", t); + System.exit(-1); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java new file mode 100644 index 0000000..dad5121 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner; + +import java.util.Date; +import java.util.Map; + +import org.apache.commons.lang.time.DurationFormatUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The sub-cluster cleaner is one of the GPG's services that periodically checks + * the membership table in FederationStateStore and mark sub-clusters that have + * not sent a heartbeat in certain amount of time as LOST. + */ +public class SubClusterCleaner implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(SubClusterCleaner.class); + + private GPGContext gpgContext; + private long heartbeatExpirationMillis; + + /** + * The sub-cluster cleaner runnable is invoked by the sub cluster cleaner + * service to check the membership table and remove sub clusters that have not + * sent a heart beat in some amount of time. 
+ */ + public SubClusterCleaner(Configuration conf, GPGContext gpgContext) { + this.heartbeatExpirationMillis = + conf.getLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, + YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS); + this.gpgContext = gpgContext; + LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}", + DurationFormatUtils.formatDurationISO(this.heartbeatExpirationMillis)); + } + + @Override + public void run() { + try { + Date now = new Date(); + LOG.info("SubClusterCleaner at {}", now); + + Map<SubClusterId, SubClusterInfo> infoMap = + this.gpgContext.getStateStoreFacade().getSubClusters(false, true); + + // Iterate over each sub cluster and check last heartbeat + for (Map.Entry<SubClusterId, SubClusterInfo> entry : infoMap.entrySet()) { + SubClusterInfo subClusterInfo = entry.getValue(); + + Date lastHeartBeat = new Date(subClusterInfo.getLastHeartBeat()); + if (LOG.isDebugEnabled()) { + LOG.debug("Checking subcluster {} in state {}, last heartbeat at {}", + subClusterInfo.getSubClusterId(), subClusterInfo.getState(), + lastHeartBeat); + } + + if (!subClusterInfo.getState().isUnusable()) { + long timeUntilDeregister = this.heartbeatExpirationMillis + - (now.getTime() - lastHeartBeat.getTime()); + // Deregister sub-cluster as SC_LOST if last heartbeat too old + if (timeUntilDeregister < 0) { + LOG.warn( + "Deregistering subcluster {} in state {} last heartbeat at {}", + subClusterInfo.getSubClusterId(), subClusterInfo.getState(), + new Date(subClusterInfo.getLastHeartBeat())); + try { + this.gpgContext.getStateStoreFacade().deregisterSubCluster( + subClusterInfo.getSubClusterId(), SubClusterState.SC_LOST); + } catch (Exception e) { + LOG.error("deregisterSubCluster failed on subcluster " + + subClusterInfo.getSubClusterId(), e); + } + } else if (LOG.isDebugEnabled()) { + LOG.debug("Time until deregister for subcluster {}: {}", + entry.getKey(), + DurationFormatUtils.formatDurationISO(timeUntilDeregister)); + } + } + } + } 
catch (Throwable e) { + LOG.error("Subcluster cleaner fails: ", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java new file mode 100644 index 0000000..f65444a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner; http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java new file mode 100644 index 0000000..19b8802 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner; + +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for Sub-cluster Cleaner in GPG. 
+ */ +public class TestSubClusterCleaner { + + private Configuration conf; + private MemoryFederationStateStore stateStore; + private FederationStateStoreFacade facade; + private SubClusterCleaner cleaner; + private GPGContext gpgContext; + + private ArrayList<SubClusterId> subClusterIds; + + @Before + public void setup() throws YarnException { + conf = new YarnConfiguration(); + + // subcluster expires in one second + conf.setLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, 1000); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + + facade = FederationStateStoreFacade.getInstance(); + facade.reinitialize(stateStore, conf); + + gpgContext = new GPGContextImpl(); + gpgContext.setStateStoreFacade(facade); + + cleaner = new SubClusterCleaner(conf, gpgContext); + + // Create and register three sub clusters + subClusterIds = new ArrayList<SubClusterId>(); + for (int i = 0; i < 3; i++) { + // Create sub cluster id and info + SubClusterId subClusterId = + SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i)); + + SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, + "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4", + SubClusterState.SC_RUNNING, System.currentTimeMillis(), ""); + // Register the sub cluster + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + // Append the id to a local list + subClusterIds.add(subClusterId); + } + } + + @After + public void breakDown() throws Exception { + stateStore.close(); + } + + @Test + public void testSubClusterRegisterHeartBeatTime() throws YarnException { + cleaner.run(); + Assert.assertEquals(3, facade.getSubClusters(true, true).size()); + } + + /** + * Test the base use case. 
+ */ + @Test + public void testSubClusterHeartBeat() throws YarnException { + // The first subcluster reports as Unhealthy + SubClusterId subClusterId = subClusterIds.get(0); + stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest + .newInstance(subClusterId, SubClusterState.SC_UNHEALTHY, "capacity")); + + // The second subcluster didn't heartbeat for two seconds, should mark lost + subClusterId = subClusterIds.get(1); + stateStore.setSubClusterLastHeartbeat(subClusterId, + System.currentTimeMillis() - 2000); + + cleaner.run(); + Assert.assertEquals(1, facade.getSubClusters(true, true).size()); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org