This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8538af4638b YARN-7599. [BackPort][GPG] ApplicationCleaner in Global 
Policy Generator.  (#5934) Contributed by Botong Huang, Shilun Fan.
8538af4638b is described below

commit 8538af4638bdd4120c2aa0c0e2803a085e5ced74
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Thu Sep 14 21:28:49 2023 +0800

    YARN-7599. [BackPort][GPG] ApplicationCleaner in Global Policy Generator.  
(#5934) Contributed by Botong Huang, Shilun Fan.
    
    Co-authored-by: Botong Huang <bot...@apache.org>
    Co-authored-by: slfan1989 <slfan1...@apache.org>
    Reviewed-by: Inigo Goiri <inigo...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  25 ++++
 .../src/main/resources/yarn-default.xml            |  28 ++++
 .../utils/FederationStateStoreFacade.java          |  30 ++++
 .../server/globalpolicygenerator/GPGUtils.java     |  24 +++-
 .../GlobalPolicyGenerator.java                     |  22 ++-
 .../applicationcleaner/ApplicationCleaner.java     | 153 +++++++++++++++++++++
 .../DefaultApplicationCleaner.java                 |  77 +++++++++++
 .../applicationcleaner/package-info.java           |  19 +++
 .../policygenerator/PolicyGenerator.java           |   4 +-
 .../TestDefaultApplicationCleaner.java             | 131 ++++++++++++++++++
 .../policygenerator/TestPolicyGenerator.java       |   2 +-
 11 files changed, 510 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 874ee9d08d9..ef06299fcfd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4432,6 +4432,31 @@ public class YarnConfiguration extends Configuration {
   public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = 
FEDERATION_GPG_PREFIX +
       "kerberos.principal.hostname";
 
+  // The application cleaner class to use
+  public static final String GPG_APPCLEANER_CLASS =
+      FEDERATION_GPG_PREFIX + "application.cleaner.class";
+  public static final String DEFAULT_GPG_APPCLEANER_CLASS =
+      "org.apache.hadoop.yarn.server.globalpolicygenerator"
+          + ".applicationcleaner.DefaultApplicationCleaner";
+
+  // The interval at which the application cleaner runs, -1 means disabled
+  public static final String GPG_APPCLEANER_INTERVAL_MS =
+      FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms";
+  public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(-1);
+
+  /**
+   * Specifications on how (many times) to contact Router for apps. We need to
+   * do this because Router might return partial application list because some
+   * sub-cluster RM is not responsive (e.g. failing over).
+   *
+   * Should have three values separated by comma: minimum number of successful
+   * queries, maximum number of total attempts, retry interval (ms).
+   */
+  public static final String GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+      FEDERATION_GPG_PREFIX + "application.cleaner.contact.router.spec";
+  public static final String DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+      "3,10,600000";
+
   public static final String FEDERATION_GPG_POLICY_PREFIX =
       FEDERATION_GPG_PREFIX + "policy.generator.";
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f82540b8f46..9697f7aa88c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5538,6 +5538,14 @@
     <value>LINEAR</value>
   </property>
 
+  <property>
+    <description>
+      The Application Cleaner implementation class for GPG to use.
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.class</name>
+    
<value>org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.DefaultApplicationCleaner</value>
+  </property>
+
   <property>
     <description>Flag to enable cross-origin (CORS) support in the GPG. This 
flag
       requires the CORS filter initializer to be added to the filter 
initializers
@@ -5546,6 +5554,14 @@
     <value>false</value>
   </property>
 
+  <property>
+    <description>
+      The interval at which the application cleaner runs, -1 means disabled.
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.interval-ms</name>
+    <value>-1s</value>
+  </property>
+
   <property>
     <description>
       The http address of the GPG web application.
@@ -5556,6 +5572,18 @@
     <value>0.0.0.0:8069</value>
   </property>
 
+  <property>
+    <description>
+      Specifications on how (many times) to contact Router for apps. We need to
+      do this because Router might return partial application list because some
+      sub-cluster RM is not responsive (e.g. failing over).
+      Should have three values separated by comma: minimum number of successful
+      queries, maximum number of total attempts, retry interval (ms).
+    </description>
+    <name>yarn.federation.gpg.application.cleaner.contact.router.spec</name>
+    <value>3,10,600000</value>
+  </property>
+
   <property>
     <description>
       The https address of the GPG web application.
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 26136b11de6..d4c259b5160 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -83,6 +83,9 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRespo
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.slf4j.Logger;
@@ -884,6 +887,33 @@ public final class FederationStateStoreFacade {
     }
   }
 
+  /**
+   * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
+   * all submitted applications to their home sub-clusters.
+   *
+   * @return the mapping of all submitted applications to their home sub-clusters
+   * @throws YarnException if the request is invalid/fails
+   */
+  public List<ApplicationHomeSubCluster> getApplicationsHomeSubCluster() 
throws YarnException {
+    GetApplicationsHomeSubClusterResponse response = 
stateStore.getApplicationsHomeSubCluster(
+        GetApplicationsHomeSubClusterRequest.newInstance());
+    return response.getAppsHomeSubClusters();
+  }
+
+  /**
+   * Delete the mapping of home {@code SubClusterId} of a previously submitted
+   * {@code ApplicationId}. Returns normally if the operation is successful;
+   * otherwise an exception reporting the reason for the failure is thrown.
+   *
+   * @param applicationId the application to delete the home sub-cluster of
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
+      throws YarnException {
+    stateStore.deleteApplicationHomeSubCluster(
+        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId));
+  }
+
   /**
    * Update ApplicationHomeSubCluster to FederationStateStore.
    *
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
index a802e37979b..02344a51493 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
 
 /**
  * GPGUtils contains utility functions for the GPG.
@@ -58,11 +59,12 @@ public final class GPGUtils {
    * @param webAddr WebAddress.
    * @param path url path.
    * @param returnType return type.
+   * @param selectParam value of the {@code RMWSConsts.DESELECTS} query parameter; not set when null.
    * @param conf configuration.
    * @return response entity.
    */
   public static <T> T invokeRMWebService(String webAddr, String path, final 
Class<T> returnType,
-      Configuration conf) {
+      Configuration conf, String selectParam) {
     Client client = Client.create();
     T obj;
 
@@ -72,6 +74,11 @@ public final class GPGUtils {
     String scheme = YarnConfiguration.useHttps(conf) ? HTTPS_PREFIX : 
HTTP_PREFIX;
     String webAddress = scheme + socketAddress.getHostName() + ":" + 
socketAddress.getPort();
     WebResource webResource = client.resource(webAddress);
+
+    if (selectParam != null) {
+      webResource = webResource.queryParam(RMWSConsts.DESELECTS, selectParam);
+    }
+
     ClientResponse response = null;
     try {
       response = webResource.path(RM_WEB_SERVICE_PATH).path(path)
@@ -92,6 +99,21 @@ public final class GPGUtils {
     }
   }
 
+  /**
+   * Performs an invocation of the remote RMWebService.
+   *
+   * @param <T> Generic T.
+   * @param webAddr WebAddress.
+   * @param path url path.
+   * @param returnType return type.
+   * @param config configuration.
+   * @return response entity.
+   */
+  public static <T> T invokeRMWebService(String webAddr,
+      String path, final Class<T> returnType, Configuration config) {
+    return invokeRMWebService(webAddr, path, returnType, config, null);
+  }
+
   /**
    * Creates a uniform weighting of 1.0 for each sub cluster.
    *
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index 81a999d76a2..ba8ce856cda 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import 
org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
 import 
org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
 import 
org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
@@ -84,6 +85,7 @@ public class GlobalPolicyGenerator extends CompositeService {
   // Scheduler service that runs tasks periodically
   private ScheduledThreadPoolExecutor scheduledExecutorService;
   private SubClusterCleaner subClusterCleaner;
+  private ApplicationCleaner applicationCleaner;
   private PolicyGenerator policyGenerator;
   private String webAppAddress;
   private JvmPauseMonitor pauseMonitor;
@@ -125,6 +127,12 @@ public class GlobalPolicyGenerator extends 
CompositeService {
         conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
             YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
     this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+
+    this.applicationCleaner = FederationStateStoreFacade.createInstance(conf,
+        YarnConfiguration.GPG_APPCLEANER_CLASS,
+        YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS, 
ApplicationCleaner.class);
+    this.applicationCleaner.init(conf, this.gpgContext);
+
     this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
 
     this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf);
@@ -149,7 +157,7 @@ public class GlobalPolicyGenerator extends CompositeService 
{
 
     super.serviceStart();
 
-    // Scheduler SubClusterCleaner service
+    // Schedule SubClusterCleaner service
     Configuration config = getConfig();
     long scCleanerIntervalMs = config.getTimeDuration(
         YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
@@ -161,6 +169,18 @@ public class GlobalPolicyGenerator extends 
CompositeService {
           DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
     }
 
+    // Schedule ApplicationCleaner service
+    long appCleanerIntervalMs = config.getTimeDuration(
+        YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+
+    if (appCleanerIntervalMs > 0) {
+      
this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner,
+          0, appCleanerIntervalMs, TimeUnit.MILLISECONDS);
+      LOG.info("Scheduled application cleaner with interval: {}",
+          DurationFormatUtils.formatDurationISO(appCleanerIntervalMs));
+    }
+
     // Schedule PolicyGenerator
     // We recommend using yarn.federation.gpg.policy.generator.interval
     // instead of yarn.federation.gpg.policy.generator.interval-ms
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
new file mode 100644
index 00000000000..cd3f7618558
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ApplicationCleaner is a runnable that cleans up old applications from
+ * table applicationsHomeSubCluster in FederationStateStore.
+ *
+ * <p>Subclasses implement {@link #run()}. This base class parses the Router
+ * contact specification and offers helpers that query the Router for the
+ * applications it reports.
+ */
+public abstract class ApplicationCleaner implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApplicationCleaner.class);
+
+  private Configuration conf;
+  private GPGContext gpgContext;
+
+  // Successful Router queries needed before getRouterKnownApplications returns.
+  private int minRouterSuccessCount;
+  // Upper bound on the number of Router queries, successful or not.
+  private int maxRouterRetry;
+  // Pause between two Router queries, in milliseconds.
+  private long routerQueryIntervalMillis;
+
+  /**
+   * Initializes the cleaner from the given configuration.
+   *
+   * <p>Reads {@link YarnConfiguration#GPG_APPCLEANER_CONTACT_ROUTER_SPEC},
+   * which must hold three comma separated numbers: the minimum number of
+   * successful Router queries, the maximum number of total queries and the
+   * interval between queries in milliseconds.
+   *
+   * @param config the configuration to read the Router spec from
+   * @param context the GPG context returned by {@link #getGPGContext()}
+   * @throws YarnException if the spec does not have exactly three numeric
+   *           values or the values are out of range
+   */
+  public void init(Configuration config, GPGContext context)
+      throws YarnException {
+
+    this.gpgContext = context;
+    this.conf = config;
+
+    String routerSpecString =
+        this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC);
+    String[] specs = routerSpecString.split(",");
+    if (specs.length != 3) {
+      throw new YarnException("Expect three comma separated values in "
+          + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get "
+          + routerSpecString);
+    }
+    try {
+      // Trim so that a spec written as "3, 10, 600000" is accepted as well.
+      this.minRouterSuccessCount = Integer.parseInt(specs[0].trim());
+      this.maxRouterRetry = Integer.parseInt(specs[1].trim());
+      this.routerQueryIntervalMillis = Long.parseLong(specs[2].trim());
+    } catch (NumberFormatException e) {
+      // Report the offending key instead of a bare NumberFormatException.
+      throw new YarnException("Expect numeric values in "
+          + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get "
+          + routerSpecString, e);
+    }
+
+    if (this.minRouterSuccessCount > this.maxRouterRetry) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount
+          + " should not be larger than maxRouterRetry "
+          + this.maxRouterRetry);
+    }
+    if (this.minRouterSuccessCount <= 0) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount + " should be positive");
+    }
+    if (this.routerQueryIntervalMillis < 0) {
+      // A negative value can neither be formatted below nor slept on later.
+      throw new YarnException("Router query interval "
+          + this.routerQueryIntervalMillis + " should not be negative");
+    }
+
+    LOG.info(
+        "Initialized AppCleaner with Router query with min success {}, "
+            + "max retry {}, retry interval {}",
+        this.minRouterSuccessCount, this.maxRouterRetry,
+        DurationFormatUtils.formatDurationISO(this.routerQueryIntervalMillis));
+  }
+
+  /**
+   * Returns the GPG context passed to {@link #init(Configuration, GPGContext)}.
+   *
+   * @return the GPG context, or null if init has not been called
+   */
+  public GPGContext getGPGContext() {
+    return this.gpgContext;
+  }
+
+  /**
+   * Query router for applications.
+   *
+   * @return the set of applications
+   * @throws YarnRuntimeException when router call fails
+   */
+  public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+    String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
+
+    LOG.info("Contacting router at: {}", webAppAddress);
+    AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf,
+        DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
+    if (appsInfo == null) {
+      // Fail with a descriptive error rather than a NullPointerException below.
+      throw new YarnRuntimeException(
+          "No application list returned by router at " + webAppAddress);
+    }
+
+    Set<ApplicationId> appSet = new HashSet<>();
+    for (AppInfo appInfo : appsInfo.getApps()) {
+      appSet.add(ApplicationId.fromString(appInfo.getAppId()));
+    }
+    return appSet;
+  }
+
+  /**
+   * Get the list of known applications in the cluster from Router.
+   *
+   * <p>The Router is queried until the configured minimum number of queries
+   * succeeded, and the union of all successful answers is returned. A failed
+   * query is retried right away; after a successful query the configured
+   * interval is waited before the next one.
+   *
+   * @return the list of known applications
+   * @throws YarnException if the minimum number of successful queries is not
+   *           reached within the maximum number of attempts, or if the thread
+   *           is interrupted while waiting between attempts
+   */
+  public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+    int successCount = 0;
+    int totalAttemptCount = 0;
+    Set<ApplicationId> resultSet = new HashSet<>();
+    while (totalAttemptCount < this.maxRouterRetry) {
+      boolean succeeded = false;
+      try {
+        Set<ApplicationId> routerApps = getAppsFromRouter();
+        resultSet.addAll(routerApps);
+        LOG.info("Attempt {}: {} known apps from Router, {} in total",
+            totalAttemptCount, routerApps.size(), resultSet.size());
+        successCount++;
+        succeeded = true;
+      } catch (Exception e) {
+        LOG.warn("Router query attempt {} failed.", totalAttemptCount, e);
+      }
+
+      if (successCount >= this.minRouterSuccessCount) {
+        return resultSet;
+      }
+
+      // Wait for the next attempt, but only after a successful query and only
+      // when another attempt will actually follow.
+      if (succeeded && totalAttemptCount + 1 < this.maxRouterRetry) {
+        try {
+          Thread.sleep(this.routerQueryIntervalMillis);
+        } catch (InterruptedException e) {
+          // Keep the interrupt visible to the executor and stop querying;
+          // continuing would make every following sleep fail immediately.
+          Thread.currentThread().interrupt();
+          throw new YarnException("Interrupted while waiting after Router"
+              + " query attempt " + totalAttemptCount, e);
+        }
+      }
+      totalAttemptCount++;
+    }
+    throw new YarnException("Only " + successCount
+        + " success Router queries after " + totalAttemptCount + " retries");
+  }
+
+  @Override
+  public abstract void run();
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
new file mode 100644
index 00000000000..857d2e645d4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Default {@link ApplicationCleaner}: removes from table
+ * applicationsHomeSubCluster in FederationStateStore every application entry
+ * that the Router no longer reports.
+ */
+public class DefaultApplicationCleaner extends ApplicationCleaner {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultApplicationCleaner.class);
+
+  /**
+   * Runs one cleaning pass. Any failure of the pass is logged, not rethrown.
+   */
+  @Override
+  public void run() {
+    Date now = new Date();
+    LOG.info("Application cleaner run at time {}", now);
+
+    FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
+    try {
+      // Every application currently recorded in the state store is a candidate.
+      List<ApplicationHomeSubCluster> storedApps =
+          facade.getApplicationsHomeSubCluster();
+      Set<ApplicationId> candidates = storedApps.stream()
+          .map(ApplicationHomeSubCluster::getApplicationId)
+          .collect(Collectors.toCollection(HashSet::new));
+      LOG.info("{} app entries in FederationStateStore", candidates.size());
+
+      Set<ApplicationId> routerApps = getRouterKnownApplications();
+      LOG.info("{} known applications from Router", routerApps.size());
+
+      // Whatever the Router still knows about must be kept.
+      candidates.removeAll(routerApps);
+      LOG.info("Deleting {} applications from statestore", candidates.size());
+      if (LOG.isDebugEnabled()) {
+        String appsToDelete = candidates.stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(","));
+        LOG.debug("Apps to delete: {}.", appsToDelete);
+      }
+      for (ApplicationId appId : candidates) {
+        deleteQuietly(facade, appId);
+      }
+    } catch (Throwable e) {
+      LOG.error("Application cleaner started at time {} fails. ", now, e);
+    }
+  }
+
+  /**
+   * Deletes one application entry; a failure is logged so that the remaining
+   * entries are still processed.
+   */
+  private void deleteQuietly(FederationStateStoreFacade facade,
+      ApplicationId appId) {
+    try {
+      facade.deleteApplicationHomeSubCluster(appId);
+    } catch (Exception e) {
+      LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
+    }
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
new file mode 100644
index 00000000000..dd302c81f45
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
index df28192a0c6..1f0fbd11a74 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
@@ -159,7 +159,7 @@ public class PolicyGenerator implements Runnable, 
Configurable {
           clusterInfo.put(sci.getSubClusterId(), new HashMap<>());
         }
         Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(),
-            e.getValue(), e.getKey(), getConf());
+            e.getValue(), e.getKey(), conf);
         clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
       }
     }
