This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3de66f5c4046 YARN-11547. [Federation] Router Supports Remove 
individual application records from FederationStateStore. (#6055)
3de66f5c4046 is described below

commit 3de66f5c4046cf9e44c1f6b14779bc4f6442e92f
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Tue Sep 26 04:52:57 2023 +0800

    YARN-11547. [Federation] Router Supports Remove individual application 
records from FederationStateStore. (#6055)
---
 .../apache/hadoop/yarn/server/router/Router.java   | 96 ++++++++++++++++++----
 .../server/router/TestRouterStoreCommands.java     | 77 +++++++++++++++++
 2 files changed, 159 insertions(+), 14 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 4761866253ff..e4defc308dd7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -28,6 +28,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.lang.time.DurationFormatUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -37,14 +43,16 @@ import 
org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
 import org.apache.hadoop.yarn.server.router.cleaner.SubClusterCleaner;
 import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
@@ -103,6 +111,9 @@ public class Router extends CompositeService {
   protected String webAppAddress;
   private static long clusterTimeStamp = System.currentTimeMillis();
   private FedAppReportFetcher fetcher = null;
+  private static final String CMD_FORMAT_STATE_STORE = "-format-state-store";
+  private static final String CMD_REMOVE_APPLICATION_FROM_STATE_STORE =
+      "-remove-application-from-state-store";
 
   /**
    * Priority of the Router shutdown hook.
@@ -191,7 +202,7 @@ public class Router extends CompositeService {
   }
 
   protected void shutDown() {
-    new Thread(() -> Router.this.stop()).start();
+    new Thread(Router.this::stop).start();
   }
 
   protected RouterClientRMService createClientRMProxyService() {
@@ -292,24 +303,14 @@ public class Router extends CompositeService {
 
   public static void main(String[] argv) {
     Configuration conf = new YarnConfiguration();
-    Thread
-        .setDefaultUncaughtExceptionHandler(new 
YarnUncaughtExceptionHandler());
+    Thread.setDefaultUncaughtExceptionHandler(new 
YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(Router.class, argv, LOG);
     Router router = new Router();
     try {
       GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
       argv = hParser.getRemainingArgs();
       if (argv.length > 1) {
-        if (argv[0].equals("-format-state-store")) {
-          // TODO: YARN-11548. [Federation] Router Supports Format 
FederationStateStore.
-          System.err.println("format-state-store is not yet supported.");
-        } else if (argv[0].equals("-remove-application-from-state-store") && 
argv.length == 2) {
-          // TODO: YARN-11547. [Federation]
-          //  Router Supports Remove individual application records from 
FederationStateStore.
-          System.err.println("remove-application-from-state-store is not yet 
supported.");
-        } else {
-          printUsage(System.err);
-        }
+        executeRouterCommand(conf, argv);
       } else {
         // Remove the old hook if we are rebooting.
         if (null != routerShutdownHook) {
@@ -362,6 +363,73 @@ public class Router extends CompositeService {
     return fetcher;
   }
 
+  @VisibleForTesting
+  public static void removeApplication(Configuration conf, String 
applicationId)
+      throws Exception {
+    FederationStateStoreFacade facade = 
FederationStateStoreFacade.getInstance(conf);
+    ApplicationId removeAppId = ApplicationId.fromString(applicationId);
+    LOG.info("Deleting application {} from state store.", removeAppId);
+    facade.deleteApplicationHomeSubCluster(removeAppId);
+    LOG.info("Application is deleted from state store");
+  }
+
+  private static void handFormatStateStore() {
+    // TODO: YARN-11548. [Federation] Router Supports Format 
FederationStateStore.
+    System.err.println("format-state-store is not yet supported.");
+  }
+
+  private static void handRemoveApplicationFromStateStore(Configuration conf,
+      String applicationId) {
+    try {
+      removeApplication(conf, applicationId);
+      System.out.println("Application " + applicationId + " is deleted from 
state store");
+    } catch (Exception e) {
+      System.err.println("Application " + applicationId + " error, exception = 
" + e);
+    }
+  }
+
+  private static void executeRouterCommand(Configuration conf, String[] args) {
+    // Step1. Define Options.
+    Options opts = new Options();
+    Option formatStateStoreOpt = new Option("format-state-store",  false,
+        " Formats the FederationStateStore. " +
+        "This will clear the FederationStateStore and " +
+        "is useful if past applications are no longer needed. " +
+        "This should be run only when the Router is not running.");
+    Option removeApplicationFromStateStoreOpt = new 
Option("remove-application-from-state-store",
+        false, " Remove the application from FederationStateStore. " +
+         " This should be run only when the Router is not running. ");
+    opts.addOption(formatStateStoreOpt);
+    opts.addOption(removeApplicationFromStateStoreOpt);
+
+    // Step2. Parse Options.
+    try {
+      String cmd = args[0];
+
+      CommandLine cliParser = new DefaultParser().parse(opts, args);
+
+      if (CMD_FORMAT_STATE_STORE.equals(cmd)) {
+        handFormatStateStore();
+      } else if (CMD_REMOVE_APPLICATION_FROM_STATE_STORE.equals(cmd)) {
+        if (cliParser.hasOption(removeApplicationFromStateStoreOpt)) {
+          String applicationId = 
cliParser.getOptionValue(removeApplicationFromStateStoreOpt);
+          handRemoveApplicationFromStateStore(conf, applicationId);
+        } else {
+          System.err.println("remove-application-from-state-store requires 
application arg.");
+        }
+      } else {
+        System.out.println("No related commands found.");
+        printUsage(System.err);
+      }
+    } catch (MissingArgumentException ex) {
+      System.out.println("Missing argument for options.");
+      printUsage(System.err);
+    } catch (ParseException e) {
+      System.out.println("Parsing of a command-line error.");
+      printUsage(System.err);
+    }
+  }
+
   private static void printUsage(PrintStream out) {
     out.println("Usage: yarn router [-format-state-store] | " +
         "[-remove-application-from-state-store <appId>]");
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java
new file mode 100644
index 000000000000..04007ca88dfb
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import 
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRouterStoreCommands {
+
+  ////////////////////////////////
+  // Router Constants
+  ////////////////////////////////
+  private Configuration conf;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreFacade facade;
+
+  @Before
+  public void setup() throws YarnException {
+    conf = new YarnConfiguration();
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+    facade = FederationStateStoreFacade.getInstance(conf);
+    facade.reinitialize(stateStore, conf);
+  }
+
+  @Test
+  public void testRemoveApplicationFromRouterStateStore() throws Exception {
+
+    // We will design such a unit test.
+    // We will write the applicationId and subCluster into the stateStore,
+    // and then remove the application through Router.removeApplication.
+    // At this time, if we continue to query through the stateStore,
+    // We will get a prompt that application not exists.
+
+    ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+    SubClusterId homeSubCluster = SubClusterId.newInstance("SC-1");
+    ApplicationHomeSubCluster applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
+    AddApplicationHomeSubClusterRequest request =
+        
AddApplicationHomeSubClusterRequest.newInstance(applicationHomeSubCluster);
+    stateStore.addApplicationHomeSubCluster(request);
+    Router.removeApplication(conf, appId.toString());
+
+    GetApplicationHomeSubClusterRequest request1 =
+        GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+    LambdaTestUtils.intercept(YarnException.class, "Application " + appId + " 
does not exist.",
+        () -> stateStore.getApplicationHomeSubCluster(request1));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to