This is an automated email from the ASF dual-hosted git repository.
keepromise pushed a commit to branch HDFS-17531
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-17531 by this push:
new d8768cff46c HDFS-17659. [ARR]Router Quota supports asynchronous rpc.
(#7157). Contributed by hfutatzhanghb.
d8768cff46c is described below
commit d8768cff46ca5a7a0fca4139a4b720cd22ed1f91
Author: hfutatzhanghb <[email protected]>
AuthorDate: Mon Nov 25 14:41:24 2024 +0800
HDFS-17659. [ARR]Router Quota supports asynchronous rpc. (#7157).
Contributed by hfutatzhanghb.
Reviewed-by: Jian Zhang <[email protected]>
---
.../hdfs/server/federation/router/AsyncQuota.java | 87 +++++++++++
.../hdfs/server/federation/router/Quota.java | 6 +-
.../router/RouterQuotaUpdateService.java | 10 ++
.../federation/router/TestRouterAsyncQuota.java | 166 +++++++++++++++++++++
4 files changed, 266 insertions(+), 3 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java
new file mode 100644
index 00000000000..5d76171a548
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionException;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+/**
+ * Quota module variant used when the Router serves RPCs asynchronously.
+ *
+ * <p>The overridden methods issue their Namenode calls through the
+ * {@link RouterRpcClient} obtained from the {@link RouterRpcServer} and finish
+ * with {@code asyncReturn(...)}. Callers in this code base collect the actual
+ * result afterwards with {@code AsyncUtil.syncReturn(...)}; the value returned
+ * directly from these methods should therefore not be relied upon.
+ * NOTE(review): this assumes the server hands out an async-capable client when
+ * async mode is on - confirm against RouterRpcServer.
+ */
+public class AsyncQuota extends Quota {
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Router owning this module; consulted for the quota-enabled flag. */
+  private final Router router;
+
+  /**
+   * Create the async quota module.
+   * @param router Router owning this module.
+   * @param server RPC server whose RPC client is used to reach the Namenodes.
+   */
+  public AsyncQuota(Router router, RouterRpcServer server) {
+    super(router, server);
+    this.router = router;
+    this.rpcServer = server;
+    this.rpcClient = this.rpcServer.getRPCClient();
+  }
+
+  /**
+   * Async get aggregated quota usage for the federation path.
+   * @param path Federation path.
+   * @return Aggregated quota, delivered through the async context.
+   * @throws IOException If the quota system is disabled.
+   */
+  @Override
+  public QuotaUsage getQuotaUsage(String path) throws IOException {
+    // Kick off the per-location lookups; the result is consumed by the
+    // asyncApply stage below rather than taken from the return value.
+    getEachQuotaUsage(path);
+
+    asyncApply(o -> {
+      // The previous async stage is registered with Map.class, so the generic
+      // parameters cannot be checked at runtime.
+      @SuppressWarnings("unchecked")
+      Map<RemoteLocation, QuotaUsage> results =
+          (Map<RemoteLocation, QuotaUsage>) o;
+      try {
+        return aggregateQuota(path, results);
+      } catch (IOException e) {
+        // The lambda cannot throw a checked exception; keep the cause.
+        throw new CompletionException(e);
+      }
+    });
+    return asyncReturn(QuotaUsage.class);
+  }
+
+  /**
+   * Get quota usage for the federation path.
+   * @param path Federation path.
+   * @return quota usage for each remote location, delivered through the
+   *         async context.
+   * @throws IOException If the quota system is disabled, or propagated from
+   *         the operation check or the quota location lookup.
+   */
+  @Override
+  Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+    if (!router.isQuotaEnabled()) {
+      throw new IOException("The quota system is disabled in Router.");
+    }
+
+    final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path);
+    RemoteMethod method = new RemoteMethod("getQuotaUsage",
+        new Class<?>[] {String.class}, new RemoteParam());
+    rpcClient.invokeConcurrent(
+        quotaLocs, method, true, false, QuotaUsage.class);
+    return asyncReturn(Map.class);
+  }
+}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
index e19e51b5733..f28af6afa7b 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
@@ -213,9 +213,9 @@ public class Quota {
* method will do some additional filtering.
* @param path Federation path.
* @return List of valid quota remote locations.
- * @throws IOException
+ * @throws IOException If the location for this path cannot be determined.
*/
- private List<RemoteLocation> getValidQuotaLocations(String path)
+ protected List<RemoteLocation> getValidQuotaLocations(String path)
throws IOException {
final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
@@ -359,7 +359,7 @@ public class Quota {
* federation path.
* @param path Federation path.
* @return List of quota remote locations.
- * @throws IOException
+ * @throws IOException If the location for this path cannot be determined.
*/
private List<RemoteLocation> getQuotaRemoteLocations(String path)
throws IOException {
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
index e9b780d5bca..235190d2a48 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
@@ -41,6 +41,8 @@ import
org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+
/**
* Service to periodically update the {@link RouterQuotaUsage}
* cached information in the {@link Router}.
@@ -99,6 +101,9 @@ public class RouterQuotaUpdateService extends
PeriodicService {
// This is because mount table does not have mtime.
// For other mount entry get current quota usage
HdfsFileStatus ret = this.rpcServer.getFileInfo(src);
+ if (rpcServer.isAsync()) {
+ ret = syncReturn(HdfsFileStatus.class);
+ }
if (ret == null || ret.getModificationTime() == 0) {
long[] zeroConsume = new long[StorageType.values().length];
currentQuotaUsage =
@@ -113,6 +118,9 @@ public class RouterQuotaUpdateService extends
PeriodicService {
Quota quotaModule = this.rpcServer.getQuotaModule();
Map<RemoteLocation, QuotaUsage> usageMap =
quotaModule.getEachQuotaUsage(src);
+ if (this.rpcServer.isAsync()) {
+ usageMap = (Map<RemoteLocation,
QuotaUsage>)syncReturn(Map.class);
+ }
currentQuotaUsage = quotaModule.aggregateQuota(src, usageMap);
remoteQuotaUsage.putAll(usageMap);
} catch (IOException ioe) {
@@ -136,6 +144,8 @@ public class RouterQuotaUpdateService extends
PeriodicService {
}
} catch (IOException e) {
LOG.error("Quota cache updated error.", e);
+ } catch (Exception e) {
+ LOG.error(e.toString());
}
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java
new file mode 100644
index 00000000000..0b1eeeec0be
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.ipc.CallerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
+import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertTrue;
+
+public class TestRouterAsyncQuota {
+ private static Configuration routerConf;
+ /** Federated HDFS cluster. */
+ private static MiniRouterDFSCluster cluster;
+ private static String ns0;
+
+ /** Random Router for this federated cluster. */
+ private MiniRouterDFSCluster.RouterContext router;
+ private FileSystem routerFs;
+ private RouterRpcServer routerRpcServer;
+ private AsyncQuota asyncQuota;
+
+ private final String testfilePath = "/testdir/testAsyncQuota.file";
+
+ @BeforeClass
+ public static void setUpCluster() throws Exception {
+ cluster = new MiniRouterDFSCluster(true, 1, 2,
+ DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+ cluster.setNumDatanodesPerNameservice(3);
+ cluster.setRacks(
+ new String[] {"/rack1", "/rack2", "/rack3"});
+ cluster.startCluster();
+
+ // Making one Namenode active per nameservice
+ if (cluster.isHighAvailability()) {
+ for (String ns : cluster.getNameservices()) {
+ cluster.switchToActive(ns, NAMENODES[0]);
+ cluster.switchToStandby(ns, NAMENODES[1]);
+ }
+ }
+ // Start routers with only an RPC service
+ routerConf = new RouterConfigBuilder()
+ .rpc()
+ .quota(true)
+ .build();
+
+ // Reduce the number of RPC clients threads to overload the Router easy
+ routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+ routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+ routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+ // We decrease the DN cache times to make the test faster
+ routerConf.setTimeDuration(
+ RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+ routerConf.setBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY, true);
+ cluster.addRouterOverrides(routerConf);
+ // Start routers with only an RPC service
+ cluster.startRouters();
+
+ // Register and verify all NNs with all routers
+ cluster.registerNamenodes();
+ cluster.waitNamenodeRegistration();
+ cluster.waitActiveNamespaces();
+ ns0 = cluster.getNameservices().get(0);
+ }
+
+ @AfterClass
+ public static void shutdownCluster() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ router = cluster.getRandomRouter();
+ routerFs = router.getFileSystem();
+ routerRpcServer = router.getRouterRpcServer();
+ routerRpcServer.initAsyncThreadPool();
+ RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
+ routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+ routerRpcServer.getRPCMonitor(),
+ routerRpcServer.getRouterStateIdContext());
+ RouterRpcServer spy = Mockito.spy(routerRpcServer);
+ Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
+ asyncQuota = new AsyncQuota(router.getRouter(), spy);
+
+ // Create mock locations
+ MockResolver resolver = (MockResolver)
router.getRouter().getSubclusterResolver();
+ resolver.addLocation("/", ns0, "/");
+ FsPermission permission = new FsPermission("705");
+ routerFs.mkdirs(new Path("/testdir"), permission);
+ FSDataOutputStream fsDataOutputStream = routerFs.create(
+ new Path(testfilePath), true);
+ fsDataOutputStream.write(new byte[1024]);
+ fsDataOutputStream.close();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ // clear client context
+ CallerContext.setCurrent(null);
+ boolean delete = routerFs.delete(new Path("/testdir"));
+ assertTrue(delete);
+ if (routerFs != null) {
+ routerFs.close();
+ }
+ }
+
+ @Test
+ public void testRouterAsyncGetQuotaUsage() throws Exception {
+ asyncQuota.getQuotaUsage("/testdir");
+ QuotaUsage quotaUsage = syncReturn(QuotaUsage.class);
+ // 3-replication.
+ Assert.assertEquals(3 * 1024, quotaUsage.getSpaceConsumed());
+ // We have one directory and one file.
+ Assert.assertEquals(2, quotaUsage.getFileAndDirectoryCount());
+ }
+
+ @Test
+ public void testRouterAsyncSetQuotaUsage() throws Exception {
+ asyncQuota.setQuota("/testdir", Long.MAX_VALUE, 8096, StorageType.DISK,
false);
+ syncReturn(void.class);
+ asyncQuota.getQuotaUsage("/testdir");
+ QuotaUsage quotaUsage = syncReturn(QuotaUsage.class);
+ Assert.assertEquals(8096, quotaUsage.getTypeQuota(StorageType.DISK));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]