This is an automated email from the ASF dual-hosted git repository.
keepromise pushed a commit to branch HDFS-17531
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-17531 by this push:
new d4a6a271385 HDFS-17672. [ARR] Move asynchronous related classes to the
async package. (#7184). Contributed by Jian Zhang.
d4a6a271385 is described below
commit d4a6a271385314318bdfb7284134888d60740c67
Author: Jian Zhang <[email protected]>
AuthorDate: Thu Nov 28 20:13:11 2024 +0800
HDFS-17672. [ARR] Move asynchronous related classes to the async package.
(#7184). Contributed by Jian Zhang.
---
.../hdfs/protocolPB/AsyncRpcProtocolPBUtil.java | 8 +-
.../hdfs/server/federation/router/Quota.java | 5 +-
.../federation/router/RouterAdminServer.java | 2 +-
.../router/RouterQuotaUpdateService.java | 2 +-
.../server/federation/router/RouterRpcClient.java | 18 +-
.../server/federation/router/RouterRpcServer.java | 31 ++--
.../federation/router/RouterStateIdContext.java | 13 +-
.../router/{ => async}/AsyncErasureCoding.java | 83 ++++++++-
.../federation/router/{ => async}/AsyncQuota.java | 22 ++-
.../router/{ => async}/RouterAsyncCacheAdmin.java | 42 ++++-
.../async/RouterAsyncNamenodeProtocol.java | 10 +-
.../router/{ => async}/RouterAsyncRpcClient.java | 42 +++--
.../router/{ => async}/RouterAsyncSnapshot.java | 69 +++++++-
.../{ => async}/RouterAsyncStoragePolicy.java | 31 +++-
.../async/RouterAsyncUserProtocol.java | 13 +-
.../federation/router/async/package-info.java | 4 -
.../router/async/{ => utils}/ApplyFunction.java | 4 +-
.../federation/router/async/{ => utils}/Async.java | 2 +-
.../async/{ => utils}/AsyncApplyFunction.java | 4 +-
.../router/async/{ => utils}/AsyncBiFunction.java | 2 +-
.../async/{ => utils}/AsyncCatchFunction.java | 6 +-
.../router/async/{ => utils}/AsyncForEachRun.java | 4 +-
.../router/async/{ => utils}/AsyncRun.java | 2 +-
.../router/async/{ => utils}/AsyncUtil.java | 6 +-
.../router/async/{ => utils}/CatchFunction.java | 8 +-
.../router/async/{ => utils}/FinallyFunction.java | 4 +-
.../async/utils}/package-info.java | 6 +-
.../protocolPB/TestAsyncRpcProtocolPBUtil.java | 4 +-
.../TestRouterClientSideTranslatorPB.java | 2 +-
.../router/TestRouterAsyncCacheAdmin.java | 195 ---------------------
.../router/TestRouterAsyncRpcServer.java | 191 --------------------
.../router/TestRouterAsyncStoragePolicy.java | 163 -----------------
.../async/RouterAsyncProtocolTestBase.java | 3 +-
.../router/async/TestRouterAsyncCacheAdmin.java | 102 +++++++++++
.../{ => async}/TestRouterAsyncErasureCoding.java | 6 +-
.../async/TestRouterAsyncNamenodeProtocol.java | 4 +-
.../router/{ => async}/TestRouterAsyncQuota.java | 6 +-
.../{ => async}/TestRouterAsyncRpcClient.java | 9 +-
.../router/async/TestRouterAsyncRpcServer.java | 96 ++++++++++
.../{ => async}/TestRouterAsyncSnapshot.java | 114 ++----------
.../router/async/TestRouterAsyncStoragePolicy.java | 66 +++++++
.../async/TestRouterAsyncUserProtocol.java | 4 +-
.../router/async/{ => utils}/AsyncClass.java | 20 +--
.../router/async/{ => utils}/BaseClass.java | 2 +-
.../router/async/{ => utils}/SyncClass.java | 2 +-
.../router/async/{ => utils}/TestAsyncUtil.java | 2 +-
.../security/TestRouterSecurityManager.java | 2 -
47 files changed, 649 insertions(+), 787 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
index fa798e2f358..d04a61e6862 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
@@ -32,9 +32,9 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
index f28af6afa7b..2c4bcc92b47 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
@@ -139,7 +139,7 @@ public class Quota {
* @return quota usage for each remote location.
* @throws IOException If the quota system is disabled.
*/
- Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
+ protected Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
if (!router.isQuotaEnabled()) {
@@ -252,8 +252,9 @@ public class Quota {
* @param path Federation path of the results.
* @param results Quota query result.
* @return Aggregated Quota.
+ * @throws IOException If the quota system is disabled.
*/
- QuotaUsage aggregateQuota(String path,
+ protected QuotaUsage aggregateQuota(String path,
Map<RemoteLocation, QuotaUsage> results) throws IOException {
long nsCount = 0;
long ssCount = 0;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
index a462b7a5f73..d9cacf2e75e 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -21,7 +21,7 @@ import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static
org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import java.io.IOException;
import java.net.InetSocketAddress;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
index 235190d2a48..124d55586d8 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
@@ -41,7 +41,7 @@ import
org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
/**
* Service to periodically update the {@link RouterQuotaUsage}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 70b5034272d..e07de092dd7 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -1940,7 +1940,7 @@ public class RouterRpcClient {
* @return A prioritized list of NNs to use for communication.
* @throws IOException If a NN cannot be located for the nameservice ID.
*/
- protected List<? extends FederationNamenodeContext>
getOrderedNamenodes(String nsId,
+ public List<? extends FederationNamenodeContext> getOrderedNamenodes(String
nsId,
boolean isObserverRead) throws IOException {
final List<? extends FederationNamenodeContext> namenodes;
@@ -2047,39 +2047,39 @@ public class RouterRpcClient {
private static final byte SHOULD_USE_OBSERVER_BIT = 2;
private static final byte COMPLETE_BIT = 4;
- ExecutionStatus() {
+ public ExecutionStatus() {
this(false, false);
}
- ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
+ public ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
this.flag = 0;
setFailOver(failOver);
setShouldUseObserver(shouldUseObserver);
setComplete(false);
}
- private void setFailOver(boolean failOver) {
+ public void setFailOver(boolean failOver) {
flag = (byte) (failOver ? (flag | FAIL_OVER_BIT) : (flag &
~FAIL_OVER_BIT));
}
- private void setShouldUseObserver(boolean shouldUseObserver) {
+ public void setShouldUseObserver(boolean shouldUseObserver) {
flag = (byte) (shouldUseObserver ?
(flag | SHOULD_USE_OBSERVER_BIT) : (flag &
~SHOULD_USE_OBSERVER_BIT));
}
- void setComplete(boolean complete) {
+ public void setComplete(boolean complete) {
flag = (byte) (complete ? (flag | COMPLETE_BIT) : (flag &
~COMPLETE_BIT));
}
- boolean isFailOver() {
+ public boolean isFailOver() {
return (flag & FAIL_OVER_BIT) != 0;
}
- boolean isShouldUseObserver() {
+ public boolean isShouldUseObserver() {
return (flag & SHOULD_USE_OBSERVER_BIT) != 0;
}
- boolean isComplete() {
+ public boolean isComplete() {
return (flag & COMPLETE_BIT) != 0;
}
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 6fb189b0e01..39a50d4e3a6 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -37,12 +37,12 @@ import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_R
import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static
org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
import static
org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
import java.io.FileNotFoundException;
@@ -75,9 +75,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
-import
org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
-import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
+import
org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -686,7 +687,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* client requests.
* @throws UnsupportedOperationException If the operation is not supported.
*/
- void checkOperation(OperationCategory op, boolean supported)
+ public void checkOperation(OperationCategory op, boolean supported)
throws StandbyException, UnsupportedOperationException {
checkOperation(op);
@@ -1032,7 +1033,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
*/
- RemoteLocation getCreateLocationAsync(
+ public RemoteLocation getCreateLocationAsync(
final String src, final List<RemoteLocation> locations)
throws IOException {
@@ -1995,7 +1996,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* @return Prioritized list of locations in the federated cluster.
* @throws IOException If the location for this path cannot be determined.
*/
- protected List<RemoteLocation> getLocationsForPath(String path,
+ public List<RemoteLocation> getLocationsForPath(String path,
boolean failIfLocked) throws IOException {
return getLocationsForPath(path, failIfLocked, true);
}
@@ -2010,7 +2011,7 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* @return Prioritized list of locations in the federated cluster.
* @throws IOException If the location for this path cannot be determined.
*/
- protected List<RemoteLocation> getLocationsForPath(String path,
+ public List<RemoteLocation> getLocationsForPath(String path,
boolean failIfLocked, boolean needQuotaVerify) throws IOException {
try {
if (failIfLocked) {
@@ -2227,9 +2228,9 @@ public class RouterRpcServer extends AbstractService
implements ClientProtocol,
* mount entry.
* @param path The path on which the operation need to be invoked.
* @return true if the call is supposed to invoked on all locations.
- * @throws IOException
+ * @throws IOException If an I/O error occurs.
*/
- boolean isInvokeConcurrent(final String path) throws IOException {
+ public boolean isInvokeConcurrent(final String path) throws IOException {
if (subclusterResolver instanceof MountTableResolver) {
MountTableResolver mountTableResolver =
(MountTableResolver) subclusterResolver;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
index e239e5e9059..21e3f16f206 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
@@ -48,7 +48,7 @@ import
org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-class RouterStateIdContext implements AlignmentContext {
+public class RouterStateIdContext implements AlignmentContext {
private final HashSet<String> coordinatedMethods;
/**
@@ -93,6 +93,8 @@ class RouterStateIdContext implements AlignmentContext {
/**
* Adds the {@link #namespaceIdMap} to the response header that will be sent
to a client.
+ *
+ * @param headerBuilder the response header that will be sent to a client.
*/
public void setResponseHeaderState(RpcResponseHeaderProto.Builder
headerBuilder) {
if (namespaceIdMap.isEmpty()) {
@@ -110,7 +112,8 @@ class RouterStateIdContext implements AlignmentContext {
}
public LongAccumulator getNamespaceStateId(String nsId) {
- return namespaceIdMap.computeIfAbsent(nsId, key -> new
LongAccumulator(Math::max, Long.MIN_VALUE));
+ return namespaceIdMap.computeIfAbsent(nsId,
+ key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
}
public List<String> getNamespaces() {
@@ -127,6 +130,9 @@ class RouterStateIdContext implements AlignmentContext {
/**
* Utility function to parse routerFederatedState field in RPC headers.
+ *
+ * @param byteString the byte string of routerFederatedState.
+ * @return the router federated state map.
*/
public static Map<String, Long> getRouterFederatedStateMap(ByteString
byteString) {
if (byteString != null) {
@@ -148,7 +154,8 @@ class RouterStateIdContext implements AlignmentContext {
if (call != null) {
ByteString callFederatedNamespaceState =
call.getFederatedNamespaceState();
if (callFederatedNamespaceState != null) {
- Map<String, Long> clientFederatedStateIds =
getRouterFederatedStateMap(callFederatedNamespaceState);
+ Map<String, Long> clientFederatedStateIds =
+ getRouterFederatedStateMap(callFederatedNamespaceState);
clientStateID = clientFederatedStateIds.getOrDefault(nsId,
Long.MIN_VALUE);
}
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncErasureCoding.java
similarity index 66%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncErasureCoding.java
index 9f1dbe5f2c4..d40c6a1a93a 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncErasureCoding.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
@@ -25,7 +25,12 @@ import
org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.ErasureCoding;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import java.io.IOException;
@@ -36,9 +41,15 @@ import java.util.Map;
import java.util.Set;
import static
org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+/**
+ * Provides asynchronous operations for erasure coding in HDFS Federation.
+ * This class extends {@link
org.apache.hadoop.hdfs.server.federation.router.ErasureCoding}
+ * and overrides its methods to perform erasure coding operations in a
non-blocking manner,
+ * allowing for concurrent execution and improved performance.
+ */
public class AsyncErasureCoding extends ErasureCoding {
/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
@@ -54,6 +65,17 @@ public class AsyncErasureCoding extends ErasureCoding {
this.namenodeResolver = this.rpcClient.getNamenodeResolver();
}
+ /**
+ * Asynchronously get an array of all erasure coding policies.
+ * This method checks the operation category and then invokes the
+ * getErasureCodingPolicies method concurrently across all namespaces.
+ * <p>
+ * The results are merged and returned as an array of
ErasureCodingPolicyInfo.
+ *
+ * @return Array of ErasureCodingPolicyInfo.
+ * @throws IOException If an I/O error occurs.
+ */
+ @Override
public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -70,6 +92,16 @@ public class AsyncErasureCoding extends ErasureCoding {
return asyncReturn(ErasureCodingPolicyInfo[].class);
}
+ /**
+ * Asynchronously get the erasure coding codecs available.
+ * This method checks the operation category and then invokes the
+ * getErasureCodingCodecs method concurrently across all namespaces.
+ * <p>
+ * The results are merged into a single map of codec names to codec
properties.
+ *
+ * @return Map of erasure coding codecs.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public Map<String, String> getErasureCodingCodecs() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -97,6 +129,17 @@ public class AsyncErasureCoding extends ErasureCoding {
return asyncReturn(Map.class);
}
+ /**
+ * Asynchronously add an array of erasure coding policies.
+ * This method checks the operation category and then invokes the
+ * addErasureCodingPolicies method concurrently across all namespaces.
+ * <p>
+ * The results are merged and returned as an array of
AddErasureCodingPolicyResponse.
+ *
+ * @param policies Array of erasure coding policies to add.
+ * @return Array of AddErasureCodingPolicyResponse.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
ErasureCodingPolicy[] policies) throws IOException {
@@ -117,6 +160,17 @@ public class AsyncErasureCoding extends ErasureCoding {
return asyncReturn(AddErasureCodingPolicyResponse[].class);
}
+ /**
+ * Asynchronously get the erasure coding policy for a given source path.
+ * This method checks the operation category and then invokes the
+ * getErasureCodingPolicy method sequentially for the given path.
+ * <p>
+ * The result is returned as an ErasureCodingPolicy object.
+ *
+ * @param src Source path to get the erasure coding policy for.
+ * @return ErasureCodingPolicy for the given path.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public ErasureCodingPolicy getErasureCodingPolicy(String src)
throws IOException {
@@ -136,6 +190,17 @@ public class AsyncErasureCoding extends ErasureCoding {
return asyncReturn(ErasureCodingPolicy.class);
}
+ /**
+ * Asynchronously get the EC topology result for the given policies.
+ * This method checks the operation category and then invokes the
+ * getECTopologyResultForPolicies method concurrently across all namespaces.
+ * <p>
+ * The results are merged and the first unsupported result is returned.
+ *
+ * @param policyNames Array of policy names to check.
+ * @return ECTopologyVerifierResult for the policies.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public ECTopologyVerifierResult getECTopologyResultForPolicies(
String[] policyNames) throws IOException {
@@ -162,6 +227,16 @@ public class AsyncErasureCoding extends ErasureCoding {
return asyncReturn(ECTopologyVerifierResult.class);
}
+ /**
+ * Asynchronously get the erasure coding block group statistics.
+ * This method checks the operation category and then invokes the
+ * getECBlockGroupStats method concurrently across all namespaces.
+ * <p>
+ * The results are merged and returned as an ECBlockGroupStats object.
+ *
+ * @return ECBlockGroupStats for the erasure coding block groups.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncQuota.java
similarity index 76%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncQuota.java
index 5d76171a548..0980cf2a2fb 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncQuota.java
@@ -15,10 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Quota;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import java.io.IOException;
@@ -26,9 +32,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+/**
+ * Provides asynchronous operations for managing quotas in HDFS Federation.
+ * This class extends {@link
org.apache.hadoop.hdfs.server.federation.router.Quota}
+ * and overrides its methods to perform quota operations in a non-blocking
manner,
+ * allowing for concurrent execution and improved performance.
+ */
public class AsyncQuota extends Quota {
/** RPC server to receive client calls. */
@@ -50,6 +62,7 @@ public class AsyncQuota extends Quota {
* @return Aggregated quota.
* @throws IOException If the quota system is disabled.
*/
+ @Override
public QuotaUsage getQuotaUsage(String path) throws IOException {
getEachQuotaUsage(path);
@@ -70,7 +83,8 @@ public class AsyncQuota extends Quota {
* @return quota usage for each remote location.
* @throws IOException If the quota system is disabled.
*/
- Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
+ @Override
+ protected Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
if (!router.isQuotaEnabled()) {
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncCacheAdmin.java
similarity index 61%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncCacheAdmin.java
index fca43e15879..20ec36c935c 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncCacheAdmin.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncCacheAdmin.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -25,14 +25,16 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.RouterCacheAdmin;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
/**
* Module that implements all the asynchronous RPC calls in
@@ -45,6 +47,17 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin {
super(server);
}
+ /**
+ * Asynchronously adds a new cache directive with the given path and flags.
+ * This method invokes the addCacheDirective method concurrently across all
+ * namespaces, and returns the first response as a long value representing
the
+ * directive ID.
+ *
+ * @param path The cache directive path.
+ * @param flags The cache flags.
+ * @return The ID of the newly added cache directive.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public long addCacheDirective(
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
@@ -54,6 +67,17 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin {
return asyncReturn(Long.class);
}
+ /**
+ * Asynchronously lists cache directives based on the provided previous ID
and filter.
+ * This method invokes the listCacheDirectives method concurrently across all
+ * namespaces, and returns the first response as a BatchedEntries object
containing
+ * the cache directive entries.
+ *
+ * @param prevId The previous ID from which to start listing.
+ * @param filter The filter to apply to the cache directives.
+ * @return BatchedEntries of cache directive entries.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
long prevId, CacheDirectiveInfo filter) throws IOException {
@@ -64,6 +88,16 @@ public class RouterAsyncCacheAdmin extends RouterCacheAdmin {
return asyncReturn(BatchedEntries.class);
}
+ /**
+ * Asynchronously lists cache pools starting from the provided key.
+ * This method invokes the listCachePools method concurrently across all
namespaces,
+ * and returns the first response as a BatchedEntries object containing the
cache
+ * pool entries.
+ *
+ * @param prevKey The previous key from which to start listing.
+ * @return BatchedEntries of cache pool entries.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws
IOException {
invokeListCachePools(prevKey);
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncNamenodeProtocol.java
similarity index 96%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncNamenodeProtocol.java
index fe05a57b854..fc461dd22af 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncNamenodeProtocol.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -25,7 +25,7 @@ import
org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
-import
org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -35,9 +35,9 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import java.io.IOException;
import java.util.Map;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
/**
* Module that implements all the asynchronous RPC calls in {@link
NamenodeProtocol} in the
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
similarity index 94%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
index 2bdcd7ce287..249b7e1c82a 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncRpcClient.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
@@ -24,9 +24,19 @@ import
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
-import
org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
-import
org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.ConnectionContext;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdContext;
+import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -50,18 +60,18 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import static
org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApplyUseExecutor;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.getCompletableFuture;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApplyUseExecutor;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncThrowException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture;
/**
* The {@code RouterAsyncRpcClient} class extends the functionality of the base
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncSnapshot.java
similarity index 73%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncSnapshot.java
index 8d830b84271..c38d243aa38 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncSnapshot.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -26,7 +26,13 @@ import
org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterSnapshot;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import java.io.IOException;
@@ -35,8 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
/**
* Module that implements all the asynchronous RPC calls related to snapshots
in
@@ -57,6 +63,16 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
this.namenodeResolver = rpcServer.getNamenodeResolver();
}
+ /**
+ * Asynchronously creates a snapshot with the given root and name.
+ * This method checks the operation category and then invokes the
createSnapshot
+ * method concurrently across all namespaces, returning the first successful
response.
+ *
+ * @param snapshotRoot The root path of the snapshot.
+ * @param snapshotName The name of the snapshot.
+ * @return The path of the created snapshot.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public String createSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
@@ -89,6 +105,15 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
return asyncReturn(String.class);
}
+ /**
+ * Asynchronously get an array of snapshottable directory listings.
+ * This method checks the operation category and then invokes the
+ * getSnapshottableDirListing method concurrently across all namespaces,
merging
+ * the results into a single array.
+ *
+ * @return Array of SnapshottableDirectoryStatus.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws
IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -103,6 +128,16 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
return asyncReturn(SnapshottableDirectoryStatus[].class);
}
+ /**
+ * Asynchronously get an array of snapshot listings for the given snapshot
root.
+ * This method checks the operation category and then invokes the
+ * getSnapshotListing method, either sequentially or concurrently based on
the
+ * configuration, and returns the merged results.
+ *
+ * @param snapshotRoot The root path of the snapshots.
+ * @return Array of SnapshotStatus.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public SnapshotStatus[] getSnapshotListing(String snapshotRoot) throws
IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -145,6 +180,18 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
return asyncReturn(SnapshotStatus[].class);
}
+ /**
+ * Asynchronously get a snapshot diff report for the given root and snapshot
names.
+ * This method checks the operation category and then invokes the
+ * getSnapshotDiffReport method, either sequentially or concurrently based
on the
+ * configuration, and returns the result.
+ *
+ * @param snapshotRoot The root path of the snapshot.
+ * @param earlierSnapshotName The name of the earlier snapshot.
+ * @param laterSnapshotName The name of the later snapshot.
+ * @return SnapshotDiffReport for the snapshots.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public SnapshotDiffReport getSnapshotDiffReport(
String snapshotRoot, String earlierSnapshotName,
@@ -169,6 +216,20 @@ public class RouterAsyncSnapshot extends RouterSnapshot {
}
}
+ /**
+ * Asynchronously get a snapshot diff report listing for the given root and
snapshot names.
+ * This method checks the operation category and then invokes the
+ * getSnapshotDiffReportListing method, either sequentially or concurrently
based
+ * on the configuration, and returns the result.
+ *
+ * @param snapshotRoot The root path of the snapshot.
+ * @param earlierSnapshotName The name of the earlier snapshot.
+ * @param laterSnapshotName The name of the later snapshot.
+ * @param startPath The starting path for the diff report.
+ * @param index The index for the diff report listing.
+ * @return SnapshotDiffReportListing for the snapshots.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public SnapshotDiffReportListing getSnapshotDiffReportListing(
String snapshotRoot, String earlierSnapshotName, String
laterSnapshotName,
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncStoragePolicy.java
similarity index 64%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncStoragePolicy.java
index 7e019e13bbe..cf23d0f7cc8 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncStoragePolicy.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncStoragePolicy.java
@@ -15,17 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.RouterStoragePolicy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import java.io.IOException;
import java.util.List;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+/**
+ * Module that implements all the asynchronous RPC calls in
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to
+ * Storage Policy in the {@link RouterRpcServer}.
+ */
public class RouterAsyncStoragePolicy extends RouterStoragePolicy {
/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
@@ -38,6 +48,15 @@ public class RouterAsyncStoragePolicy extends
RouterStoragePolicy {
this.rpcClient = this.rpcServer.getRPCClient();
}
+ /**
+ * Asynchronously get the storage policy for a given path.
+ * This method checks the operation category and then invokes the
+ * getStoragePolicy method sequentially for the given path.
+ *
+ * @param path The path for which to retrieve the storage policy.
+ * @return The BlockStoragePolicy for the given path.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public BlockStoragePolicy getStoragePolicy(String path)
throws IOException {
@@ -52,6 +71,14 @@ public class RouterAsyncStoragePolicy extends
RouterStoragePolicy {
return asyncReturn(BlockStoragePolicy.class);
}
+ /**
+ * Asynchronously get an array of all available storage policies.
+ * This method checks the operation category and then invokes the
+ * getStoragePolicies method across all available namespaces.
+ *
+ * @return An array of BlockStoragePolicy.
+ * @throws IOException If an I/O error occurs.
+ */
@Override
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncUserProtocol.java
similarity index 95%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncUserProtocol.java
index 3e03a9bdd5c..68b5aa85284 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncUserProtocol.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
@@ -27,7 +27,7 @@ import java.util.Map;
import java.util.Set;
import
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
-import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import
org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
@@ -38,9 +38,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static
org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
/**
* Module that implements all the asynchronous RPC calls in
@@ -67,6 +67,7 @@ public class RouterAsyncUserProtocol extends
RouterUserProtocol {
/**
* Asynchronously refresh user to group mappings.
+ *
* @throws IOException raised on errors performing I/O.
*/
@Override
@@ -86,6 +87,7 @@ public class RouterAsyncUserProtocol extends
RouterUserProtocol {
/**
* Asynchronously refresh superuser proxy group list.
+ *
* @throws IOException raised on errors performing I/O.
*/
@Override
@@ -105,6 +107,7 @@ public class RouterAsyncUserProtocol extends
RouterUserProtocol {
/**
* Asynchronously get the groups which are mapped to the given user.
+ *
* @param user The user to get the groups for.
* @return The set of groups the user belongs to.
* @throws IOException raised on errors performing I/O.
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java
index 48fd0ad89ab..e0ecc4b36f6 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java
@@ -21,10 +21,6 @@
* Distributed File System (HDFS) Federation router. These classes are
designed to work with
* the Hadoop ecosystem, providing utilities and interfaces to perform
non-blocking tasks that
* can improve the performance and responsiveness of HDFS operations.
- *
- * <p>These classes work together to enable complex asynchronous workflows,
making it easier to
- * write code that can handle long-running tasks without blocking, thus
improving the overall
- * efficiency and scalability of HDFS operations.</p>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/ApplyFunction.java
similarity index 96%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/ApplyFunction.java
index ac3d5edb635..721adf1e4d6 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/ApplyFunction.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
/**
* Represents a function that accepts a value of type T and produces a result
of type R.
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/Async.java
similarity index 98%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/Async.java
index e184bffaeef..09cd649b7e6 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/Async.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncApplyFunction.java
similarity index 98%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncApplyFunction.java
index b34a6c479cd..0355efabbc9 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncApplyFunction.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
/**
* The AsyncApplyFunction interface represents a function that
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncBiFunction.java
similarity index 98%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncBiFunction.java
index 3e94736e7f1..8e7ac35726b 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncBiFunction.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncCatchFunction.java
similarity index 98%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncCatchFunction.java
index 01fcc44a19f..714f57827ff 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncCatchFunction.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.unWarpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
/**
* The AsyncCatchFunction interface represents a function that handles
exceptions
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncForEachRun.java
similarity index 98%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncForEachRun.java
index 322242d1c49..ae984a84b46 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncForEachRun.java
@@ -15,14 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
/**
* The AsyncForEachRun class is part of the asynchronous operation utilities
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncRun.java
similarity index 97%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncRun.java
index 03d39f36d7d..e619a026e19 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncRun.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java
similarity index 99%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java
index 834741b52db..79ae88f6bcb 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.Collection;
@@ -25,8 +25,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.CUR_COMPLETABLE_FUTURE;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.CUR_COMPLETABLE_FUTURE;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
/**
* The AsyncUtil class provides a collection of utility methods to simplify
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/CatchFunction.java
similarity index 96%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/CatchFunction.java
index fbb0af56ce0..a87ddf18aae 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/CatchFunction.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.unWarpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
/**
* The {@code CatchFunction} interface represents a function that handles
exceptions
@@ -58,7 +58,7 @@ import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCo
*/
@FunctionalInterface
public interface CatchFunction<R, E extends Throwable>
- extends Async<R>{
+ extends Async<R> {
/**
* Applies this catch function to the given result and exception.
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/FinallyFunction.java
similarity index 97%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/FinallyFunction.java
index 0243f0a0a1a..671d380ac8b 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/FinallyFunction.java
@@ -15,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
/**
* The {@code FinallyFunction} interface represents a function that is used to
perform
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/package-info.java
similarity index 80%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/package-info.java
index 36e0513bb6a..5ffbebf9e71 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/package-info.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/package-info.java
@@ -21,11 +21,15 @@
* Distributed File System (HDFS) Federation router. These classes are
designed to work with
* the Hadoop ecosystem, providing utilities and interfaces to perform
non-blocking tasks that
* can improve the performance and responsiveness of HDFS operations.
+ *
+ * <p>These classes work together to enable complex asynchronous workflows,
making it easier to
+ * write code that can handle long-running tasks without blocking, thus
improving the overall
+ * efficiency and scalability of HDFS operations.</p>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
index 15da20fdd11..9b88e3b9956 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
@@ -42,7 +42,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ForkJoinPool;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
index 3519a968c5b..637a4b38ae7 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
@@ -62,7 +62,7 @@ import static org.apache.hadoop.fs.permission.FsAction.ALL;
import static org.apache.hadoop.fs.permission.FsAction.NONE;
import static org.apache.hadoop.fs.permission.FsAction.READ;
import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java
deleted file mode 100644
index 4ad5ccbfee6..00000000000
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncCacheAdmin.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CacheFlag;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
-import org.apache.hadoop.ipc.CallerContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
-import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestRouterAsyncCacheAdmin {
- private static Configuration routerConf;
- /** Federated HDFS cluster. */
- private static MiniRouterDFSCluster cluster;
- private static String ns0;
-
- /** Random Router for this federated cluster. */
- private MiniRouterDFSCluster.RouterContext router;
- private FileSystem routerFs;
- private RouterRpcServer routerRpcServer;
- private RouterAsyncCacheAdmin asyncCacheAdmin;
-
- @BeforeClass
- public static void setUpCluster() throws Exception {
- cluster = new MiniRouterDFSCluster(true, 1, 2,
- DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
- cluster.setNumDatanodesPerNameservice(3);
-
- cluster.startCluster();
-
- // Making one Namenode active per nameservice
- if (cluster.isHighAvailability()) {
- for (String ns : cluster.getNameservices()) {
- cluster.switchToActive(ns, NAMENODES[0]);
- cluster.switchToStandby(ns, NAMENODES[1]);
- }
- }
- // Start routers with only an RPC service
- routerConf = new RouterConfigBuilder()
- .rpc()
- .build();
-
- // Reduce the number of RPC clients threads to overload the Router easy
- routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
- routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
- routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
- // We decrease the DN cache times to make the test faster
- routerConf.setTimeDuration(
- RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
- cluster.addRouterOverrides(routerConf);
- // Start routers with only an RPC service
- cluster.startRouters();
-
- // Register and verify all NNs with all routers
- cluster.registerNamenodes();
- cluster.waitNamenodeRegistration();
- cluster.waitActiveNamespaces();
- ns0 = cluster.getNameservices().get(0);
- }
-
- @AfterClass
- public static void shutdownCluster() throws Exception {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Before
- public void setUp() throws IOException {
- router = cluster.getRandomRouter();
- routerFs = router.getFileSystem();
- routerRpcServer = router.getRouterRpcServer();
- routerRpcServer.initAsyncThreadPool();
- RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
- routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
- routerRpcServer.getRPCMonitor(),
- routerRpcServer.getRouterStateIdContext());
- RouterRpcServer spy = Mockito.spy(routerRpcServer);
- Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
- asyncCacheAdmin = new RouterAsyncCacheAdmin(spy);
-
- // Create mock locations
- MockResolver resolver = (MockResolver)
router.getRouter().getSubclusterResolver();
- resolver.addLocation("/", ns0, "/");
- FSDataOutputStream fsDataOutputStream = routerFs.create(
- new Path("/testCache.file"), true);
- fsDataOutputStream.write(new byte[1024]);
- fsDataOutputStream.close();
- }
-
- @After
- public void tearDown() throws IOException {
- // clear client context
- CallerContext.setCurrent(null);
- boolean delete = routerFs.delete(new Path("/testCache.file"));
- assertTrue(delete);
- if (routerFs != null) {
- routerFs.close();
- }
- }
-
- @Test
- public void testRouterAsyncCacheAdmin() throws Exception {
- asyncCacheAdmin.addCachePool(new CachePoolInfo("pool"));
- syncReturn(null);
-
- CacheDirectiveInfo path = new CacheDirectiveInfo.Builder().
- setPool("pool").
- setPath(new Path("/testCache.file")).
- build();
- asyncCacheAdmin.addCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
- long result = syncReturn(long.class);
- assertEquals(1, result);
-
- asyncCacheAdmin.listCachePools("");
- BatchedEntries<CachePoolEntry> cachePoolEntries =
syncReturn(BatchedEntries.class);
- assertEquals("pool", cachePoolEntries.get(0).getInfo().getPoolName());
-
- CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder().
- setPool("pool").
- build();
- asyncCacheAdmin.listCacheDirectives(0, filter);
- BatchedEntries<CacheDirectiveEntry> cacheDirectiveEntries =
syncReturn(BatchedEntries.class);
- assertEquals(new Path("/testCache.file"),
cacheDirectiveEntries.get(0).getInfo().getPath());
-
- CachePoolInfo pool = new CachePoolInfo("pool").setOwnerName("pool_user");
- asyncCacheAdmin.modifyCachePool(pool);
- syncReturn(null);
-
- asyncCacheAdmin.listCachePools("");
- cachePoolEntries = syncReturn(BatchedEntries.class);
- assertEquals("pool_user",
cachePoolEntries.get(0).getInfo().getOwnerName());
-
- path = new CacheDirectiveInfo.Builder().
- setPool("pool").
- setPath(new Path("/testCache.file")).
- setReplication((short) 2).
- setId(1L).
- build();
- asyncCacheAdmin.modifyCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
- syncReturn(null);
-
- asyncCacheAdmin.listCacheDirectives(0, filter);
- cacheDirectiveEntries = syncReturn(BatchedEntries.class);
- assertEquals(Short.valueOf((short) 2),
cacheDirectiveEntries.get(0).getInfo().getReplication());
-
- asyncCacheAdmin.removeCacheDirective(1L);
- syncReturn(null);
- asyncCacheAdmin.removeCachePool("pool");
- syncReturn(null);
- }
-}
\ No newline at end of file
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java
deleted file mode 100644
index 72dc6815442..00000000000
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.ipc.CallerContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
-import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Used to test the async functionality of {@link RouterRpcServer}.
- */
-public class TestRouterAsyncRpcServer {
- private static Configuration routerConf;
- /** Federated HDFS cluster. */
- private static MiniRouterDFSCluster cluster;
- private static String ns0;
-
- /** Random Router for this federated cluster. */
- private MiniRouterDFSCluster.RouterContext router;
- private FileSystem routerFs;
- private RouterRpcServer asyncRouterRpcServer;
-
- @BeforeClass
- public static void setUpCluster() throws Exception {
- cluster = new MiniRouterDFSCluster(true, 1, 2,
- DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
- cluster.setNumDatanodesPerNameservice(3);
-
- cluster.startCluster();
-
- // Making one Namenode active per nameservice
- if (cluster.isHighAvailability()) {
- for (String ns : cluster.getNameservices()) {
- cluster.switchToActive(ns, NAMENODES[0]);
- cluster.switchToStandby(ns, NAMENODES[1]);
- }
- }
- // Start routers with only an RPC service
- routerConf = new RouterConfigBuilder()
- .rpc()
- .build();
-
- // Reduce the number of RPC clients threads to overload the Router easy
- routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
- routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
- routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
- // We decrease the DN cache times to make the test faster
- routerConf.setTimeDuration(
- RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
- cluster.addRouterOverrides(routerConf);
- // Start routers with only an RPC service
- cluster.startRouters();
-
- // Register and verify all NNs with all routers
- cluster.registerNamenodes();
- cluster.waitNamenodeRegistration();
- cluster.waitActiveNamespaces();
- ns0 = cluster.getNameservices().get(0);
- }
-
- @AfterClass
- public static void shutdownCluster() throws Exception {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Before
- public void setUp() throws IOException {
- router = cluster.getRandomRouter();
- routerFs = router.getFileSystem();
- RouterRpcServer routerRpcServer = router.getRouterRpcServer();
- routerRpcServer.initAsyncThreadPool();
- RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
- routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
- routerRpcServer.getRPCMonitor(),
- routerRpcServer.getRouterStateIdContext());
- asyncRouterRpcServer = Mockito.spy(routerRpcServer);
-
Mockito.when(asyncRouterRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
-
- // Create mock locations
- MockResolver resolver = (MockResolver)
router.getRouter().getSubclusterResolver();
- resolver.addLocation("/", ns0, "/");
- FsPermission permission = new FsPermission("705");
- routerFs.mkdirs(new Path("/testdir"), permission);
- }
-
- @After
- public void tearDown() throws IOException {
- // clear client context
- CallerContext.setCurrent(null);
- boolean delete = routerFs.delete(new Path("/testdir"));
- assertTrue(delete);
- if (routerFs != null) {
- routerFs.close();
- }
- }
-
- /**
- * Test that the async RPC server can invoke a method at an available
Namenode.
- */
- @Test
- public void testInvokeAtAvailableNsAsync() throws Exception {
- RemoteMethod method = new RemoteMethod("getStoragePolicies");
- asyncRouterRpcServer.invokeAtAvailableNsAsync(method,
BlockStoragePolicy[].class);
- BlockStoragePolicy[] storagePolicies =
syncReturn(BlockStoragePolicy[].class);
- assertEquals(8, storagePolicies.length);
- }
-
- /**
- * Test get create location async.
- */
- @Test
- public void testGetCreateLocationAsync() throws Exception {
- final List<RemoteLocation> locations =
- asyncRouterRpcServer.getLocationsForPath("/testdir", true);
- asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations);
- RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
- assertNotNull(remoteLocation);
- assertEquals(ns0, remoteLocation.getNameserviceId());
- }
-
- /**
- * Test get datanode report async.
- */
- @Test
- public void testGetDatanodeReportAsync() throws Exception {
- asyncRouterRpcServer.getDatanodeReportAsync(
- HdfsConstants.DatanodeReportType.ALL, true, 0);
- DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
- assertEquals(3, datanodeInfos.length);
-
- // Get the namespace where the datanode is located
-
asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
- Map<String, DatanodeStorageReport[]> map = syncReturn(Map.class);
- assertEquals(1, map.size());
- assertEquals(3, map.get(ns0).length);
-
- DatanodeInfo[] slowDatanodeReport1 =
- asyncRouterRpcServer.getSlowDatanodeReport(true, 0);
-
- asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0);
- DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class);
- assertEquals(slowDatanodeReport1, slowDatanodeReport2);
- }
-}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java
deleted file mode 100644
index 6671d2d1d8d..00000000000
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncStoragePolicy.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.ipc.CallerContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
-import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestRouterAsyncStoragePolicy {
- private static Configuration routerConf;
- /** Federated HDFS cluster. */
- private static MiniRouterDFSCluster cluster;
- private static String ns0;
-
- /** Random Router for this federated cluster. */
- private MiniRouterDFSCluster.RouterContext router;
- private FileSystem routerFs;
- private RouterRpcServer routerRpcServer;
- private RouterAsyncStoragePolicy asyncStoragePolicy;
-
- private final String testfilePath = "/testdir/testAsyncStoragePolicy.file";
-
- @BeforeClass
- public static void setUpCluster() throws Exception {
- cluster = new MiniRouterDFSCluster(true, 1, 2,
- DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
- cluster.setNumDatanodesPerNameservice(3);
-
- cluster.startCluster();
-
- // Making one Namenode active per nameservice
- if (cluster.isHighAvailability()) {
- for (String ns : cluster.getNameservices()) {
- cluster.switchToActive(ns, NAMENODES[0]);
- cluster.switchToStandby(ns, NAMENODES[1]);
- }
- }
- // Start routers with only an RPC service
- routerConf = new RouterConfigBuilder()
- .rpc()
- .build();
-
- // Reduce the number of RPC clients threads to overload the Router easy
- routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
- routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
- routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
- // We decrease the DN cache times to make the test faster
- routerConf.setTimeDuration(
- RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
- cluster.addRouterOverrides(routerConf);
- // Start routers with only an RPC service
- cluster.startRouters();
-
- // Register and verify all NNs with all routers
- cluster.registerNamenodes();
- cluster.waitNamenodeRegistration();
- cluster.waitActiveNamespaces();
- ns0 = cluster.getNameservices().get(0);
- }
-
- @AfterClass
- public static void shutdownCluster() throws Exception {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Before
- public void setUp() throws IOException {
- router = cluster.getRandomRouter();
- routerFs = router.getFileSystem();
- routerRpcServer = router.getRouterRpcServer();
- routerRpcServer.initAsyncThreadPool();
- RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
- routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
- routerRpcServer.getRPCMonitor(),
- routerRpcServer.getRouterStateIdContext());
- RouterRpcServer spy = Mockito.spy(routerRpcServer);
- Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
- asyncStoragePolicy = new RouterAsyncStoragePolicy(spy);
-
- // Create mock locations
- MockResolver resolver = (MockResolver)
router.getRouter().getSubclusterResolver();
- resolver.addLocation("/", ns0, "/");
- FsPermission permission = new FsPermission("705");
- routerFs.mkdirs(new Path("/testdir"), permission);
- FSDataOutputStream fsDataOutputStream = routerFs.create(
- new Path(testfilePath), true);
- fsDataOutputStream.write(new byte[1024]);
- fsDataOutputStream.close();
- }
-
- @After
- public void tearDown() throws IOException {
- // clear client context
- CallerContext.setCurrent(null);
- boolean delete = routerFs.delete(new Path("/testdir"));
- assertTrue(delete);
- if (routerFs != null) {
- routerFs.close();
- }
- }
-
- @Test
- public void testRouterAsyncStoragePolicy() throws Exception {
- BlockStoragePolicy[] storagePolicies = cluster.getNamenodes().get(0)
- .getClient().getStoragePolicies();
- asyncStoragePolicy.getStoragePolicies();
- BlockStoragePolicy[] storagePoliciesAsync =
syncReturn(BlockStoragePolicy[].class);
- assertArrayEquals(storagePolicies, storagePoliciesAsync);
-
- asyncStoragePolicy.getStoragePolicy(testfilePath);
- BlockStoragePolicy blockStoragePolicy1 =
syncReturn(BlockStoragePolicy.class);
-
- asyncStoragePolicy.setStoragePolicy(testfilePath, "COLD");
- syncReturn(null);
- asyncStoragePolicy.getStoragePolicy(testfilePath);
- BlockStoragePolicy blockStoragePolicy2 =
syncReturn(BlockStoragePolicy.class);
- assertNotEquals(blockStoragePolicy1, blockStoragePolicy2);
- assertEquals("COLD", blockStoragePolicy2.getName());
- }
-}
\ No newline at end of file
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
similarity index 97%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
index 86969f16953..cc25516d59f 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncProtocolTestBase.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -25,7 +25,6 @@ import
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
-import org.apache.hadoop.hdfs.server.federation.router.RouterAsyncRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.ipc.CallerContext;
import org.junit.After;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncCacheAdmin.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncCacheAdmin.java
new file mode 100644
index 00000000000..ef82ec267eb
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncCacheAdmin.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncCacheAdmin}.
+ */
+public class TestRouterAsyncCacheAdmin extends RouterAsyncProtocolTestBase {
+ private RouterAsyncCacheAdmin asyncCacheAdmin;
+
+ @Before
+ public void setup() throws IOException {
+ asyncCacheAdmin = new RouterAsyncCacheAdmin(getRouterAsyncRpcServer());
+ FSDataOutputStream fsDataOutputStream = getRouterFs().create(
+ new Path("/testCache.file"), true);
+ fsDataOutputStream.write(new byte[1024]);
+ fsDataOutputStream.close();
+ }
+
+ @Test
+ public void testRouterAsyncCacheAdmin() throws Exception {
+ asyncCacheAdmin.addCachePool(new CachePoolInfo("pool"));
+ syncReturn(null);
+
+ CacheDirectiveInfo path = new CacheDirectiveInfo.Builder().
+ setPool("pool").
+ setPath(new Path("/testCache.file")).
+ build();
+ asyncCacheAdmin.addCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
+ long result = syncReturn(long.class);
+ assertEquals(1, result);
+
+ asyncCacheAdmin.listCachePools("");
+ BatchedEntries<CachePoolEntry> cachePoolEntries =
syncReturn(BatchedEntries.class);
+ assertEquals("pool", cachePoolEntries.get(0).getInfo().getPoolName());
+
+ CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder().
+ setPool("pool").
+ build();
+ asyncCacheAdmin.listCacheDirectives(0, filter);
+ BatchedEntries<CacheDirectiveEntry> cacheDirectiveEntries =
syncReturn(BatchedEntries.class);
+ assertEquals(new Path("/testCache.file"),
cacheDirectiveEntries.get(0).getInfo().getPath());
+
+ CachePoolInfo pool = new CachePoolInfo("pool").setOwnerName("pool_user");
+ asyncCacheAdmin.modifyCachePool(pool);
+ syncReturn(null);
+
+ asyncCacheAdmin.listCachePools("");
+ cachePoolEntries = syncReturn(BatchedEntries.class);
+ assertEquals("pool_user",
cachePoolEntries.get(0).getInfo().getOwnerName());
+
+ path = new CacheDirectiveInfo.Builder().
+ setPool("pool").
+ setPath(new Path("/testCache.file")).
+ setReplication((short) 2).
+ setId(1L).
+ build();
+ asyncCacheAdmin.modifyCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
+ syncReturn(null);
+
+ asyncCacheAdmin.listCacheDirectives(0, filter);
+ cacheDirectiveEntries = syncReturn(BatchedEntries.class);
+ assertEquals(Short.valueOf((short) 2),
cacheDirectiveEntries.get(0).getInfo().getReplication());
+
+ asyncCacheAdmin.removeCacheDirective(1L);
+ syncReturn(null);
+ asyncCacheAdmin.removeCachePool("pool");
+ syncReturn(null);
+ }
+}
\ No newline at end of file
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
similarity index 97%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
index 047cf6bdb55..86ba2b2aed8 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.CallerContext;
import org.junit.After;
@@ -49,7 +51,7 @@ import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN
import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncNamenodeProtocol.java
similarity index 98%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncNamenodeProtocol.java
index 86081260536..1814031cfb7 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncNamenodeProtocol.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncNamenodeProtocol.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.junit.Before;
import org.junit.Test;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
similarity index 96%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
index 0b1eeeec0be..ecbf916aaff 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncQuota.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.ipc.CallerContext;
import org.junit.After;
import org.junit.AfterClass;
@@ -44,7 +46,7 @@ import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN
import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.Assert.assertTrue;
public class TestRouterAsyncQuota {
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
similarity index 96%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
index e3429d493dc..e0ce4746cda 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcClient.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -30,6 +30,11 @@ import
org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RetriableException;
@@ -54,7 +59,7 @@ import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMEN
import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcServer.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcServer.java
new file mode 100644
index 00000000000..c022789a2eb
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcServer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Used to test the async functionality of {@link RouterRpcServer}.
+ */
+public class TestRouterAsyncRpcServer extends RouterAsyncProtocolTestBase {
+ private RouterRpcServer asyncRouterRpcServer;
+
+ @Before
+ public void setup() throws IOException {
+ asyncRouterRpcServer = getRouterAsyncRpcServer();
+ }
+
+ /**
+ * Test that the async RPC server can invoke a method at an available
Namenode.
+ */
+ @Test
+ public void testInvokeAtAvailableNsAsync() throws Exception {
+ RemoteMethod method = new RemoteMethod("getStoragePolicies");
+ asyncRouterRpcServer.invokeAtAvailableNsAsync(method,
BlockStoragePolicy[].class);
+ BlockStoragePolicy[] storagePolicies =
syncReturn(BlockStoragePolicy[].class);
+ assertEquals(8, storagePolicies.length);
+ }
+
+ /**
+ * Test get create location async.
+ */
+ @Test
+ public void testGetCreateLocationAsync() throws Exception {
+ final List<RemoteLocation> locations =
+ asyncRouterRpcServer.getLocationsForPath("/testdir", true);
+ asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations);
+ RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
+ assertNotNull(remoteLocation);
+ assertEquals(getNs0(), remoteLocation.getNameserviceId());
+ }
+
+ /**
+ * Test get datanode report async.
+ */
+ @Test
+ public void testGetDatanodeReportAsync() throws Exception {
+ asyncRouterRpcServer.getDatanodeReportAsync(
+ HdfsConstants.DatanodeReportType.ALL, true, 0);
+ DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
+ assertEquals(3, datanodeInfos.length);
+
+ // Get the namespace where the datanode is located
+
asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
+ Map<String, DatanodeStorageReport[]> map = syncReturn(Map.class);
+ assertEquals(1, map.size());
+ assertEquals(3, map.get(getNs0()).length);
+
+ DatanodeInfo[] slowDatanodeReport1 =
+ asyncRouterRpcServer.getSlowDatanodeReport(true, 0);
+
+ asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0);
+ DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class);
+ assertEquals(slowDatanodeReport1, slowDatanodeReport2);
+ }
+}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncSnapshot.java
similarity index 50%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncSnapshot.java
index 49a682cea4b..a44664ec23d 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncSnapshot.java
@@ -15,135 +15,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router;
+package org.apache.hadoop.hdfs.server.federation.router.async;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.test.LambdaTestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.Mockito;
-
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
import static
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY;
-import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
-import static
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestRouterAsyncSnapshot {
- private static Configuration routerConf;
- /** Federated HDFS cluster. */
- private static MiniRouterDFSCluster cluster;
- private static String ns0;
- /** Random Router for this federated cluster. */
- private MiniRouterDFSCluster.RouterContext router;
+/**
+ * Used to test the functionality of {@link RouterAsyncSnapshot}.
+ */
+public class TestRouterAsyncSnapshot extends RouterAsyncProtocolTestBase {
+ private final String testFile = "/testdir/testSnapshot.file";
private FileSystem routerFs;
- private RouterRpcServer routerRpcServer;
private RouterAsyncSnapshot asyncSnapshot;
- @BeforeClass
- public static void setUpCluster() throws Exception {
- cluster = new MiniRouterDFSCluster(true, 1, 2,
- DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
- cluster.setNumDatanodesPerNameservice(3);
-
- cluster.startCluster();
-
- // Making one Namenode active per nameservice
- if (cluster.isHighAvailability()) {
- for (String ns : cluster.getNameservices()) {
- cluster.switchToActive(ns, NAMENODES[0]);
- cluster.switchToStandby(ns, NAMENODES[1]);
- }
- }
- // Start routers with only an RPC service
- routerConf = new RouterConfigBuilder()
- .rpc()
- .build();
-
- // Reduce the number of RPC clients threads to overload the Router easy
- routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
- routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
- routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
- // We decrease the DN cache times to make the test faster
- routerConf.setTimeDuration(
- RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
- cluster.addRouterOverrides(routerConf);
- // Start routers with only an RPC service
- cluster.startRouters();
-
- // Register and verify all NNs with all routers
- cluster.registerNamenodes();
- cluster.waitNamenodeRegistration();
- cluster.waitActiveNamespaces();
- ns0 = cluster.getNameservices().get(0);
- }
-
- @AfterClass
- public static void shutdownCluster() throws Exception {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
@Before
- public void setUp() throws IOException {
- router = cluster.getRandomRouter();
- routerFs = router.getFileSystem();
- routerRpcServer = router.getRouterRpcServer();
- routerRpcServer.initAsyncThreadPool();
- RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
- routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
- routerRpcServer.getRPCMonitor(),
- routerRpcServer.getRouterStateIdContext());
- RouterRpcServer spy = Mockito.spy(routerRpcServer);
- Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
- asyncSnapshot = new RouterAsyncSnapshot(spy);
-
- // Create mock locations
- MockResolver resolver = (MockResolver)
router.getRouter().getSubclusterResolver();
- resolver.addLocation("/", ns0, "/");
- FsPermission permission = new FsPermission("705");
- routerFs.mkdirs(new Path("/testdir"), permission);
+ public void setup() throws IOException {
+ routerFs = getRouterFs();
+ asyncSnapshot = new RouterAsyncSnapshot(getRouterAsyncRpcServer());
FSDataOutputStream fsDataOutputStream = routerFs.create(
- new Path("/testdir/testSnapshot.file"), true);
+ new Path(testFile), true);
fsDataOutputStream.write(new byte[1024]);
fsDataOutputStream.close();
}
- @After
- public void tearDown() throws IOException {
- // clear client context
- CallerContext.setCurrent(null);
- boolean delete = routerFs.delete(new Path("/testdir"));
- assertTrue(delete);
- if (routerFs != null) {
- routerFs.close();
- }
- }
-
@Test
public void testRouterAsyncSnapshot() throws Exception {
asyncSnapshot.allowSnapshot("/testdir");
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncStoragePolicy.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncStoragePolicy.java
new file mode 100644
index 00000000000..d2afe9ad4af
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncStoragePolicy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.IOException;
+
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncStoragePolicy}.
+ */
+public class TestRouterAsyncStoragePolicy extends RouterAsyncProtocolTestBase {
+ private final String testfilePath = "/testdir/testAsyncStoragePolicy.file";
+ private RouterAsyncStoragePolicy asyncStoragePolicy;
+
+ @Before
+ public void setup() throws IOException {
+ asyncStoragePolicy = new
RouterAsyncStoragePolicy(getRouterAsyncRpcServer());
+ FSDataOutputStream fsDataOutputStream = getRouterFs().create(
+ new Path(testfilePath), true);
+ fsDataOutputStream.write(new byte[1024]);
+ fsDataOutputStream.close();
+ }
+
+ @Test
+ public void testRouterAsyncStoragePolicy() throws Exception {
+ BlockStoragePolicy[] storagePolicies = getCluster().getNamenodes().get(0)
+ .getClient().getStoragePolicies();
+ asyncStoragePolicy.getStoragePolicies();
+ BlockStoragePolicy[] storagePoliciesAsync =
syncReturn(BlockStoragePolicy[].class);
+ assertArrayEquals(storagePolicies, storagePoliciesAsync);
+
+ asyncStoragePolicy.getStoragePolicy(testfilePath);
+ BlockStoragePolicy blockStoragePolicy1 =
syncReturn(BlockStoragePolicy.class);
+
+ asyncStoragePolicy.setStoragePolicy(testfilePath, "COLD");
+ syncReturn(null);
+ asyncStoragePolicy.getStoragePolicy(testfilePath);
+ BlockStoragePolicy blockStoragePolicy2 =
syncReturn(BlockStoragePolicy.class);
+ assertNotEquals(blockStoragePolicy1, blockStoragePolicy2);
+ assertEquals("COLD", blockStoragePolicy2.getName());
+ }
+}
\ No newline at end of file
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncUserProtocol.java
similarity index 94%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncUserProtocol.java
index a3fcd6109e5..ce76be9ed7b 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/async/TestRouterAsyncUserProtocol.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncUserProtocol.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.async;
+package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Before;
import org.junit.Test;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.Assert.assertArrayEquals;
/**
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncClass.java
similarity index 95%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncClass.java
index e5bf7ce08e8..bfc172edc02 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncClass.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,15 +28,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCurrent;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException;
-import static
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCurrent;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncThrowException;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
/**
* AsyncClass demonstrates the conversion of synchronous methods
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/BaseClass.java
similarity index 97%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/BaseClass.java
index 8d5b5b1dc82..084806d65c1 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/BaseClass.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.List;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/SyncClass.java
similarity index 98%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/SyncClass.java
index e55edb098e1..805b955661d 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/SyncClass.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import java.io.IOException;
import java.util.ArrayList;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/TestAsyncUtil.java
similarity index 99%
rename from
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java
rename to
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/TestAsyncUtil.java
index c540af612b9..644f639ac9e 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/TestAsyncUtil.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.federation.router.async;
+package org.apache.hadoop.hdfs.server.federation.router.async.utils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java
index 9b2b5a06588..b73764cbe18 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/TestRouterSecurityManager.java
@@ -132,9 +132,7 @@ public class TestRouterSecurityManager {
// Cancel the delegation token
securityManager.cancelDelegationToken(token);
- String exceptionCause = "Renewal request for unknown token";
exceptionRule.expect(SecretManager.InvalidToken.class);
- exceptionRule.expectMessage(exceptionCause);
// This throws an exception as token has been cancelled.
securityManager.renewDelegationToken(token);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]