YARN-6648. [GPG] Add SubClusterCleaner in Global Policy Generator. (botong)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/33c2916d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/33c2916d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/33c2916d

Branch: refs/heads/YARN-7402
Commit: 33c2916d6360e5abf4efaf924612c4daa1061aff
Parents: 6bea5ee
Author: Botong Huang <bot...@apache.org>
Authored: Thu Feb 1 14:43:48 2018 -0800
Committer: Botong Huang <bot...@apache.org>
Committed: Wed Sep 19 13:47:32 2018 -0700

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   5 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  18 +++
 .../src/main/resources/yarn-default.xml         |  24 ++++
 .../store/impl/MemoryFederationStateStore.java  |  13 ++
 .../utils/FederationStateStoreFacade.java       |  41 ++++++-
 .../GlobalPolicyGenerator.java                  |  92 ++++++++++-----
 .../subclustercleaner/SubClusterCleaner.java    | 109 +++++++++++++++++
 .../subclustercleaner/package-info.java         |  19 +++
 .../TestSubClusterCleaner.java                  | 118 +++++++++++++++++++
 9 files changed, 409 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 216c3bd..9fcafad 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -387,6 +387,11 @@
     <Method name="initAndStartNodeManager" />
     <Bug pattern="DM_EXIT" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.globalpolicygenerator.GlobalPolicyGenerator" />
+    <Method name="startGPG" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
  
   <!-- Ignore heartbeat exception when killing localizer -->
   <Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a82801d..5c7bf26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3365,6 +3365,24 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
       false;
 
+  private static final String FEDERATION_GPG_PREFIX =
+      FEDERATION_PREFIX + "gpg.";
+
+  // The number of threads to use for the GPG scheduled executor service
+  public static final String GPG_SCHEDULED_EXECUTOR_THREADS =
+      FEDERATION_GPG_PREFIX + "scheduled.executor.threads";
+  public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10;
+
+  // The interval at which the subcluster cleaner runs; -1 means disabled
+  public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS =
+      FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms";
+  public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = -1;
+
+  // The expiration time for a subcluster heartbeat; the default is 30 minutes
+  public static final String GPG_SUBCLUSTER_EXPIRATION_MS =
+      FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
+  public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0700902..7df0a67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3611,6 +3611,30 @@
 
   <property>
     <description>
+      The number of threads to use for the GPG scheduled executor service.
+    </description>
+    <name>yarn.federation.gpg.scheduled.executor.threads</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
+      The interval at which the subcluster cleaner runs; -1 means disabled.
+    </description>
+    <name>yarn.federation.gpg.subcluster.cleaner.interval-ms</name>
+    <value>-1</value>
+  </property>
+
+  <property>
+    <description>
+      The expiration time for a subcluster heartbeat; the default is 30 minutes.
+    </description>
+    <name>yarn.federation.gpg.subcluster.heartbeat.expiration-ms</name>
+    <value>1800000</value>
+  </property>
+
+  <property>
+    <description>
        It is TimelineClient 1.5 configuration whether to store active
        application’s timeline data with in user directory i.e
        ${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name}
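
Note that the cleaner is disabled out of the box (interval -1). A minimal
sketch of enabling it from code, using only the keys added above (the
one-minute interval is an illustrative value, not a shipped default):

    Configuration conf = new YarnConfiguration();
    // Illustrative: run the SubClusterCleaner once a minute.
    conf.setLong(YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, 60000);
    // The shipped default of 30 minutes, set explicitly here for clarity.
    conf.setLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, 1800000);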

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index 7c06256..b42fc79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * In-memory implementation of {@link FederationStateStore}.
  */
@@ -158,6 +160,17 @@ public class MemoryFederationStateStore implements FederationStateStore {
     return SubClusterHeartbeatResponse.newInstance();
   }
 
+  @VisibleForTesting
+  public void setSubClusterLastHeartbeat(SubClusterId subClusterId,
+      long lastHeartbeat) throws YarnException {
+    SubClusterInfo subClusterInfo = membership.get(subClusterId);
+    if (subClusterInfo == null) {
+      throw new YarnException(
+          "Subcluster " + subClusterId.toString() + " does not exist");
+    }
+    subClusterInfo.setLastHeartBeat(lastHeartbeat);
+  }
+
   @Override
   public GetSubClusterInfoResponse getSubCluster(
       GetSubClusterInfoRequest request) throws YarnException {
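
The @VisibleForTesting setter above lets tests back-date a heartbeat instead
of waiting in real time. A minimal sketch of the intended use (it mirrors
TestSubClusterCleaner below, and assumes subClusterId was registered first):

    MemoryFederationStateStore stateStore = new MemoryFederationStateStore();
    stateStore.init(conf);
    // ... register subClusterId, then simulate two seconds of silence:
    stateStore.setSubClusterLastHeartbeat(subClusterId,
        System.currentTimeMillis() - 2000);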

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 5d9702f..0761773 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -62,9 +62,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -221,6 +223,22 @@ public final class FederationStateStoreFacade {
   }
 
   /**
+   * Deregister a <em>subcluster</em> identified by {@code SubClusterId}, to
+   * change its state in the federation. This can be used to mark the
+   * sub-cluster lost, deregistered, or decommissioned.
+   *
+   * @param subClusterId the target subclusterId
+   * @param subClusterState the state to update it to
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void deregisterSubCluster(SubClusterId subClusterId,
+      SubClusterState subClusterState) throws YarnException {
+    stateStore.deregisterSubCluster(
+        SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState));
+  }
+
+  /**
    * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}.
    *
    * @param subClusterId the identifier of the sub-cluster
@@ -255,8 +273,7 @@ public final class FederationStateStoreFacade {
   public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
       final boolean flushCache) throws YarnException {
     if (flushCache && isCachingEnabled()) {
-      LOG.info("Flushing subClusters from cache and rehydrating from store,"
-          + " most likely on account of RM failover.");
+      LOG.info("Flushing subClusters from cache and rehydrating from store.");
       cache.remove(buildGetSubClustersCacheRequest(false));
     }
     return getSubCluster(subClusterId);
@@ -287,6 +304,26 @@ public final class FederationStateStoreFacade {
   }
 
   /**
+   * Updates the cache with the central {@link FederationStateStore} and
+   * returns the {@link SubClusterInfo} of all known sub-clusters, optionally
+   * filtering out the inactive ones.
+   *
+   * @param filterInactiveSubClusters whether to filter out inactive
+   *          sub-clusters
+   * @param flushCache flag to indicate if the cache should be flushed or not
+   * @return the sub cluster information
+   * @throws YarnException if the call to the state store is unsuccessful
+   */
+  public Map<SubClusterId, SubClusterInfo> getSubClusters(
+      final boolean filterInactiveSubClusters, final boolean flushCache)
+      throws YarnException {
+    if (flushCache && isCachingEnabled()) {
+      LOG.info("Flushing subClusters from cache and rehydrating from store.");
+      cache.remove(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
+    }
+    return getSubClusters(filterInactiveSubClusters);
+  }
+
+  /**
    * Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
    *
    * @param queue the queue whose policy is required
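
Together, the two methods added above give the GPG a read-check-act loop over
the membership table. A minimal caller sketch (isExpired is a hypothetical
staleness check standing in for the cleaner's heartbeat arithmetic):

    // Read all subclusters, bypassing the cache (flushCache = true).
    Map<SubClusterId, SubClusterInfo> scInfoMap =
        facade.getSubClusters(false, true);
    for (SubClusterInfo info : scInfoMap.values()) {
      if (isExpired(info)) { // hypothetical: heartbeat older than allowed
        facade.deregisterSubCluster(info.getSubClusterId(),
            SubClusterState.SC_LOST);
      }
    }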

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index c1f7460..f6cfba0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -18,8 +18,11 @@
 
 package org.apache.hadoop.yarn.server.globalpolicygenerator;
 
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang.time.DurationFormatUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.service.CompositeService;
@@ -28,6 +31,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,36 +59,26 @@ public class GlobalPolicyGenerator extends CompositeService {
   // Federation Variables
   private GPGContext gpgContext;
 
+  // Scheduled executor service that runs GPG tasks periodically
+  private ScheduledThreadPoolExecutor scheduledExecutorService;
+  private SubClusterCleaner subClusterCleaner;
+
   public GlobalPolicyGenerator() {
     super(GlobalPolicyGenerator.class.getName());
     this.gpgContext = new GPGContextImpl();
   }
 
-  protected void initAndStart(Configuration conf, boolean hasToReboot) {
-    try {
-      // Remove the old hook if we are rebooting.
-      if (hasToReboot && null != gpgShutdownHook) {
-        ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook);
-      }
-
-      gpgShutdownHook = new CompositeServiceShutdownHook(this);
-      ShutdownHookManager.get().addShutdownHook(gpgShutdownHook,
-          SHUTDOWN_HOOK_PRIORITY);
-
-      this.init(conf);
-      this.start();
-    } catch (Throwable t) {
-      LOG.error("Error starting globalpolicygenerator", t);
-      System.exit(-1);
-    }
-  }
-
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     // Set up the context
     this.gpgContext
         .setStateStoreFacade(FederationStateStoreFacade.getInstance());
 
+    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
+        conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
+            YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
+    this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+
     DefaultMetricsSystem.initialize(METRICS_NAME);
 
     // super.serviceInit after all services are added
@@ -94,10 +88,32 @@ public class GlobalPolicyGenerator extends CompositeService {
   @Override
   protected void serviceStart() throws Exception {
     super.serviceStart();
+
+    // Schedule the SubClusterCleaner service, if enabled
+    long scCleanerIntervalMs = getConfig().getLong(
+        YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS);
+    if (scCleanerIntervalMs > 0) {
+      this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner,
+          0, scCleanerIntervalMs, TimeUnit.MILLISECONDS);
+      LOG.info("Scheduled sub-cluster cleaner with interval: {}",
+          DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
+    }
   }
 
   @Override
   protected void serviceStop() throws Exception {
+    try {
+      if (this.scheduledExecutorService != null
+          && !this.scheduledExecutorService.isShutdown()) {
+        this.scheduledExecutorService.shutdown();
+        LOG.info("Stopped ScheduledExecutorService");
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to shutdown ScheduledExecutorService", e);
+      throw e;
+    }
+
     if (this.isStopping.getAndSet(true)) {
       return;
     }
@@ -113,20 +129,40 @@ public class GlobalPolicyGenerator extends CompositeService {
     return this.gpgContext;
   }
 
+  private void initAndStart(Configuration conf, boolean hasToReboot) {
+    // Remove the old hook if we are rebooting.
+    if (hasToReboot && null != gpgShutdownHook) {
+      ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook);
+    }
+
+    gpgShutdownHook = new CompositeServiceShutdownHook(this);
+    ShutdownHookManager.get().addShutdownHook(gpgShutdownHook,
+        SHUTDOWN_HOOK_PRIORITY);
+
+    this.init(conf);
+    this.start();
+  }
+
   @SuppressWarnings("resource")
   public static void startGPG(String[] argv, Configuration conf) {
     boolean federationEnabled =
         conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
             YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
-    if (federationEnabled) {
-      Thread.setDefaultUncaughtExceptionHandler(
-          new YarnUncaughtExceptionHandler());
-      StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv,
-          LOG);
-      GlobalPolicyGenerator globalPolicyGenerator = new GlobalPolicyGenerator();
-      globalPolicyGenerator.initAndStart(conf, false);
-    } else {
-      LOG.warn("Federation is not enabled. The gpg cannot start.");
+    try {
+      if (federationEnabled) {
+        Thread.setDefaultUncaughtExceptionHandler(
+            new YarnUncaughtExceptionHandler());
+        StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv,
+            LOG);
+        GlobalPolicyGenerator globalPolicyGenerator =
+            new GlobalPolicyGenerator();
+        globalPolicyGenerator.initAndStart(conf, false);
+      } else {
+        LOG.warn("Federation is not enabled. The gpg cannot start.");
+      }
+    } catch (Throwable t) {
+      LOG.error("Error starting globalpolicygenerator", t);
+      System.exit(-1);
     }
   }
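
The scheduling above is the stock java.util.concurrent pattern; a condensed
sketch of the lifecycle the service now manages (numThreads and intervalMs
stand for whatever the configuration supplies):

    ScheduledThreadPoolExecutor executor =
        new ScheduledThreadPoolExecutor(numThreads);
    // First run immediately (initial delay 0), then every intervalMs.
    executor.scheduleAtFixedRate(subClusterCleaner, 0, intervalMs,
        TimeUnit.MILLISECONDS);
    // On serviceStop: accept no new runs, let the in-flight run finish.
    executor.shutdown();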
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java
new file mode 100644
index 0000000..dad5121
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The sub-cluster cleaner is one of the GPG's services. It periodically checks
+ * the membership table in FederationStateStore and marks sub-clusters that
+ * have not sent a heartbeat within a certain amount of time as LOST.
+ */
+public class SubClusterCleaner implements Runnable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SubClusterCleaner.class);
+
+  private GPGContext gpgContext;
+  private long heartbeatExpirationMillis;
+
+  /**
+   * The sub-cluster cleaner runnable is invoked by the GPG's scheduled
+   * executor service to check the membership table and deregister
+   * sub-clusters that have not sent a heartbeat within the expiration time.
+   *
+   * @param conf configuration supplying the heartbeat expiration
+   * @param gpgContext the GPG context, which provides the state store facade
+   */
+  public SubClusterCleaner(Configuration conf, GPGContext gpgContext) {
+    this.heartbeatExpirationMillis =
+        conf.getLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS,
+            YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS);
+    this.gpgContext = gpgContext;
+    LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}",
+        DurationFormatUtils.formatDurationISO(this.heartbeatExpirationMillis));
+  }
+
+  @Override
+  public void run() {
+    try {
+      Date now = new Date();
+      LOG.info("SubClusterCleaner at {}", now);
+
+      Map<SubClusterId, SubClusterInfo> infoMap =
+          this.gpgContext.getStateStoreFacade().getSubClusters(false, true);
+
+      // Iterate over each sub cluster and check last heartbeat
+      for (Map.Entry<SubClusterId, SubClusterInfo> entry : infoMap.entrySet()) {
+        SubClusterInfo subClusterInfo = entry.getValue();
+
+        Date lastHeartBeat = new Date(subClusterInfo.getLastHeartBeat());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Checking subcluster {} in state {}, last heartbeat at {}",
+              subClusterInfo.getSubClusterId(), subClusterInfo.getState(),
+              lastHeartBeat);
+        }
+
+        if (!subClusterInfo.getState().isUnusable()) {
+          long timeUntilDeregister = this.heartbeatExpirationMillis
+              - (now.getTime() - lastHeartBeat.getTime());
+          // Deregister sub-cluster as SC_LOST if last heartbeat too old
+          if (timeUntilDeregister < 0) {
+            LOG.warn(
+                "Deregistering subcluster {} in state {}, last heartbeat at {}",
+                subClusterInfo.getSubClusterId(), subClusterInfo.getState(),
+                new Date(subClusterInfo.getLastHeartBeat()));
+            try {
+              this.gpgContext.getStateStoreFacade().deregisterSubCluster(
+                  subClusterInfo.getSubClusterId(), SubClusterState.SC_LOST);
+            } catch (Exception e) {
+              LOG.error("deregisterSubCluster failed on subcluster "
+                  + subClusterInfo.getSubClusterId(), e);
+            }
+          } else if (LOG.isDebugEnabled()) {
+            LOG.debug("Time until deregister for subcluster {}: {}",
+                entry.getKey(),
+                DurationFormatUtils.formatDurationISO(timeUntilDeregister));
+          }
+        }
+      }
+    } catch (Throwable e) {
+      LOG.error("Subcluster cleaner fails: ", e);
+    }
+  }
+
+}
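
A worked example of the expiration arithmetic above, with the default
30-minute (1800000 ms) expiration and a heartbeat that is 31 minutes old:

    timeUntilDeregister = heartbeatExpirationMillis - (now - lastHeartBeat)
                        = 1800000 - 1860000
                        = -60000 < 0  =>  deregister as SC_LOST

A subcluster silent for only 29 minutes would yield +60000 and merely be
logged at debug level.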

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java
new file mode 100644
index 0000000..f65444a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/33c2916d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java
new file mode 100644
index 0000000..19b8802
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for Sub-cluster Cleaner in GPG.
+ */
+public class TestSubClusterCleaner {
+
+  private Configuration conf;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreFacade facade;
+  private SubClusterCleaner cleaner;
+  private GPGContext gpgContext;
+
+  private ArrayList<SubClusterId> subClusterIds;
+
+  @Before
+  public void setup() throws YarnException {
+    conf = new YarnConfiguration();
+
+    // subcluster expires in one second
+    conf.setLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, 1000);
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, conf);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setStateStoreFacade(facade);
+
+    cleaner = new SubClusterCleaner(conf, gpgContext);
+
+    // Create and register three sub-clusters
+    subClusterIds = new ArrayList<SubClusterId>();
+    for (int i = 0; i < 3; i++) {
+      // Create sub cluster id and info
+      SubClusterId subClusterId =
+          SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i));
+
+      SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+          "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4",
+          SubClusterState.SC_RUNNING, System.currentTimeMillis(), "");
+      // Register the sub cluster
+      stateStore.registerSubCluster(
+          SubClusterRegisterRequest.newInstance(subClusterInfo));
+      // Append the id to a local list
+      subClusterIds.add(subClusterId);
+    }
+  }
+
+  @After
+  public void breakDown() throws Exception {
+    stateStore.close();
+  }
+
+  @Test
+  public void testSubClusterRegisterHeartBeatTime() throws YarnException {
+    cleaner.run();
+    Assert.assertEquals(3, facade.getSubClusters(true, true).size());
+  }
+
+  /**
+   * Test the base use case.
+   */
+  @Test
+  public void testSubClusterHeartBeat() throws YarnException {
+    // The first subcluster reports as Unhealthy
+    SubClusterId subClusterId = subClusterIds.get(0);
+    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
+        .newInstance(subClusterId, SubClusterState.SC_UNHEALTHY, "capacity"));
+
+    // The second subcluster has not heartbeated for two seconds; it should be marked lost
+    subClusterId = subClusterIds.get(1);
+    stateStore.setSubClusterLastHeartbeat(subClusterId,
+        System.currentTimeMillis() - 2000);
+
+    cleaner.run();
+    Assert.assertEquals(1, facade.getSubClusters(true, true).size());
+  }
+}
\ No newline at end of file

