This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 72d7b43a32c6 YARN-11548. [Federation] Router Supports Format 
FederationStateStore. (#6116) Contributed by Shilun Fan.
72d7b43a32c6 is described below

commit 72d7b43a32c63250d5fb6e3a149846a669c15abe
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Wed Nov 8 07:41:56 2023 +0800

    YARN-11548. [Federation] Router Supports Format FederationStateStore. 
(#6116) Contributed by Shilun Fan.
    
    Reviewed-by: Inigo Goiri <inigo...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../federation/store/FederationStateStore.java     |  7 ++
 .../store/impl/MemoryFederationStateStore.java     | 10 +++
 .../store/impl/SQLFederationStateStore.java        | 36 +++++++++
 .../store/impl/ZookeeperFederationStateStore.java  | 88 +++++++++++++++-------
 .../store/sql/FederationQueryRunner.java           | 33 ++++++++
 .../utils/FederationStateStoreFacade.java          |  4 +
 .../store/impl/FederationStateStoreBaseTest.java   | 24 ++++++
 .../store/impl/TestSQLFederationStateStore.java    |  3 +-
 .../federation/FederationStateStoreService.java    |  5 ++
 .../apache/hadoop/yarn/server/router/Router.java   | 14 +++-
 10 files changed, 191 insertions(+), 33 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java
index e19733967146..31ea6578dd8d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java
@@ -101,4 +101,11 @@ public interface FederationStateStore extends
          ", but loading version " + loadedVersion);
     }
   }
+
+  /**
+   * Deletes all data held in this state store.
+   *
+   * @throws Exception if the data in the store cannot be deleted.
+   */
+  void deleteStateStore() throws Exception;
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index 05c7c500fb9b..c652d300268b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -419,6 +419,16 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
     version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
   }
 
+  @Override
+  public void deleteStateStore() throws Exception {
+    membership.clear();
+    applications.clear();
+    reservations.clear();
+    policies.clear();
+    sequenceNum = new AtomicInteger();
+    masterKeyId = new AtomicInteger();
+  }
+
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
index 76b4e20dcdad..e114271b53ea 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
@@ -29,6 +29,7 @@ import java.sql.Blob;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.List;
 import java.util.TimeZone;
