YARN-7599. [GPG] ApplicationCleaner in Global Policy Generator. Contributed by 
Botong Huang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1facc8a2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1facc8a2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1facc8a2

Branch: refs/heads/YARN-7402
Commit: 1facc8a28416efb7424cec101c80e60ac76f25ac
Parents: 1e8686b
Author: Botong Huang <bot...@apache.org>
Authored: Fri Sep 21 17:30:44 2018 -0700
Committer: Botong Huang <bot...@apache.org>
Committed: Mon Nov 12 15:09:38 2018 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  25 +++
 .../src/main/resources/yarn-default.xml         |  28 ++++
 .../store/impl/MemoryFederationStateStore.java  |   2 -
 .../pb/ApplicationHomeSubClusterPBImpl.java     |   3 +
 .../utils/FederationStateStoreFacade.java       |  33 ++++
 .../server/globalpolicygenerator/GPGUtils.java  |  21 ++-
 .../GlobalPolicyGenerator.java                  |  23 ++-
 .../applicationcleaner/ApplicationCleaner.java  | 154 +++++++++++++++++++
 .../DefaultApplicationCleaner.java              |  82 ++++++++++
 .../applicationcleaner/package-info.java        |  19 +++
 .../TestDefaultApplicationCleaner.java          | 130 ++++++++++++++++
 11 files changed, 513 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c42ebb1..aa990d3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3445,6 +3445,31 @@ public class YarnConfiguration extends Configuration {
       FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
   public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000;
 
+  // The ApplicationCleaner implementation class the GPG instantiates
+  public static final String GPG_APPCLEANER_CLASS =
+      FEDERATION_GPG_PREFIX + "application.cleaner.class";
+  public static final String DEFAULT_GPG_APPCLEANER_CLASS =
+      "org.apache.hadoop.yarn.server.globalpolicygenerator"
+          + ".applicationcleaner.DefaultApplicationCleaner";
+
+  // Interval (ms) at which the application cleaner runs; <= 0 disables it
+  public static final String GPG_APPCLEANER_INTERVAL_MS =
+      FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms";
+  public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = -1;
+
+  /**
+   * Specification of how (and how many times) to query the Router for apps.
+   * Repeated queries are needed because the Router may return a partial list
+   * when some sub-cluster RM is unresponsive (e.g. failing over).
+   *
+   * Must be three comma-separated values: minimum number of successful
+   * queries, maximum total attempts, retry interval (ms).
+   */
+  public static final String GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+      FEDERATION_GPG_PREFIX + "application.cleaner.contact.router.spec";
+  public static final String DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+      "3,10,600000";
+
   public static final String FEDERATION_GPG_POLICY_PREFIX =
       FEDERATION_GPG_PREFIX + "policy.generator.";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a08ff96..e496e28 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3671,6 +3671,34 @@
 
   <property>
     <description>
+      The Application Cleaner implementation class for GPG to use.
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.class</name>
+    
<value>org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.DefaultApplicationCleaner</value>
+  </property>
+
+  <property>
+    <description>
+      Interval in ms at which the application cleaner runs; any non-positive value (e.g. the default -1) disables it.
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.interval-ms</name>
+    <value>-1</value>
+  </property>
+
+  <property>
+    <description>
+      Specification of how (and how many times) to query the Router for the
+      application list. Repeated queries are needed because the Router may
+      return a partial list when some sub-cluster RM is unresponsive (e.g.
+      failing over). Must be three comma-separated values: minimum number of
+      successful queries, maximum total attempts, retry interval (ms).
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.contact.router.spec</name>
+    <value>3,10,600000</value>
+  </property>
+
+  <property>
+    <description>
       The interval at which the policy generator runs, default is one hour
     </description>
     <name>yarn.federation.gpg.policy.generator.interval-ms</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index b42fc79..f7cdcd5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -259,8 +259,6 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
       result
           .add(ApplicationHomeSubCluster.newInstance(e.getKey(), 
e.getValue()));
     }
-
-    GetApplicationsHomeSubClusterResponse.newInstance(result);
     return GetApplicationsHomeSubClusterResponse.newInstance(result);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