@@ -181,7 +181,7 @@ public class PolicyGenerator implements Runnable, 
Configurable {
     for (SubClusterInfo sci : activeSubClusters.values()) {
       SchedulerTypeInfo sti = GPGUtils
           .invokeRMWebService(sci.getRMWebServiceAddress(),
-              RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, getConf());
+              RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, conf);
       if(sti != null){
         schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
       } else {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
new file mode 100644
index 00000000000..2d63c48236f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for DefaultApplicationCleaner in GPG.
+ */
+public class TestDefaultApplicationCleaner {
+  private Configuration conf;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreFacade facade;
+  private ApplicationCleaner appCleaner;
+  private GPGContext gpgContext;
+
+  private List<ApplicationId> appIds;
+  // The set of application IDs returned by the mocked Router
+  private Set<ApplicationId> routerAppIds;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+
+    // No Router query retry
+    conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "1,1,0");
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, conf);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setStateStoreFacade(facade);
+
+    appCleaner = new TestableDefaultApplicationCleaner();
+    appCleaner.init(conf, gpgContext);
+
+    routerAppIds = new HashSet<>();
+
+    appIds = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      ApplicationId appId = ApplicationId.newInstance(0, i);
+      appIds.add(appId);
+
+      SubClusterId subClusterId =
+          SubClusterId.newInstance("SUBCLUSTER-" + i);
+
+      stateStore.addApplicationHomeSubCluster(
+          AddApplicationHomeSubClusterRequest.newInstance(
+              ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+    }
+  }
+
+  @After
+  public void breakDown() {
+    if (stateStore != null) {
+      stateStore.close();
+      stateStore = null;
+    }
+  }
+
+  @Test
+  public void testFederationStateStoreAppsCleanUp() throws YarnException {
+    // Set first app to be still known by Router
+    ApplicationId appId = appIds.get(0);
+    routerAppIds.add(appId);
+
+    // Another random app not in stateStore known by Router
+    appId = ApplicationId.newInstance(100, 200);
+    routerAppIds.add(appId);
+
+    appCleaner.run();
+
+    // Only one app should be left
+    Assert.assertEquals(1,
+        stateStore
+            .getApplicationsHomeSubCluster(
+                GetApplicationsHomeSubClusterRequest.newInstance())
+            .getAppsHomeSubClusters().size());
+  }
+
+  /**
+   * Testable version of DefaultApplicationCleaner.
+   */
+  public class TestableDefaultApplicationCleaner
+      extends DefaultApplicationCleaner {
+    @Override
+    public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+      return routerAppIds;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
index 72e97f8a750..446eeee2cd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
@@ -299,7 +299,7 @@ public class TestPolicyGenerator {
     String webAppAddress = getServiceAddress(NetUtils.createSocketAddr(rmAddress));
 
     SchedulerTypeInfo sti = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.SCHEDULER,
-        SchedulerTypeInfo.class, this.conf);
+        SchedulerTypeInfo.class, conf);
 
     Assert.assertNotNull(sti);
     SchedulerInfo schedulerInfo = sti.getSchedulerInfo();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org


Reply via email to