This is an automated email from the ASF dual-hosted git repository.
keepromise pushed a commit to branch HDFS-17531
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-17531 by this push:
new 33c661bd180 HDFS-17656. [ARR] RouterNamenodeProtocol and
RouterUserProtocol supports asynchronous rpc. (#7159). Contributed by Jian
Zhang.
33c661bd180 is described below
commit 33c661bd1804b0112b1deb58f15fa43b9ac1dbb8
Author: Jian Zhang <[email protected]>
AuthorDate: Mon Nov 25 14:35:41 2024 +0800
HDFS-17656. [ARR] RouterNamenodeProtocol and RouterUserProtocol supports
asynchronous rpc. (#7159). Contributed by Jian Zhang.
Reviewed-by: Jian Zhang <[email protected]>
Signed-off-by: Jian Zhang <[email protected]>
---
.../async/RouterAsyncNamenodeProtocol.java | 198 +++++++++++++++++++++
.../federation/async/RouterAsyncUserProtocol.java | 129 ++++++++++++++
.../hdfs/server/federation/async/package-info.java | 31 ++++
.../federation/router/RouterAdminServer.java | 6 +-
.../server/federation/router/RouterRpcServer.java | 24 +--
.../async/RouterAsyncProtocolTestBase.java | 164 +++++++++++++++++
.../async/TestRouterAsyncNamenodeProtocol.java | 126 +++++++++++++
.../async/TestRouterAsyncUserProtocol.java | 48 +++++
8 files changed, 713 insertions(+), 13 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java
new file mode 100644
index 00000000000..fe05a57b854
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import
org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import java.io.IOException;
+import java.util.Map;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+/**
+ * Module that implements the asynchronous versions of the RPC calls in {@link
NamenodeProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterAsyncNamenodeProtocol extends RouterNamenodeProtocol {
+
+ /** RPC server to receive client calls. */
+ private final RouterRpcServer rpcServer;
+ /** RPC clients to connect to the Namenodes. */
+ private final RouterRpcClient rpcClient;
+
+ public RouterAsyncNamenodeProtocol(RouterRpcServer server) {
+ super(server);
+ this.rpcServer = server;
+ this.rpcClient = this.rpcServer.getRPCClient();
+ }
+
+ /**
+ * Asynchronously get a list of blocks belonging to <code>datanode</code>
+ * whose total size equals <code>size</code>.
+ *
+ * @see org.apache.hadoop.hdfs.server.balancer.Balancer
+ * @param datanode a data node
+ * @param size requested size
+ * @param minBlockSize each block should be of this minimum Block Size
+ * @param hotBlockTimeInterval prefer to get blocks that belong to
+ * the cold files accessed before the time interval
+ * @param storageType the given storage type {@link StorageType}
+ * @return BlocksWithLocations a list of blocks and their locations; asyncComplete(null) is called when no namespace reports the datanode
+ * @throws IOException if size is less than or equal to 0 or
+ * datanode does not exist
+ */
+ @Override
+ public BlocksWithLocations getBlocks(
+ DatanodeInfo datanode, long size,
+ long minBlockSize, long hotBlockTimeInterval, StorageType storageType)
throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ // Request the datanode storage reports keyed by namespace to find the namespace where the datanode is located
+
rpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
+ asyncApply((AsyncApplyFunction<Map<String, DatanodeStorageReport[]>,
Object>) map -> {
+ String nsId = null;
+ for (Map.Entry<String, DatanodeStorageReport[]> entry : map.entrySet()) {
+ DatanodeStorageReport[] dns = entry.getValue();
+ for (DatanodeStorageReport dn : dns) {
+ DatanodeInfo dnInfo = dn.getDatanodeInfo();
+ if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) {
+ nsId = entry.getKey();
+ break;
+ }
+ }
+ // Stop scanning the remaining namespaces once the datanode has been found
+ if (nsId != null) {
+ break;
+ }
+ }
+ // Forward getBlocks to the namespace whose report contains the datanode
+ if (nsId != null) {
+ RemoteMethod method = new RemoteMethod(
+ NamenodeProtocol.class, "getBlocks", new Class<?>[]
+ {DatanodeInfo.class, long.class, long.class, long.class,
StorageType.class},
+ datanode, size, minBlockSize, hotBlockTimeInterval, storageType);
+ rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
+ } else {
+ asyncComplete(null);
+ }
+ });
+ return asyncReturn(BlocksWithLocations.class);
+ }
+
+ /**
+ * Asynchronously get the current block keys.
+ *
+ * @return ExportedBlockKeys containing current block keys
+ * @throws IOException if there is no namespace available or other
I/O errors occur.
+ */
+ @Override
+ public ExportedBlockKeys getBlockKeys() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
+ rpcServer.invokeAtAvailableNsAsync(method, ExportedBlockKeys.class);
+ return asyncReturn(ExportedBlockKeys.class);
+ }
+
+ /**
+ * Asynchronously get the most recent transaction ID.
+ *
+ * @return The most recent transaction ID that has been synced to
+ * persistent storage, or applied from persistent storage in the
+ * case of a non-active node.
+ * @throws IOException if there is no namespace available or other
I/O errors occur.
+ */
+ @Override
+ public long getTransactionID() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
+ rpcServer.invokeAtAvailableNsAsync(method, long.class);
+ return asyncReturn(Long.class);
+ }
+
+ /**
+ * Asynchronously get the transaction ID of the most recent checkpoint.
+ *
+ * @return The transaction ID of the most recent checkpoint.
+ * @throws IOException if there is no namespace available or other
I/O errors occur.
+ */
+ @Override
+ public long getMostRecentCheckpointTxId() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class,
"getMostRecentCheckpointTxId");
+ rpcServer.invokeAtAvailableNsAsync(method, long.class);
+ return asyncReturn(Long.class);
+ }
+
+ /**
+ * Asynchronously get the transaction ID of the most recent checkpoint
+ * for the given NameNodeFile.
+ * @param nnf the NameNodeFile passed through to the remote call.
+ * @return The transaction ID of the most recent checkpoint
+ * for the given NameNodeFile.
+ * @throws IOException if there is no namespace available or other
I/O errors occur.
+ */
+ @Override
+ public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class,
"getMostRecentNameNodeFileTxId",
+ new Class<?>[] {NNStorage.NameNodeFile.class}, nnf);
+ rpcServer.invokeAtAvailableNsAsync(method, long.class);
+ return asyncReturn(Long.class);
+ }
+
+ /**
+ * Asynchronously request name-node version and storage information.
+ *
+ * @return {@link NamespaceInfo} identifying versions and storage information
+ * of the name-node.
+ * @throws IOException if there is no namespace available or other
I/O errors occur.
+ */
+ @Override
+ public NamespaceInfo versionRequest() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class, "versionRequest");
+ rpcServer.invokeAtAvailableNsAsync(method, NamespaceInfo.class);
+ return asyncReturn(NamespaceInfo.class);
+ }
+}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java
new file mode 100644
index 00000000000..3e03a9bdd5c
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterUserProtocol;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+/**
+ * Module that implements all the asynchronous RPC calls in
+ * {@link RefreshUserMappingsProtocol} and {@link GetUserMappingsProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterAsyncUserProtocol extends RouterUserProtocol {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterAsyncUserProtocol.class);
+
+ /** RPC server to receive client calls. */
+ private final RouterRpcServer rpcServer;
+ /** RPC clients to connect to the Namenodes. */
+ private final RouterRpcClient rpcClient;
+
+ private final ActiveNamenodeResolver namenodeResolver;
+
+ public RouterAsyncUserProtocol(RouterRpcServer server) {
+ super(server);
+ this.rpcServer = server;
+ this.rpcClient = this.rpcServer.getRPCClient();
+ this.namenodeResolver = this.rpcServer.getNamenodeResolver();
+ }
+
+ /**
+ * Asynchronously refresh user to group mappings: through the local Groups mapping service when the resolver returns no namespaces, otherwise by invoking every namespace concurrently.
+ * @throws IOException raised on errors performing I/O.
+ */
+ @Override
+ public void refreshUserToGroupsMappings() throws IOException {
+ LOG.debug("Refresh user groups mapping in Router.");
+ rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ if (nss.isEmpty()) {
+ Groups.getUserToGroupsMappingService().refresh();
+ asyncComplete(null);
+ } else {
+ RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
+ "refreshUserToGroupsMappings");
+ rpcClient.invokeConcurrent(nss, method);
+ }
+ }
+
+ /**
+ * Asynchronously refresh superuser proxy group list: through ProxyUsers locally when the resolver returns no namespaces, otherwise by invoking every namespace concurrently.
+ * @throws IOException raised on errors performing I/O.
+ */
+ @Override
+ public void refreshSuperUserGroupsConfiguration() throws IOException {
+ LOG.debug("Refresh superuser groups configuration in Router.");
+ rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ if (nss.isEmpty()) {
+ ProxyUsers.refreshSuperUserGroupsConfiguration();
+ asyncComplete(null);
+ } else {
+ RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
+ "refreshSuperUserGroupsConfiguration");
+ rpcClient.invokeConcurrent(nss, method);
+ }
+ }
+
+ /**
+ * Asynchronously get the groups which are mapped to the given user.
+ * @param user The user to get the groups for.
+ * @return The groups the user belongs to: the per-namespace results combined with merge(), or the local UserGroupInformation group names when the resolver returns no namespaces.
+ * @throws IOException raised on errors performing I/O.
+ */
+ @Override
+ public String[] getGroupsForUser(String user) throws IOException {
+ LOG.debug("Getting groups for user {}", user);
+ rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ if (nss.isEmpty()) {
+ asyncComplete(UserGroupInformation.createRemoteUser(user)
+ .getGroupNames());
+ } else {
+ RemoteMethod method = new RemoteMethod(GetUserMappingsProtocol.class,
+ "getGroupsForUser", new Class<?>[] {String.class}, user);
+ rpcClient.invokeConcurrent(nss, method, String[].class);
+ asyncApply((ApplyFunction<Map<FederationNamespaceInfo, String[]>,
String[]>)
+ results -> merge(results, String.class));
+ }
+ return asyncReturn(String[].class);
+ }
+}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java
new file mode 100644
index 00000000000..36e0513bb6a
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes that facilitate asynchronous operations
within the Hadoop
+ * Distributed File System (HDFS) Federation router. These classes are
designed to work with
+ * the Hadoop ecosystem, providing utilities and interfaces to perform
non-blocking tasks that
+ * can improve the performance and responsiveness of HDFS operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
index 2d96ab1be35..a462b7a5f73 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -21,6 +21,7 @@ import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static
org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -627,12 +628,15 @@ public class RouterAdminServer extends AbstractService
Map<RemoteLocation, HdfsFileStatus> responses =
rpcClient.invokeConcurrent(
locations, method, false, false, HdfsFileStatus.class);
+ if (rpcServer.isAsync()) {
+ responses = syncReturn(Map.class);
+ }
for (RemoteLocation location : locations) {
if (responses.get(location) != null) {
nsIds.add(location.getNameserviceId());
}
}
- } catch (IOException ioe) {
+ } catch (Exception ioe) {
LOG.error("Cannot get location for {}: {}",
src, ioe.getMessage());
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index c2c86d9015a..6fb189b0e01 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -72,7 +72,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
@@ -398,8 +397,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
NotReplicatedYetException.class,
IOException.class,
ConnectException.class,
- RetriableException.class,
- PathIsNotEmptyDirectoryException.class);
+ RetriableException.class);
this.rpcServer.addSuppressedLoggingExceptions(
StandbyException.class, UnresolvedPathException.class);
@@ -464,7 +462,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
/**
* Init router async handlers and router async responders.
*/
- protected void initAsyncThreadPool() {
+ public void initAsyncThreadPool() {
int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
@@ -607,7 +605,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* @return routerStateIdContext
*/
@VisibleForTesting
- protected RouterStateIdContext getRouterStateIdContext() {
+ public RouterStateIdContext getRouterStateIdContext() {
return routerStateIdContext;
}
@@ -710,7 +708,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* @throws StandbyException If the Router is in safe mode and cannot serve
* client requests.
*/
- void checkOperation(OperationCategory op)
+ public void checkOperation(OperationCategory op)
throws StandbyException {
// Log the function we are currently calling.
if (rpcMonitor != null) {
@@ -776,8 +774,9 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* If the namespace is unavailable, retry with other namespaces.
* @param <T> expected return type.
* @param method the remote method.
+ * @param clazz the type of return value.
* @return the response received after invoking method.
- * @throws IOException
+ * @throws IOException if there is no namespace available or other
ioExceptions.
*/
<T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
throws IOException {
@@ -810,10 +809,11 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* Asynchronous version of invokeAtAvailableNs method.
* @param <T> expected return type.
* @param method the remote method.
+ * @param clazz the type of return value.
* @return the response received after invoking method.
- * @throws IOException
+ * @throws IOException if there is no namespace available or other
ioExceptions.
*/
- <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
+ public <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
// If default Ns is not present return result from first namespace.
@@ -851,7 +851,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* @param ioe IOException .
* @param nss List of name spaces in the federation
* @return the response received after invoking method.
- * @throws IOException
+ * @throws IOException if there is no namespace available or other
ioExceptions.
*/
<T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
Set<FederationNamespaceInfo> nss) throws IOException {
@@ -885,7 +885,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* @param ioe IOException .
* @param nss List of name spaces in the federation
* @return the response received after invoking method.
- * @throws IOException
+ * @throws IOException if there is no namespace available or other
ioExceptions.
*/
<T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
Set<FederationNamespaceInfo> nss) throws IOException {
@@ -2131,7 +2131,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* @param clazz Class of the values.
* @return Array with the outputs.
*/
- static <T> T[] merge(
+ public static <T> T[] merge(
Map<FederationNamespaceInfo, T[]> map, Class<T> clazz) {
// Put all results into a set to avoid repeats
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java
new file mode 100644
index 00000000000..86969f16953
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterAsyncRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.ipc.CallerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class used to test the functionality of the async router RPC.
+ */
+public class RouterAsyncProtocolTestBase {
+ private static Configuration routerConf;
+ /** Federated HDFS cluster. */
+ private static MiniRouterDFSCluster cluster;
+ private static String ns0;
+
+ /** Random Router for this federated cluster. */
+ private MiniRouterDFSCluster.RouterContext router;
+ private FileSystem routerFs;
+ private RouterRpcServer routerRpcServer;
+ private RouterRpcServer routerAsyncRpcServer;
+
+ @BeforeClass
+ public static void setUpCluster() throws Exception {
+ cluster = new MiniRouterDFSCluster(true, 1, 2,
+ DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+ cluster.setNumDatanodesPerNameservice(3);
+
+ cluster.startCluster();
+
+ // Make one Namenode active and the other standby in each nameservice
+ if (cluster.isHighAvailability()) {
+ for (String ns : cluster.getNameservices()) {
+ cluster.switchToActive(ns, NAMENODES[0]);
+ cluster.switchToStandby(ns, NAMENODES[1]);
+ }
+ }
+ // Build a router configuration with only the RPC service enabled
+ routerConf = new RouterConfigBuilder()
+ .rpc()
+ .build();
+
+ // Reduce the number of RPC client threads so the Router is easy to overload
+ routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+ routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+ routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+ // We decrease the DN cache times to make the test faster
+ routerConf.setTimeDuration(
+ RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+ cluster.addRouterOverrides(routerConf);
+ // Start routers with only an RPC service
+ cluster.startRouters();
+
+ // Register and verify all NNs with all routers
+ cluster.registerNamenodes();
+ cluster.waitNamenodeRegistration();
+ cluster.waitActiveNamespaces();
+ ns0 = cluster.getNameservices().get(0);
+ }
+
+ @AfterClass
+ public static void shutdownCluster() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ router = cluster.getRandomRouter();
+ routerFs = router.getFileSystem();
+ routerRpcServer = router.getRouterRpcServer();
+ routerRpcServer.initAsyncThreadPool();
+ RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
+ routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+ routerRpcServer.getRPCMonitor(),
+ routerRpcServer.getRouterStateIdContext());
+ routerAsyncRpcServer = Mockito.spy(routerRpcServer);
+
Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
+
+ // Map the root path to ns0 in the mock resolver and create the test directory
+ MockResolver resolver = (MockResolver)
router.getRouter().getSubclusterResolver();
+ resolver.addLocation("/", ns0, "/");
+ FsPermission permission = new FsPermission("705");
+ routerFs.mkdirs(new Path("/testdir"), permission);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ // Clear the caller context
+ CallerContext.setCurrent(null);
+ boolean delete = routerFs.delete(new Path("/testdir"));
+ assertTrue(delete);
+ if (routerFs != null) { // NOTE(review): routerFs is already dereferenced by the delete above, so this check cannot fail here
+ routerFs.close();
+ }
+ }
+
+ public static Configuration getRouterConf() {
+ return routerConf;
+ }
+
+ public static MiniRouterDFSCluster getCluster() {
+ return cluster;
+ }
+
+ public static String getNs0() {
+ return ns0;
+ }
+
+ public MiniRouterDFSCluster.RouterContext getRouter() {
+ return router;
+ }
+
+ public FileSystem getRouterFs() {
+ return routerFs;
+ }
+
+ public RouterRpcServer getRouterRpcServer() {
+ return routerRpcServer;
+ }
+
+ public RouterRpcServer getRouterAsyncRpcServer() {
+ return routerAsyncRpcServer;
+ }
+}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java
new file mode 100644
index 00000000000..86081260536
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import
org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncNamenodeProtocol}.
+ *
+ * <p>Each test issues a call through the asynchronous protocol, collects its
+ * result with {@code syncReturn}, and compares it with the result of the same
+ * call made through the synchronous {@link RouterNamenodeProtocol}.
+ */
+public class TestRouterAsyncNamenodeProtocol extends RouterAsyncProtocolTestBase {
+
+  private RouterAsyncNamenodeProtocol asyncNamenodeProtocol;
+  private RouterNamenodeProtocol namenodeProtocol;
+
+  /**
+   * Builds the asynchronous protocol on the spied RPC server and the
+   * synchronous protocol on the plain RPC server.
+   */
+  @Before
+  public void setup() throws Exception {
+    asyncNamenodeProtocol = new RouterAsyncNamenodeProtocol(getRouterAsyncRpcServer());
+    namenodeProtocol = new RouterNamenodeProtocol(getRouterRpcServer());
+  }
+
+  @Test
+  public void getBlocks() throws Exception {
+    DatanodeInfo[] dns = getRouter().getClient()
+        .getNamenode().getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+
+    DatanodeInfo dn0 = dns[0];
+    asyncNamenodeProtocol.getBlocks(dn0, 1024, 0, 0, null);
+    BlocksWithLocations asyncRouterBlockLocations = syncReturn(BlocksWithLocations.class);
+    assertNotNull(asyncRouterBlockLocations);
+
+    BlocksWithLocations syncRouterBlockLocations =
+        namenodeProtocol.getBlocks(dn0, 1024, 0, 0, null);
+
+    BlockWithLocations[] asyncRouterBlocks = asyncRouterBlockLocations.getBlocks();
+    BlockWithLocations[] syncRouterBlocks = syncRouterBlockLocations.getBlocks();
+
+    // The synchronous result is the reference (expected) value.
+    assertEquals(syncRouterBlocks.length, asyncRouterBlocks.length);
+    for (int i = 0; i < syncRouterBlocks.length; i++) {
+      assertEquals(
+          syncRouterBlocks[i].getBlock().getBlockId(),
+          asyncRouterBlocks[i].getBlock().getBlockId());
+    }
+  }
+
+  @Test
+  public void getBlockKeys() throws Exception {
+    asyncNamenodeProtocol.getBlockKeys();
+    ExportedBlockKeys asyncBlockKeys = syncReturn(ExportedBlockKeys.class);
+    assertNotNull(asyncBlockKeys);
+
+    ExportedBlockKeys syncBlockKeys = namenodeProtocol.getBlockKeys();
+    compareBlockKeys(syncBlockKeys, asyncBlockKeys);
+  }
+
+  @Test
+  public void getTransactionID() throws Exception {
+    asyncNamenodeProtocol.getTransactionID();
+    // Keep the boxed type: unboxing straight into a long would turn a missing
+    // result into an NPE and make the null check below unable to fail.
+    Long asyncTransactionID = syncReturn(Long.class);
+    assertNotNull(asyncTransactionID);
+
+    long transactionID = namenodeProtocol.getTransactionID();
+    assertEquals(transactionID, asyncTransactionID.longValue());
+  }
+
+  @Test
+  public void getMostRecentCheckpointTxId() throws Exception {
+    asyncNamenodeProtocol.getMostRecentCheckpointTxId();
+    // Boxed on purpose, see getTransactionID().
+    Long asyncMostRecentCheckpointTxId = syncReturn(Long.class);
+    assertNotNull(asyncMostRecentCheckpointTxId);
+
+    long mostRecentCheckpointTxId = namenodeProtocol.getMostRecentCheckpointTxId();
+    assertEquals(mostRecentCheckpointTxId, asyncMostRecentCheckpointTxId.longValue());
+  }
+
+  @Test
+  public void versionRequest() throws Exception {
+    asyncNamenodeProtocol.versionRequest();
+    NamespaceInfo asyncNamespaceInfo = syncReturn(NamespaceInfo.class);
+    assertNotNull(asyncNamespaceInfo);
+    NamespaceInfo syncNamespaceInfo = namenodeProtocol.versionRequest();
+    compareVersion(syncNamespaceInfo, asyncNamespaceInfo);
+  }
+
+  /**
+   * Asserts that two sets of exported block keys agree on the current key,
+   * the key update interval and the token lifetime.
+   *
+   * @param expected reference block keys.
+   * @param actual block keys under test.
+   */
+  private static void compareBlockKeys(
+      ExportedBlockKeys expected, ExportedBlockKeys actual) {
+    assertEquals(expected.getCurrentKey(), actual.getCurrentKey());
+    assertEquals(expected.getKeyUpdateInterval(), actual.getKeyUpdateInterval());
+    assertEquals(expected.getTokenLifetime(), actual.getTokenLifetime());
+  }
+
+  /**
+   * Asserts that two namespace descriptions agree on block pool id,
+   * namespace id, cluster id, layout version and creation time.
+   *
+   * @param expected reference namespace info.
+   * @param actual namespace info under test.
+   */
+  private static void compareVersion(NamespaceInfo expected, NamespaceInfo actual) {
+    assertEquals(expected.getBlockPoolID(), actual.getBlockPoolID());
+    assertEquals(expected.getNamespaceID(), actual.getNamespaceID());
+    assertEquals(expected.getClusterID(), actual.getClusterID());
+    assertEquals(expected.getLayoutVersion(), actual.getLayoutVersion());
+    assertEquals(expected.getCTime(), actual.getCTime());
+  }
+}
\ No newline at end of file
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java
new file mode 100644
index 00000000000..a3fcd6109e5
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.async;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncUserProtocol}.
+ */
+public class TestRouterAsyncUserProtocol extends RouterAsyncProtocolTestBase {
+
+ private RouterAsyncUserProtocol asyncUserProtocol;
+
+ @Before
+ public void setup() throws Exception {
+ asyncUserProtocol = new RouterAsyncUserProtocol(getRouterAsyncRpcServer());
+ }
+
+ @Test
+ public void testgetGroupsForUser() throws Exception {
+ String[] group = new String[] {"bar", "group2"};
+ UserGroupInformation.createUserForTesting("user",
+ new String[] {"bar", "group2"});
+ asyncUserProtocol.getGroupsForUser("user");
+ String[] result = syncReturn(String[].class);
+ assertArrayEquals(group, result);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]