index 7e6a564..6bd80fb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
@@ -110,6 +110,9 @@ public class ApplicationHomeSubClusterPBImpl extends 
ApplicationHomeSubCluster {
   @Override
   public ApplicationId getApplicationId() {
     ApplicationHomeSubClusterProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.applicationId != null) {
+      return this.applicationId;
+    }
     if (!p.hasApplicationId()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index d10e568..df5f50c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -52,8 +52,11 @@ import 
org.apache.hadoop.yarn.server.federation.store.exception.FederationStateS
 import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
@@ -433,6 +436,36 @@ public final class FederationStateStoreFacade {
   }
 
   /**
+   * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
+   * all submitted applications to their home sub-clusters.
+   *
+   * @return the mapping of every submitted application to its home sub-cluster
+   * @throws YarnException if the request is invalid/fails
+   */
+  public List<ApplicationHomeSubCluster> getApplicationsHomeSubCluster()
+      throws YarnException {
+    GetApplicationsHomeSubClusterResponse response =
+        stateStore.getApplicationsHomeSubCluster(
+            GetApplicationsHomeSubClusterRequest.newInstance());
+    return response.getAppsHomeSubClusters();
+  }
+
+  /**
+   * Delete the mapping of a previously submitted {@code ApplicationId} to its
+   * home {@code SubClusterId}. Returns normally on success; a failure reported
+   * by the state store is propagated as an exception.
+   *
+   * @param applicationId the application whose home sub-cluster mapping is
+   *          deleted
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
+      throws YarnException {
+    stateStore.deleteApplicationHomeSubCluster(
+        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId));
+  }
+
+  /**
    * Get the singleton instance of SubClusterResolver.
    *
    * @return SubClusterResolver instance

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
index 31cee1c..615cf3a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
 
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
@@ -49,15 +50,19 @@ public final class GPGUtils {
    * Performs an invocation of the the remote RMWebService.
    */
   public static <T> T invokeRMWebService(Configuration conf, String webAddr,
-      String path, final Class<T> returnType) {
+      String path, final Class<T> returnType, String deSelectParam) {
     Client client = Client.create();
     T obj = null;
 
-    WebResource webResource = client.resource(webAddr);
+    WebResource webResource =
+        client.resource(webAddr).path("ws/v1/cluster").path(path);
+    if (deSelectParam != null) {
+      webResource = webResource.queryParam(RMWSConsts.DESELECTS, 
deSelectParam);
+    }
     ClientResponse response = null;
     try {
-      response = webResource.path("ws/v1/cluster").path(path)
-          .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+      response = webResource.accept(MediaType.APPLICATION_XML)
+          .get(ClientResponse.class);
       if (response.getStatus() == SC_OK) {
         obj = response.getEntity(returnType);
       } else {
@@ -74,6 +79,14 @@ public final class GPGUtils {
   }
 
   /**
+   * Performs an invocation of the remote RMWebService with no deselect param.
+   */
+  public static <T> T invokeRMWebService(Configuration conf, String webAddr,
+      String path, final Class<T> returnType) {
+    return invokeRMWebService(conf, webAddr, path, returnType, null);
+  }
+
+  /**
    * Creates a uniform weighting of 1.0 for each sub cluster.
    */
   public static Map<SubClusterIdInfo, Float> createUniformWeights(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index 1ae07f3..c8ec4cd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import 
org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
 import 
org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
 import 
org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
 import org.slf4j.Logger;
@@ -63,6 +64,7 @@ public class GlobalPolicyGenerator extends CompositeService {
   // Scheduler service that runs tasks periodically
   private ScheduledThreadPoolExecutor scheduledExecutorService;
   private SubClusterCleaner subClusterCleaner;
+  private ApplicationCleaner applicationCleaner;
   private PolicyGenerator policyGenerator;
 
   public GlobalPolicyGenerator() {
@@ -82,7 +84,15 @@ public class GlobalPolicyGenerator extends CompositeService {
     this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
         conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
             YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
+
     this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+
+    this.applicationCleaner = FederationStateStoreFacade.createInstance(conf,
+        YarnConfiguration.GPG_APPCLEANER_CLASS,
+        YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS,
+        ApplicationCleaner.class);
+    this.applicationCleaner.init(conf, this.gpgContext);
+
     this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
 
     DefaultMetricsSystem.initialize(METRICS_NAME);
@@ -95,7 +105,7 @@ public class GlobalPolicyGenerator extends CompositeService {
   protected void serviceStart() throws Exception {
     super.serviceStart();
 
-    // Scheduler SubClusterCleaner service
+    // Schedule SubClusterCleaner service
     long scCleanerIntervalMs = getConfig().getLong(
         YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
         YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS);
@@ -106,6 +116,17 @@ public class GlobalPolicyGenerator extends 
CompositeService {
           DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
     }
 
+    // Schedule ApplicationCleaner service
+    long appCleanerIntervalMs =
+        getConfig().getLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS);
+    if (appCleanerIntervalMs > 0) {
+      
this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner,
+          0, appCleanerIntervalMs, TimeUnit.MILLISECONDS);
+      LOG.info("Scheduled application cleaner with interval: {}",
+          DurationFormatUtils.formatDurationISO(appCleanerIntervalMs));
+    }
+
     // Schedule PolicyGenerator
     long policyGeneratorIntervalMillis = getConfig().getLong(
         YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
new file mode 100644
index 0000000..85047ef
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ApplicationCleaner is a runnable that cleans up old applications from
+ * table applicationsHomeSubCluster in FederationStateStore. Subclasses decide
+ * what to clean in {@link #run()}; this base class parses the Router contact
+ * specification and provides helpers to query the Router for known apps.
+ */
+public abstract class ApplicationCleaner implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApplicationCleaner.class);
+
+  private Configuration conf;
+  private GPGContext gpgContext;
+
+  // Router contact spec, parsed from GPG_APPCLEANER_CONTACT_ROUTER_SPEC
+  private int minRouterSuccessCount;
+  private int maxRouterRetry;
+  private long routerQueryIntervalMillis;
+
+  /**
+   * Initializes the cleaner and parses the Router contact specification.
+   *
+   * @param config the GPG configuration
+   * @param context the GPG context, exposed to subclasses via
+   *          {@link #getGPGContext()}
+   * @throws YarnException if the Router contact specification is malformed
+   *           or its values are inconsistent
+   */
+  public void init(Configuration config, GPGContext context)
+      throws YarnException {
+
+    this.gpgContext = context;
+    this.conf = config;
+
+    String routerSpecString =
+        this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC);
+    String[] specs = routerSpecString.split(",");
+    if (specs.length != 3) {
+      throw new YarnException("Expect three comma separated values in "
+          + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get "
+          + routerSpecString);
+    }
+    try {
+      // Trim so that a spec written as "3, 10, 600000" is accepted too
+      this.minRouterSuccessCount = Integer.parseInt(specs[0].trim());
+      this.maxRouterRetry = Integer.parseInt(specs[1].trim());
+      this.routerQueryIntervalMillis = Long.parseLong(specs[2].trim());
+    } catch (NumberFormatException e) {
+      // Report a bad spec through the declared checked exception
+      throw new YarnException("Non-numeric value in "
+          + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + ": "
+          + routerSpecString, e);
+    }
+
+    if (this.minRouterSuccessCount > this.maxRouterRetry) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount
+          + " should not be larger than maxRouterRetry "
+          + this.maxRouterRetry);
+    }
+    if (this.minRouterSuccessCount <= 0) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount + " should be positive");
+    }
+    if (this.routerQueryIntervalMillis < 0) {
+      // Thread.sleep and formatDurationISO both reject negative values
+      throw new YarnException("routerQueryIntervalMillis "
+          + this.routerQueryIntervalMillis + " should not be negative");
+    }
+
+    LOG.info(
+        "Initialized AppCleaner with Router query with min success {}, "
+            + "max retry {}, retry interval {}",
+        this.minRouterSuccessCount, this.maxRouterRetry,
+        DurationFormatUtils.formatDurationISO(this.routerQueryIntervalMillis));
+  }
+
+  /**
+   * Returns the GPG context passed to {@link #init}.
+   *
+   * @return the GPG context
+   */
+  public GPGContext getGPGContext() {
+    return this.gpgContext;
+  }
+
+  /**
+   * Queries the Router web service once for the applications it knows, with
+   * resource requests deselected from the response.
+   *
+   * @return the set of applications reported by the Router
+   * @throws YarnRuntimeException when the Router call fails or returns nothing
+   */
+  public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+    String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
+
+    LOG.info("Contacting router at: {}", webAppAddress);
+    AppsInfo appsInfo = GPGUtils.invokeRMWebService(conf, webAppAddress,
+        "apps", AppsInfo.class,
+        DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
+    if (appsInfo == null) {
+      // invokeRMWebService starts from a null result; fail this attempt
+      // explicitly instead of with a NullPointerException below
+      throw new YarnRuntimeException(
+          "No application list returned by Router at " + webAppAddress);
+    }
+
+    Set<ApplicationId> appSet = new HashSet<>();
+    for (AppInfo appInfo : appsInfo.getApps()) {
+      appSet.add(ApplicationId.fromString(appInfo.getAppId()));
+    }
+    return appSet;
+  }
+
+  /**
+   * Get the set of known applications in the cluster from Router. The Router
+   * is queried repeatedly and the results are unioned, because a single
+   * response may be partial. Returns once {@code minRouterSuccessCount}
+   * queries have succeeded, waiting {@code routerQueryIntervalMillis} between
+   * consecutive attempts (successful or not).
+   *
+   * @return the union of applications reported by the successful queries
+   * @throws YarnException if fewer than {@code minRouterSuccessCount} queries
+   *           succeed within {@code maxRouterRetry} attempts, or if the wait
+   *           between attempts is interrupted
+   */
+  public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+    int successCount = 0;
+    int totalAttemptCount = 0;
+    Set<ApplicationId> resultSet = new HashSet<>();
+    while (totalAttemptCount < this.maxRouterRetry) {
+      try {
+        Set<ApplicationId> routerApps = getAppsFromRouter();
+        resultSet.addAll(routerApps);
+        LOG.info("Attempt {}: {} known apps from Router, {} in total",
+            totalAttemptCount, routerApps.size(), resultSet.size());
+
+        successCount++;
+        if (successCount >= this.minRouterSuccessCount) {
+          return resultSet;
+        }
+      } catch (Exception e) {
+        LOG.warn("Router query attempt {} failed", totalAttemptCount, e);
+      }
+      totalAttemptCount++;
+
+      // Wait before the next attempt after failures too, so a transient
+      // Router outage does not exhaust every attempt back to back
+      if (totalAttemptCount < this.maxRouterRetry) {
+        try {
+          Thread.sleep(this.routerQueryIntervalMillis);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status and abort rather than keep querying
+          Thread.currentThread().interrupt();
+          throw new YarnException("Interrupted while waiting after Router "
+              + "query attempt " + (totalAttemptCount - 1), e);
+        }
+      }
+    }
+    throw new YarnException("Only " + successCount
+        + " success Router queries after " + totalAttemptCount + " retries");
+  }
+
+  /**
+   * Performs one cleanup pass; the GPG schedules this periodically.
+   */
+  @Override
+  public abstract void run();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
new file mode 100644
index 0000000..1ce9840
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ * The default ApplicationCleaner that cleans up old applications from table
+ * applicationsHomeSubCluster in FederationStateStore.
+ */
+public class DefaultApplicationCleaner extends ApplicationCleaner {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultApplicationCleaner.class);
+
+  /**
+   * Deletes every application entry in the FederationStateStore that the
+   * Router does not report. If the Router cannot be queried successfully
+   * enough times, {@link #getRouterKnownApplications()} throws and nothing is
+   * deleted in this pass.
+   */
+  @Override
+  public void run() {
+    Date now = new Date();
+    LOG.info("Application cleaner run at time {}", now);
+
+    FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
+    Set<ApplicationId> candidates = new HashSet<>();
+    try {
+      List<ApplicationHomeSubCluster> response =
+          facade.getApplicationsHomeSubCluster();
+      for (ApplicationHomeSubCluster app : response) {
+        candidates.add(app.getApplicationId());
+      }
+      LOG.info("{} app entries in FederationStateStore", candidates.size());
+
+      Set<ApplicationId> routerApps = getRouterKnownApplications();
+      LOG.info("{} known applications from Router", routerApps.size());
+
+      // Entries in the state store that no Router query reported
+      Set<ApplicationId> appsToDelete =
+          Sets.difference(candidates, routerApps);
+      LOG.info("Deleting {} applications from statestore",
+          appsToDelete.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Apps to delete: {}", appsToDelete.stream()
+            .map(Object::toString).collect(Collectors.joining(",")));
+      }
+      for (ApplicationId appId : appsToDelete) {
+        try {
+          facade.deleteApplicationHomeSubCluster(appId);
+        } catch (Exception e) {
+          // Best effort: one failed delete must not stop the remaining ones
+          LOG.error("deleteApplicationHomeSubCluster failed at application {}",
+              appId, e);
+        }
+      }
+
+    } catch (Throwable e) {
+      // Deliberately broad: an exception escaping run() would suppress all
+      // later runs of a task scheduled with scheduleAtFixedRate
+      LOG.error("Application cleaner started at time {} fails: ", now, e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
new file mode 100644
index 0000000..dd302c8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
@@ -0,0 +1,19 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1facc8a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
new file mode 100644
index 0000000..ec3f64e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import 
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for DefaultApplicationCleaner in GPG.
+ *
+ * <p>Seeds an in-memory FederationStateStore with three application entries,
+ * replaces the Router query with a fixed set of "known" applications, runs the
+ * cleaner once and checks that only the entries the Router still knows about
+ * remain in the state store.
+ */
+public class TestDefaultApplicationCleaner {
+  private Configuration conf;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreFacade facade;
+  private ApplicationCleaner appCleaner;
+  private GPGContext gpgContext;
+
+  // Applications inserted into the state store by setup(), in insertion order
+  private List<ApplicationId> appIds;
+  // The list of applications returned by mocked router
+  private Set<ApplicationId> routerAppIds;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+
+    // No Router query retry
+    conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "1,1,0");
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, conf);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setStateStoreFacade(facade);
+
+    // Created before the cleaner so that getAppsFromRouter() can never
+    // observe a null set, whenever the cleaner first calls it.
+    routerAppIds = new HashSet<>();
+
+    appCleaner = new TestableDefaultApplicationCleaner();
+    appCleaner.init(conf, gpgContext);
+
+    appIds = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      ApplicationId appId = ApplicationId.newInstance(0, i);
+      appIds.add(appId);
+
+      SubClusterId subClusterId = SubClusterId.newInstance("SUBCLUSTER-" + i);
+
+      stateStore.addApplicationHomeSubCluster(
+          AddApplicationHomeSubClusterRequest.newInstance(
+              ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+    }
+  }
+
+  @After
+  public void breakDown() throws Exception {
+    if (stateStore != null) {
+      stateStore.close();
+    }
+  }
+
+  @Test
+  public void testFederationStateStoreAppsCleanUp() throws YarnException {
+    // Set first app to be still known by Router
+    ApplicationId knownApp = appIds.get(0);
+    routerAppIds.add(knownApp);
+
+    // Another random app not in stateStore known by Router
+    routerAppIds.add(ApplicationId.newInstance(100, 200));
+
+    appCleaner.run();
+
+    // Only the app still known by the Router should be left; checking the id
+    // (not just the count) catches a cleaner that deletes the wrong entries.
+    List<ApplicationHomeSubCluster> remaining = stateStore
+        .getApplicationsHomeSubCluster(
+            GetApplicationsHomeSubClusterRequest.newInstance())
+        .getAppsHomeSubClusters();
+    Assert.assertEquals(1, remaining.size());
+    Assert.assertEquals(knownApp, remaining.get(0).getApplicationId());
+  }
+
+  /**
+   * Testable version of DefaultApplicationCleaner that reports the enclosing
+   * test's {@code routerAppIds} set as the applications known by the Router.
+   * Deliberately non-static: it reads that field of the enclosing instance.
+   */
+  public class TestableDefaultApplicationCleaner
+      extends DefaultApplicationCleaner {
+    @Override
+    public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+      return routerAppIds;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to