@@ -216,6 +217,10 @@ public class SQLFederationStateStore implements 
FederationStateStore {
   private static final String CALL_SP_LOAD_VERSION =
       "{call sp_getVersion(?, ?)}";
 
+  private static final List<String> TABLES = new ArrayList<>(
+      Arrays.asList("applicationsHomeSubCluster", "membership", "policies", 
"versions",
+      "reservationsHomeSubCluster", "masterKeys", "delegationTokens", 
"sequenceTable"));
+
   private Calendar utcCalendar =
       Calendar.getInstance(TimeZone.getTimeZone("UTC"));
 
@@ -1122,6 +1127,11 @@ public class SQLFederationStateStore implements 
FederationStateStore {
     storeVersion(fedVersion, versionComment);
   }
 
+  @Override
+  public void deleteStateStore() throws Exception {
+    truncateTable();
+  }
+
   /**
    * Store the Federation Version in the database.
    *
@@ -2077,6 +2087,32 @@ public class SQLFederationStateStore implements 
FederationStateStore {
     }
   }
 
+  /**
+   * Clears each table named in TABLES in turn via FederationQueryRunner#truncateTable,
+   * reusing one connection; any failure is rethrown as a RuntimeException.
+   */
+  private void truncateTable() {
+    Connection connection = null;
+    try {
+      connection = getConnection(false);
+      FederationQueryRunner runner = new FederationQueryRunner();
+      for (String table : TABLES) {
+        LOG.info("truncate table = {} start.", table);
+        runner.truncateTable(connection, table);
+        LOG.info("truncate table = {} finished.", table);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Could not truncate table!", e);
+    } finally {
+      // Return the connection to the pool; no statement is passed here (null).
+      try {
+        FederationStateStoreUtils.returnToPool(LOG, null, connection);
+      } catch (YarnException e) {
+        LOG.error("close connection error.", e);
+      }
+    }
+  }
+
   @VisibleForTesting
   public HikariDataSource getDataSource() {
     return dataSource;
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
index 8c59b2ad8cf4..4548cf42ca99 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
@@ -226,6 +226,8 @@ public class ZookeeperFederationStateStore implements 
FederationStateStore {
   private ZKFederationStateStoreOpDurations opDurations =
       ZKFederationStateStoreOpDurations.getInstance();
 
+  private Configuration configuration;
+
   /*
    * Indicates different app attempt state store operations.
    */
@@ -251,7 +253,7 @@ public class ZookeeperFederationStateStore implements 
FederationStateStore {
   public void init(Configuration conf) throws YarnException {
 
     LOG.info("Initializing ZooKeeper connection");
-
+    this.configuration = conf;
     maxAppsInStateStore = conf.getInt(
        YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
@@ -273,13 +275,8 @@ public class ZookeeperFederationStateStore implements 
FederationStateStore {
     reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);
     versionNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_VERSION);
 
-    String hierarchiesPath = getNodePath(appsZNode, 
ROUTER_APP_ROOT_HIERARCHIES);
-    routerAppRootHierarchies = new HashMap<>();
-    routerAppRootHierarchies.put(0, appsZNode);
-    for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
-      routerAppRootHierarchies.put(splitIndex,
-          getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
-    }
+    // Initialize hierarchical path
+    initHierarchiesPath();
 
     appIdNodeSplitIndex = 
conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
          YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
@@ -302,26 +299,7 @@ public class ZookeeperFederationStateStore implements 
FederationStateStore {
         ROUTER_RM_DT_MASTER_KEY_ID_ZNODE_NAME);
 
     // Create base znode for each entity
-    try {
-      List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
-      zkManager.createRootDirRecursively(membershipZNode, zkAcl);
-      zkManager.createRootDirRecursively(appsZNode, zkAcl);
-      zkManager.createRootDirRecursively(
-          getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES));
-      for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
-        zkManager.createRootDirRecursively(
-            routerAppRootHierarchies.get(splitIndex));
-      }
-      zkManager.createRootDirRecursively(policiesZNode, zkAcl);
-      zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
-      zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl);
-      zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl);
-      zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, 
zkAcl);
-      zkManager.createRootDirRecursively(versionNode, zkAcl);
-    } catch (Exception e) {
-      String errMsg = "Cannot create base directories: " + e.getMessage();
-      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
-    }
+    createBaseZNodeForEachEntity();
 
     // Distributed sequenceNum.
     try {
@@ -831,6 +809,60 @@ public class ZookeeperFederationStateStore implements 
FederationStateStore {
     put(versionNode, data, isUpdate);
   }
 
+  private void initHierarchiesPath() {
+    String hierarchiesPath = getNodePath(appsZNode, 
ROUTER_APP_ROOT_HIERARCHIES);
+    routerAppRootHierarchies = new HashMap<>();
+    routerAppRootHierarchies.put(0, appsZNode);
+    for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
+      routerAppRootHierarchies.put(splitIndex,
+          getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
+    }
+  }
+
+  private void createBaseZNodeForEachEntity() throws YarnException {
+    try {
+      List<ACL> zkAcl = ZKCuratorManager.getZKAcls(configuration);
+      zkManager.createRootDirRecursively(membershipZNode, zkAcl);
+      zkManager.createRootDirRecursively(appsZNode, zkAcl);
+      zkManager.createRootDirRecursively(
+          getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES));
+      for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
+        zkManager.createRootDirRecursively(
+            routerAppRootHierarchies.get(splitIndex));
+      }
+      zkManager.createRootDirRecursively(policiesZNode, zkAcl);
+      zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
+      zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl);
+      zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl);
+      zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, 
zkAcl);
+      zkManager.createRootDirRecursively(versionNode, zkAcl);
+    } catch (Exception e) {
+      String errMsg = "Cannot create base directories: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+  }
+
+  @Override
+  public void deleteStateStore() throws Exception {
+
+    // Cleaning ZNodes and their child nodes;
+    // after the cleaning is complete, the ZNodes will no longer exist.
+    zkManager.delete(appsZNode);
+    zkManager.delete(membershipZNode);
+    zkManager.delete(policiesZNode);
+    zkManager.delete(reservationsZNode);
+    zkManager.delete(routerRMDTSecretManagerRoot);
+    zkManager.delete(routerRMDTMasterKeysRootPath);
+    zkManager.delete(routerRMDelegationTokensRootPath);
+    zkManager.delete(versionNode);
+
+    // Initialize hierarchical path
+    initHierarchiesPath();
+
+    // We will continue to create ZNodes to ensure that the base path exists.
+    createBaseZNodeForEachEntity();
+  }
+
   /**
    * Get the subcluster for an application.
    *
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java
index 0cca531d0d9c..99ab7b2e1f71 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java
@@ -294,6 +294,39 @@ public class FederationQueryRunner {
     }
   }
 
+  public void truncateTable(Connection connection, String tableName)
+      throws SQLException { // deletes all rows of tableName and commits on the given connection
+    DbType dbType = DatabaseProduct.getDbType(connection);
+    String deleteSQL = getTruncateStatement(dbType, tableName);
+    boolean committed = false; // flipped only after commit() has succeeded
+    Statement statement = null;
+    try {
+      statement = connection.createStatement();
+      statement.execute(deleteSQL);
+      connection.commit();
+      committed = true;
+    } catch (SQLException e) { // add context, but keep e as the cause so its stack trace survives
+      throw new SQLException("Unable to truncateTable due to: " + e.getMessage(), e);
+    } finally {
+      if (!committed) { // execute or commit failed: undo any partial work
+        rollbackDBConn(connection);
+      }
+      close(statement);
+    }
+  }
+
+  private String getTruncateStatement(DbType dbType, String tableName) { // builds a DELETE, not TRUNCATE
+    if (isMYSQL(dbType)) {
+      return ("DELETE FROM \"" + tableName + "\""); // NOTE(review): "name" needs ANSI_QUOTES on MySQL? confirm
+    } else {
+      return("DELETE FROM " + tableName);
+    }
+  }
+
+  private boolean isMYSQL(DbType dbType) {
+    return dbType == DbType.MYSQL;
+  }
+
   static void rollbackDBConn(Connection dbConn) {
     try {
       if (dbConn != null && !dbConn.isClosed()) {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index d4c259b51605..aaa475d68983 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -1118,4 +1118,8 @@ public final class FederationStateStoreFacade {
   public FederationCache getFederationCache() {
     return federationCache;
   }
+
+  public void deleteStore() throws Exception {
+    stateStore.deleteStateStore();
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index 626f4637f568..14107e6622b1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -91,6 +91,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * Base class for FederationMembershipStateStore implementations.
@@ -120,6 +121,7 @@ public abstract class FederationStateStoreBaseTest {
 
   @After
   public void after() throws Exception {
+    testDeleteStateStore();
     stateStore.close();
   }
 
@@ -1112,4 +1114,26 @@ public abstract class FederationStateStoreBaseTest {
     assertEquals(subClusterId, applicationHomeSubCluster.getHomeSubCluster());
     assertEquals(context, 
applicationHomeSubCluster.getApplicationSubmissionContext());
   }
+
+  public void testDeleteStateStore() throws Exception {
+    // Step1. We clean the StateStore.
+    FederationStateStore federationStateStore = this.getStateStore();
+    federationStateStore.deleteStateStore();
+
+    // Step2. When we query the sub-cluster information, it should not exist.
+    GetSubClustersInfoRequest request = 
GetSubClustersInfoRequest.newInstance(true);
+    List<SubClusterInfo> subClustersActive = 
stateStore.getSubClusters(request).getSubClusters();
+    assertNotNull(subClustersActive);
+    assertEquals(0, subClustersActive.size());
+
+    // Step3. When we query the applications' information, it should not exist.
+    GetApplicationsHomeSubClusterRequest getRequest =
+        GetApplicationsHomeSubClusterRequest.newInstance();
+    GetApplicationsHomeSubClusterResponse result =
+        stateStore.getApplicationsHomeSubCluster(getRequest);
+    assertNotNull(result);
+    List<ApplicationHomeSubCluster> appsHomeSubClusters = 
result.getAppsHomeSubClusters();
+    assertNotNull(appsHomeSubClusters);
+    assertEquals(0, appsHomeSubClusters.size());
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
index a1071ea0901f..e5031b759e6d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
@@ -106,6 +106,7 @@ public class TestSQLFederationStateStore extends 
FederationStateStoreBaseTest {
     conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL,
         DATABASE_URL + System.currentTimeMillis());
     conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS, 
10);
     super.setConf(conf);
     sqlFederationStateStore = new HSQLDBFederationStateStore();
     return sqlFederationStateStore;
@@ -647,6 +648,6 @@ public class TestSQLFederationStateStore extends 
FederationStateStoreBaseTest {
     assertEquals(10000, connTimeOut);
     assertEquals("YARN-Federation-DataBasePool", poolName);
     assertEquals(1, minimumIdle);
-    assertEquals(1, maximumPoolSize);
+    assertEquals(10, maximumPoolSize);
   }
 }
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
index d71a7f45e031..60586c7a6459 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
@@ -287,6 +287,11 @@ public class FederationStateStoreService extends 
AbstractService
     stateStoreClient.checkVersion();
   }
 
+  @Override
+  public void deleteStateStore() throws Exception {
+    stateStoreClient.deleteStateStore();
+  }
+
   @Override
   public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
       GetSubClusterPolicyConfigurationRequest request) throws YarnException {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index e4defc308dd7..bb7bfd55413f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -373,9 +373,15 @@ public class Router extends CompositeService {
     LOG.info("Application is deleted from state store");
   }
 
-  private static void handFormatStateStore() {
-    // TODO: YARN-11548. [Federation] Router Supports Format 
FederationStateStore.
-    System.err.println("format-state-store is not yet supported.");
+  private static void handFormatStateStore(Configuration conf) { // CLI handler: wipes the federation state store
+    try {
+      System.out.println("Deleting Federation state store.");
+      FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf);
+      facade.deleteStore(); // must complete before success is reported
+      System.out.println("Federation state store has been cleaned.");
+    } catch (Exception e) { // report on stderr instead of propagating
+      System.err.println("Delete Federation state store error, exception = " + e);
+    }
   }
 
   private static void handRemoveApplicationFromStateStore(Configuration conf,
@@ -409,7 +415,7 @@ public class Router extends CompositeService {
       CommandLine cliParser = new DefaultParser().parse(opts, args);
 
       if (CMD_FORMAT_STATE_STORE.equals(cmd)) {
-        handFormatStateStore();
+        handFormatStateStore(conf);
       } else if (CMD_REMOVE_APPLICATION_FROM_STATE_STORE.equals(cmd)) {
         if (cliParser.hasOption(removeApplicationFromStateStoreOpt)) {
           String applicationId = 
cliParser.getOptionValue(removeApplicationFromStateStoreOpt);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to