This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3de66f5c4046 YARN-11547. [Federation] Router Supports Remove
individual application records from FederationStateStore. (#6055)
3de66f5c4046 is described below
commit 3de66f5c4046cf9e44c1f6b14779bc4f6442e92f
Author: slfan1989 <[email protected]>
AuthorDate: Tue Sep 26 04:52:57 2023 +0800
YARN-11547. [Federation] Router Supports Remove individual application
records from FederationStateStore. (#6055)
---
.../apache/hadoop/yarn/server/router/Router.java | 96 ++++++++++++++++++----
.../server/router/TestRouterStoreCommands.java | 77 +++++++++++++++++
2 files changed, 159 insertions(+), 14 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 4761866253ff..e4defc308dd7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -28,6 +28,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@@ -37,14 +43,16 @@ import
org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.router.cleaner.SubClusterCleaner;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
@@ -103,6 +111,9 @@ public class Router extends CompositeService {
protected String webAppAddress;
private static long clusterTimeStamp = System.currentTimeMillis();
private FedAppReportFetcher fetcher = null;
+ private static final String CMD_FORMAT_STATE_STORE = "-format-state-store";
+ private static final String CMD_REMOVE_APPLICATION_FROM_STATE_STORE =
+ "-remove-application-from-state-store";
/**
* Priority of the Router shutdown hook.
@@ -191,7 +202,7 @@ public class Router extends CompositeService {
}
protected void shutDown() {
- new Thread(() -> Router.this.stop()).start();
+ new Thread(Router.this::stop).start();
}
protected RouterClientRMService createClientRMProxyService() {
@@ -292,24 +303,14 @@ public class Router extends CompositeService {
public static void main(String[] argv) {
Configuration conf = new YarnConfiguration();
- Thread
- .setDefaultUncaughtExceptionHandler(new
YarnUncaughtExceptionHandler());
+ Thread.setDefaultUncaughtExceptionHandler(new
YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(Router.class, argv, LOG);
Router router = new Router();
try {
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
argv = hParser.getRemainingArgs();
if (argv.length > 1) {
- if (argv[0].equals("-format-state-store")) {
- // TODO: YARN-11548. [Federation] Router Supports Format
FederationStateStore.
- System.err.println("format-state-store is not yet supported.");
- } else if (argv[0].equals("-remove-application-from-state-store") &&
argv.length == 2) {
- // TODO: YARN-11547. [Federation]
- // Router Supports Remove individual application records from
FederationStateStore.
- System.err.println("remove-application-from-state-store is not yet
supported.");
- } else {
- printUsage(System.err);
- }
+ executeRouterCommand(conf, argv);
} else {
// Remove the old hook if we are rebooting.
if (null != routerShutdownHook) {
@@ -362,6 +363,73 @@ public class Router extends CompositeService {
return fetcher;
}
+ @VisibleForTesting
+ public static void removeApplication(Configuration conf, String
applicationId)
+ throws Exception {
+ FederationStateStoreFacade facade =
FederationStateStoreFacade.getInstance(conf);
+ ApplicationId removeAppId = ApplicationId.fromString(applicationId);
+ LOG.info("Deleting application {} from state store.", removeAppId);
+ facade.deleteApplicationHomeSubCluster(removeAppId);
+ LOG.info("Application is deleted from state store");
+ }
+
+ private static void handFormatStateStore() {
+ // TODO: YARN-11548. [Federation] Router Supports Format
FederationStateStore.
+ System.err.println("format-state-store is not yet supported.");
+ }
+
+ private static void handRemoveApplicationFromStateStore(Configuration conf,
+ String applicationId) {
+ try {
+ removeApplication(conf, applicationId);
+ System.out.println("Application " + applicationId + " is deleted from
state store");
+ } catch (Exception e) {
+ System.err.println("Application " + applicationId + " error, exception =
" + e);
+ }
+ }
+
+ private static void executeRouterCommand(Configuration conf, String[] args) {
+ // Step1. Define Options.
+ Options opts = new Options();
+ Option formatStateStoreOpt = new Option("format-state-store", false,
+ " Formats the FederationStateStore. " +
+ "This will clear the FederationStateStore and " +
+ "is useful if past applications are no longer needed. " +
+ "This should be run only when the Router is not running.");
+ Option removeApplicationFromStateStoreOpt = new
Option("remove-application-from-state-store",
+ false, " Remove the application from FederationStateStore. " +
+ " This should be run only when the Router is not running. ");
+ opts.addOption(formatStateStoreOpt);
+ opts.addOption(removeApplicationFromStateStoreOpt);
+
+ // Step2. Parse Options.
+ try {
+ String cmd = args[0];
+
+ CommandLine cliParser = new DefaultParser().parse(opts, args);
+
+ if (CMD_FORMAT_STATE_STORE.equals(cmd)) {
+ handFormatStateStore();
+ } else if (CMD_REMOVE_APPLICATION_FROM_STATE_STORE.equals(cmd)) {
+ if (cliParser.hasOption(removeApplicationFromStateStoreOpt)) {
+ String applicationId =
cliParser.getOptionValue(removeApplicationFromStateStoreOpt);
+ handRemoveApplicationFromStateStore(conf, applicationId);
+ } else {
+ System.err.println("remove-application-from-state-store requires
application arg.");
+ }
+ } else {
+ System.out.println("No related commands found.");
+ printUsage(System.err);
+ }
+ } catch (MissingArgumentException ex) {
+ System.out.println("Missing argument for options.");
+ printUsage(System.err);
+ } catch (ParseException e) {
+ System.out.println("Parsing of a command-line error.");
+ printUsage(System.err);
+ }
+ }
+
private static void printUsage(PrintStream out) {
out.println("Usage: yarn router [-format-state-store] | " +
"[-remove-application-from-state-store <appId>]");
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java
new file mode 100644
index 000000000000..04007ca88dfb
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRouterStoreCommands {
+
+ ////////////////////////////////
+ // Router Constants
+ ////////////////////////////////
+ private Configuration conf;
+ private MemoryFederationStateStore stateStore;
+ private FederationStateStoreFacade facade;
+
+ @Before
+ public void setup() throws YarnException {
+ conf = new YarnConfiguration();
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+ facade = FederationStateStoreFacade.getInstance(conf);
+ facade.reinitialize(stateStore, conf);
+ }
+
+ @Test
+ public void testRemoveApplicationFromRouterStateStore() throws Exception {
+
+ // We will design such a unit test.
+ // We will write the applicationId and subCluster into the stateStore,
+ // and then remove the application through Router.removeApplication.
+ // At this time, if we continue to query through the stateStore,
+ // We will get a prompt that application not exists.
+
+ ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+ SubClusterId homeSubCluster = SubClusterId.newInstance("SC-1");
+ ApplicationHomeSubCluster applicationHomeSubCluster =
+ ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
+ AddApplicationHomeSubClusterRequest request =
+
AddApplicationHomeSubClusterRequest.newInstance(applicationHomeSubCluster);
+ stateStore.addApplicationHomeSubCluster(request);
+ Router.removeApplication(conf, appId.toString());
+
+ GetApplicationHomeSubClusterRequest request1 =
+ GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+ LambdaTestUtils.intercept(YarnException.class, "Application " + appId + "
does not exist.",
+ () -> stateStore.getApplicationHomeSubCluster(request1));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]