This is an automated email from the ASF dual-hosted git repository.
keepromise pushed a commit to branch HDFS-17531
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-17531 by this push:
new c48db62c193 HDFS-17640.[ARR] RouterClientProtocol supports asynchronous rpc. (#7188)
c48db62c193 is described below
commit c48db62c193fb54784a6aa66e40511437c2d5ff6
Author: hfutatzhanghb <[email protected]>
AuthorDate: Tue Dec 31 15:20:47 2024 +0800
HDFS-17640.[ARR] RouterClientProtocol supports asynchronous rpc. (#7188)
Co-authored-by: Jian Zhang <[email protected]>
---
.../federation/router/RouterClientProtocol.java | 133 ++-
.../federation/router/RouterFederationRename.java | 2 +-
.../server/federation/router/RouterRpcClient.java | 2 +-
.../server/federation/router/RouterRpcServer.java | 20 +-
.../router/async/RouterAsyncClientProtocol.java | 1083 ++++++++++++++++++++
.../router/async/RouterAsyncProtocolTestBase.java | 6 +-
.../async/TestRouterAsyncClientProtocol.java | 144 +++
7 files changed, 1352 insertions(+), 38 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index d5064821905..cab4fad1909 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -85,6 +85,10 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy;
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -166,7 +170,7 @@ public class RouterClientProtocol implements ClientProtocol {
/** Router security manager to handle token operations. */
private RouterSecurityManager securityManager = null;
- RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
+ public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
this.rpcServer = rpcServer;
this.rpcClient = rpcServer.getRPCClient();
this.subclusterResolver = rpcServer.getSubclusterResolver();
@@ -194,10 +198,17 @@ public class RouterClientProtocol implements ClientProtocol {
this.superGroup = conf.get(
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
- this.erasureCoding = new ErasureCoding(rpcServer);
- this.storagePolicy = new RouterStoragePolicy(rpcServer);
- this.snapshotProto = new RouterSnapshot(rpcServer);
- this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
+ if (rpcServer.isAsync()) {
+ this.erasureCoding = new AsyncErasureCoding(rpcServer);
+ this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer);
+ this.snapshotProto = new RouterAsyncSnapshot(rpcServer);
+ this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer);
+ } else {
+ this.erasureCoding = new ErasureCoding(rpcServer);
+ this.storagePolicy = new RouterStoragePolicy(rpcServer);
+ this.snapshotProto = new RouterSnapshot(rpcServer);
+ this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
+ }
this.securityManager = rpcServer.getRouterSecurityManager();
this.rbfRename = new RouterFederationRename(rpcServer, conf);
this.defaultNameServiceEnabled = conf.getBoolean(
@@ -347,7 +358,7 @@ public class RouterClientProtocol implements ClientProtocol {
* @throws IOException If this path is not fault tolerant or the exception
* should not be retried (e.g., NSQuotaExceededException).
*/
- private List<RemoteLocation> checkFaultTolerantRetry(
+ protected List<RemoteLocation> checkFaultTolerantRetry(
final RemoteMethod method, final String src, final IOException ioe,
final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
throws IOException {
@@ -820,7 +831,7 @@ public class RouterClientProtocol implements ClientProtocol {
/**
* For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results.
*/
- private static class GetListingComparator
+ protected static class GetListingComparator
implements Comparator<byte[]>, Serializable {
@Override
public int compare(byte[] o1, byte[] o2) {
@@ -831,6 +842,10 @@ public class RouterClientProtocol implements ClientProtocol {
private static GetListingComparator comparator =
new GetListingComparator();
+ public static GetListingComparator getComparator() {
+ return comparator;
+ }
+
@Override
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) throws IOException {
@@ -1104,7 +1119,7 @@ public class RouterClientProtocol implements ClientProtocol {
return mergeDtanodeStorageReport(dnSubcluster);
}
- private DatanodeStorageReport[] mergeDtanodeStorageReport(
+ protected DatanodeStorageReport[] mergeDtanodeStorageReport(
Map<String, DatanodeStorageReport[]> dnSubcluster) {
// Avoid repeating machines in multiple subclusters
Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
@@ -1335,20 +1350,23 @@ public class RouterClientProtocol implements ClientProtocol {
}
/**
- * Get all the locations of the path for {@link this#getContentSummary(String)}.
+ * Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}.
* For example, there are some mount points:
- * /a -> ns0 -> /a
- * /a/b -> ns0 -> /a/b
- * /a/b/c -> ns1 -> /a/b/c
+ * <p>
+ * /a - [ns0 - /a]
+ * /a/b - [ns0 - /a/b]
+ * /a/b/c - [ns1 - /a/b/c]
+ * </p>
* When the path is '/a', the result of locations should be
* [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
* When the path is '/b', will throw NoLocationException.
+ *
* @param path the path to get content summary
* @return one list contains all the remote location
- * @throws IOException
+ * @throws IOException if an I/O error occurs
*/
@VisibleForTesting
- List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
+ protected List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
// Try to get all the locations of the path.
final Map<String, List<RemoteLocation>> ns2Locations =
getAllLocations(path);
if (ns2Locations.isEmpty()) {
@@ -2039,7 +2057,7 @@ public class RouterClientProtocol implements ClientProtocol {
* replacement value.
* @throws IOException If the dst paths could not be determined.
*/
- private RemoteParam getRenameDestinations(
+ protected RemoteParam getRenameDestinations(
final List<RemoteLocation> srcLocations,
final List<RemoteLocation> dstLocations) throws IOException {
@@ -2087,7 +2105,7 @@ public class RouterClientProtocol implements ClientProtocol {
* @param summaries Collection of individual summaries.
* @return Aggregated content summary.
*/
- private ContentSummary aggregateContentSummary(
+ protected ContentSummary aggregateContentSummary(
Collection<ContentSummary> summaries) {
if (summaries.size() == 1) {
return summaries.iterator().next();
@@ -2142,7 +2160,7 @@ public class RouterClientProtocol implements ClientProtocol {
* everywhere.
* @throws IOException If all the locations throw an exception.
*/
- private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+ protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method) throws IOException {
return getFileInfoAll(locations, method, -1);
}
@@ -2157,7 +2175,7 @@ public class RouterClientProtocol implements ClientProtocol {
* everywhere.
* @throws IOException If all the locations throw an exception.
*/
- private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+ protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method, long timeOutMs) throws IOException {
// Get the file info from everybody
@@ -2186,12 +2204,11 @@ public class RouterClientProtocol implements ClientProtocol {
/**
* Get the permissions for the parent of a child with given permissions.
- * Add implicit u+wx permission for parent. This is based on
- * @{FSDirMkdirOp#addImplicitUwx}.
+ * Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx.
* @param mask The permission mask of the child.
* @return The permission mask of the parent.
*/
- private static FsPermission getParentPermission(final FsPermission mask) {
+ protected static FsPermission getParentPermission(final FsPermission mask) {
FsPermission ret = new FsPermission(
mask.getUserAction().or(FsAction.WRITE_EXECUTE),
mask.getGroupAction(),
@@ -2208,7 +2225,7 @@ public class RouterClientProtocol implements ClientProtocol {
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
- HdfsFileStatus getMountPointStatus(
+ protected HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date) {
return getMountPointStatus(name, childrenNum, date, true);
}
@@ -2223,7 +2240,7 @@ public class RouterClientProtocol implements ClientProtocol {
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
- HdfsFileStatus getMountPointStatus(
+ protected HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date, boolean setPath) {
long modTime = date;
long accessTime = date;
@@ -2300,7 +2317,7 @@ public class RouterClientProtocol implements ClientProtocol {
* @param path Name of the path to start checking dates from.
* @return Map with the modification dates for all sub-entries.
*/
- private Map<String, Long> getMountPointDates(String path) {
+ protected Map<String, Long> getMountPointDates(String path) {
Map<String, Long> ret = new TreeMap<>();
if (subclusterResolver instanceof MountTableResolver) {
try {
@@ -2361,9 +2378,15 @@ public class RouterClientProtocol implements ClientProtocol {
}
/**
- * Get listing on remote locations.
+ * Get a partial listing of the indicated directory.
+ *
+ * @param src the directory name
+ * @param startAfter the name to start after
+ * @param needLocation if blockLocations need to be returned
+ * @return a partial listing starting after startAfter
+ * @throws IOException if other I/O error occurred
*/
- private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
+ protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
String src, byte[] startAfter, boolean needLocation) throws IOException {
try {
List<RemoteLocation> locations =
@@ -2400,9 +2423,9 @@ public class RouterClientProtocol implements ClientProtocol {
* @param startAfter starting listing from client, used to define listing
* start boundary
* @param remainingEntries how many entries left from subcluster
- * @return
+ * @return true if should add mount point, otherwise false;
*/
- private static boolean shouldAddMountPoint(
+ protected static boolean shouldAddMountPoint(
byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
int remainingEntries) {
if (comparator.compare(mountPoint, startAfter) > 0 &&
@@ -2425,7 +2448,7 @@ public class RouterClientProtocol implements ClientProtocol {
* @throws IOException if unable to get the file status.
*/
@VisibleForTesting
- boolean isMultiDestDirectory(String src) throws IOException {
+ protected boolean isMultiDestDirectory(String src) throws IOException {
try {
if (rpcServer.isPathAll(src)) {
List<RemoteLocation> locations;
@@ -2449,4 +2472,56 @@ public class RouterClientProtocol implements ClientProtocol {
public int getRouterFederationRenameCount() {
return rbfRename.getRouterFederationRenameCount();
}
+
+ public RouterRpcServer getRpcServer() {
+ return rpcServer;
+ }
+
+ public RouterRpcClient getRpcClient() {
+ return rpcClient;
+ }
+
+ public FileSubclusterResolver getSubclusterResolver() {
+ return subclusterResolver;
+ }
+
+ public ActiveNamenodeResolver getNamenodeResolver() {
+ return namenodeResolver;
+ }
+
+ public long getServerDefaultsLastUpdate() {
+ return serverDefaultsLastUpdate;
+ }
+
+ public long getServerDefaultsValidityPeriod() {
+ return serverDefaultsValidityPeriod;
+ }
+
+ public boolean isAllowPartialList() {
+ return allowPartialList;
+ }
+
+ public long getMountStatusTimeOut() {
+ return mountStatusTimeOut;
+ }
+
+ public String getSuperUser() {
+ return superUser;
+ }
+
+ public String getSuperGroup() {
+ return superGroup;
+ }
+
+ public RouterStoragePolicy getStoragePolicy() {
+ return storagePolicy;
+ }
+
+ public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) {
+ this.serverDefaultsLastUpdate = serverDefaultsLastUpdate;
+ }
+
+ public RouterFederationRename getRbfRename() {
+ return rbfRename;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
index aafb685b886..772e7257888 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
@@ -93,7 +93,7 @@ public class RouterFederationRename {
* @throws IOException if rename fails.
* @return true if rename succeeds.
*/
- boolean routerFedRename(final String src, final String dst,
+ public boolean routerFedRename(final String src, final String dst,
final List<RemoteLocation> srcLocations,
final List<RemoteLocation> dstLocations) throws IOException {
if (!rpcServer.isEnableRenameAcrossNamespace()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index e07de092dd7..c7c3699f33e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -1969,7 +1969,7 @@ public class RouterRpcClient {
* @param nsId namespaceID
* @return whether the 'namespace' has observer reads enabled.
*/
- boolean isNamespaceObserverReadEligible(String nsId) {
+ public boolean isNamespaceObserverReadEligible(String nsId) {
return observerReadEnabledDefault !=
observerReadEnabledOverrides.contains(nsId);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 39a50d4e3a6..0c1d3dfbdec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -75,7 +75,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncUserProtocol;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
@@ -288,6 +292,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
* @param fileResolver File resolver to resolve file paths to subclusters.
* @throws IOException If the RPC server could not be created.
*/
+ @SuppressWarnings("checkstyle:MethodLength")
public RouterRpcServer(Configuration conf, Router router,
ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver)
throws IOException {
@@ -424,14 +429,19 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
if (this.enableAsync) {
this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+ this.clientProto = new RouterAsyncClientProtocol(conf, this);
+ this.nnProto = new RouterAsyncNamenodeProtocol(this);
+ this.routerProto = new RouterAsyncUserProtocol(this);
+ this.quotaCall = new AsyncQuota(this.router, this);
} else {
this.rpcClient = new RouterRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
+ this.clientProto = new RouterClientProtocol(conf, this);
+ this.nnProto = new RouterNamenodeProtocol(this);
+ this.routerProto = new RouterUserProtocol(this);
+ this.quotaCall = new Quota(this.router, this);
}
- this.nnProto = new RouterNamenodeProtocol(this);
- this.quotaCall = new Quota(this.router, this);
- this.clientProto = new RouterClientProtocol(conf, this);
- this.routerProto = new RouterUserProtocol(this);
+
long dnCacheExpire = conf.getTimeDuration(
DN_REPORT_CACHE_EXPIRE,
DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
@@ -2193,7 +2203,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
* @param path Path to check.
* @return If a path should be in all subclusters.
*/
- boolean isPathAll(final String path) {
+ public boolean isPathAll(final String path) {
MountTable entry = getMountTable(path);
return entry != null && entry.isAll();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
new file mode 100644
index 00000000000..ae44f7aaf1d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
@@ -0,0 +1,1083 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
+import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture;
+
+/**
+ * Module that implements all the async RPC calls in {@link ClientProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterAsyncClientProtocol extends RouterClientProtocol {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterAsyncClientProtocol.class.getName());
+
+ private final RouterRpcServer rpcServer;
+ private final RouterRpcClient rpcClient;
+ private final RouterFederationRename rbfRename;
+ private final FileSubclusterResolver subclusterResolver;
+ private final ActiveNamenodeResolver namenodeResolver;
+ /** If it requires response from all subclusters. */
+ private final boolean allowPartialList;
+ /** Time out when getting the mount statistics. */
+ private long mountStatusTimeOut;
+ /** Identifier for the super user. */
+ private String superUser;
+ /** Identifier for the super group. */
+ private final String superGroup;
+ /**
+ * Caching server defaults so as to prevent redundant calls to namenode,
+ * similar to DFSClient, caching saves efforts when router connects
+ * to multiple clients.
+ */
+ private volatile FsServerDefaults serverDefaults;
+
+ public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
+ super(conf, rpcServer);
+ this.rpcServer = rpcServer;
+ this.rpcClient = rpcServer.getRPCClient();
+ this.rbfRename = getRbfRename();
+ this.subclusterResolver = getSubclusterResolver();
+ this.namenodeResolver = getNamenodeResolver();
+ this.allowPartialList = isAllowPartialList();
+ this.mountStatusTimeOut = getMountStatusTimeOut();
+ this.superUser = getSuperUser();
+ this.superGroup = getSuperGroup();
+ }
+
+ @Override
+ public FsServerDefaults getServerDefaults() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+ long now = Time.monotonicNow();
+ if ((serverDefaults == null) || (now - getServerDefaultsLastUpdate()
+ > getServerDefaultsValidityPeriod())) {
+ RemoteMethod method = new RemoteMethod("getServerDefaults");
+ rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class);
+ asyncApply(o -> {
+ serverDefaults = (FsServerDefaults) o;
+ setServerDefaultsLastUpdate(now);
+ return serverDefaults;
+ });
+ } else {
+ asyncComplete(serverDefaults);
+ }
+ return asyncReturn(FsServerDefaults.class);
+ }
+
+ @Override
+ public HdfsFileStatus create(String src, FsPermission masked,
+ String clientName, EnumSetWritable<CreateFlag> flag,
+ boolean createParent, short replication, long blockSize,
+ CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
+ String storagePolicy) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ if (createParent && rpcServer.isPathAll(src)) {
+ int index = src.lastIndexOf(Path.SEPARATOR);
+ String parent = src.substring(0, index);
+ LOG.debug("Creating {} requires creating parent {}", src, parent);
+ FsPermission parentPermissions = getParentPermission(masked);
+ mkdirs(parent, parentPermissions, createParent);
+ asyncApply((ApplyFunction<Boolean, Boolean>) success -> {
+ if (!success) {
+ // This shouldn't happen as mkdirs returns true or exception
+ LOG.error("Couldn't create parents for {}", src);
+ }
+ return success;
+ });
+ }
+
+ RemoteMethod method = new RemoteMethod("create",
+ new Class<?>[] {String.class, FsPermission.class, String.class,
+ EnumSetWritable.class, boolean.class, short.class,
+ long.class, CryptoProtocolVersion[].class,
+ String.class, String.class},
+ new RemoteParam(), masked, clientName, flag, createParent,
+ replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ final RemoteLocation[] createLocation = new RemoteLocation[1];
+ asyncTry(() -> {
+ rpcServer.getCreateLocationAsync(src, locations);
+ asyncApply((AsyncApplyFunction<RemoteLocation, Object>) remoteLocation -> {
+ createLocation[0] = remoteLocation;
+ rpcClient.invokeSingle(remoteLocation, method, HdfsFileStatus.class);
+ asyncApply((ApplyFunction<HdfsFileStatus, Object>) status -> {
+ status.setNamespace(remoteLocation.getNameserviceId());
+ return status;
+ });
+ });
+ });
+ asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
+ final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+ method, src, ioe, createLocation[0], locations);
+ rpcClient.invokeSequential(
+ newLocations, method, HdfsFileStatus.class, null);
+ }, IOException.class);
+
+ return asyncReturn(HdfsFileStatus.class);
+ }
+
+ @Override
+ public LastBlockWithStatus append(
+ String src, String clientName,
+ EnumSetWritable<CreateFlag> flag) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("append",
+ new Class<?>[] {String.class, String.class, EnumSetWritable.class},
+ new RemoteParam(), clientName, flag);
+ rpcClient.invokeSequential(method, locations, LastBlockWithStatus.class, null);
+ asyncApply((ApplyFunction<RemoteResult, LastBlockWithStatus>) result -> {
+ LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
+ lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
+ return lbws;
+ });
+ return asyncReturn(LastBlockWithStatus.class);
+ }
+
+ @Deprecated
+ @Override
+ public boolean rename(final String src, final String dst)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> srcLocations =
+ rpcServer.getLocationsForPath(src, true, false);
+ final List<RemoteLocation> dstLocations =
+ rpcServer.getLocationsForPath(dst, false, false);
+ // srcLocations may be trimmed by getRenameDestinations()
+ final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+ RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
+ if (locs.isEmpty()) {
+ asyncComplete(
+ rbfRename.routerFedRename(src, dst, srcLocations, dstLocations));
+ return asyncReturn(Boolean.class);
+ }
+ RemoteMethod method = new RemoteMethod("rename",
+ new Class<?>[] {String.class, String.class},
+ new RemoteParam(), dstParam);
+ isMultiDestDirectory(src);
+ asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> {
+ if (isMultiDestDirectory) {
+ if (locs.size() != srcLocations.size()) {
+ throw new IOException("Rename of " + src + " to " + dst + " is not"
+ + " allowed. The number of remote locations for both source and"
+ + " target should be same.");
+ }
+ rpcClient.invokeAll(locs, method);
+ } else {
+ rpcClient.invokeSequential(locs, method, Boolean.class,
+ Boolean.TRUE);
+ }
+ });
+ return asyncReturn(Boolean.class);
+ }
+
+ /**
+ * Asynchronous variant of rename2.
+ * <p>
+ * Resolves the remote locations of {@code src} and {@code dst}. When
+ * {@code locs} is empty after {@code getRenameDestinations}, the rename is
+ * handed to {@code rbfRename.routerFedRename} and the method returns.
+ * Otherwise the "rename2" remote call is chained after
+ * {@link #isMultiDestDirectory(String)} via {@code asyncApply}: it is
+ * invoked concurrently on all locations for a multi-destination directory
+ * and sequentially otherwise.
+ *
+ * @param src source path.
+ * @param dst destination path.
+ * @param options rename options forwarded to the remote "rename2" call.
+ * @throws IOException if the operation check or the location resolution
+ * fails; the chained step throws it when a multi-destination directory has
+ * a different number of source and target locations.
+ */
+ @Override
+ public void rename2(
+ final String src, final String dst,
+ final Options.Rename... options) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> srcLocations =
+ rpcServer.getLocationsForPath(src, true, false);
+ final List<RemoteLocation> dstLocations =
+ rpcServer.getLocationsForPath(dst, false, false);
+ // srcLocations may be trimmed by getRenameDestinations()
+ final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+ RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
+ if (locs.isEmpty()) {
+ rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
+ return;
+ }
+ RemoteMethod method = new RemoteMethod("rename2",
+ new Class<?>[] {String.class, String.class, options.getClass()},
+ new RemoteParam(), dstParam, options);
+ // Presumably the Boolean produced here is the input of the step below.
+ isMultiDestDirectory(src);
+ asyncApply((AsyncApplyFunction<Boolean, Boolean>) isMultiDestDirectory -> {
+ if (isMultiDestDirectory) {
+ if (locs.size() != srcLocations.size()) {
+ throw new IOException("Rename of " + src + " to " + dst + " is not"
+ + " allowed. The number of remote locations for both source and"
+ + " target should be same.");
+ }
+ rpcClient.invokeConcurrent(locs, method);
+ } else {
+ rpcClient.invokeSequential(locs, method, null, null);
+ }
+ });
+ }
+
+ /**
+ * Asynchronous variant of concat.
+ * <p>
+ * Looks up the remote location of the target file, then walks the source
+ * files one by one: each source must resolve to a location and must live in
+ * the same nameservice as the target. The resolved source destinations are
+ * collected in order and finally passed to a single "concat" remote call on
+ * the target location.
+ *
+ * @param trg target file of the concatenation.
+ * @param src source files to append to the target.
+ * @throws IOException if the operation check fails; the chained steps throw
+ * it when the target or a source cannot be found, or when a source is in a
+ * different nameservice than the target.
+ */
+ @Override
+ public void concat(String trg, String[] src) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ // Concat only takes effect when all files are in the same namespace.
+ getFileRemoteLocation(trg);
+ asyncApply((AsyncApplyFunction<RemoteLocation, Object>) targetDestination
-> {
+ if (targetDestination == null) {
+ throw new IOException("Cannot find target file - " + trg);
+ }
+ String targetNameService = targetDestination.getNameserviceId();
+ String[] sourceDestinations = new String[src.length];
+ // Single-element array so the lambda below can advance the write index.
+ int[] index = new int[1];
+ asyncForEach(Arrays.stream(src).iterator(), (forEachRun, sourceFile) -> {
+ getFileRemoteLocation(sourceFile);
+ asyncApply((ApplyFunction<RemoteLocation, Object>) srcLocation -> {
+ if (srcLocation == null) {
+ throw new IOException("Cannot find source file - " + sourceFile);
+ }
+ sourceDestinations[index[0]++] = srcLocation.getDest();
+ if (!targetNameService.equals(srcLocation.getNameserviceId())) {
+ throw new IOException("Cannot concatenate source file " +
sourceFile
+ + " because it is located in a different namespace" + " with
nameservice "
+ + srcLocation.getNameserviceId() + " from the target file with
nameservice "
+ + targetNameService);
+ }
+ return null;
+ });
+ });
+ asyncApply((AsyncApplyFunction<Object, Object>) o -> {
+ // Invoke
+ RemoteMethod method = new RemoteMethod("concat",
+ new Class<?>[] {String.class, String[].class},
+ targetDestination.getDest(), sourceDestinations);
+ rpcClient.invokeSingle(targetDestination, method, Void.class);
+ });
+ });
+ }
+
+ /**
+ * Asynchronous variant of mkdirs.
+ * <p>
+ * When {@code rpcServer.isPathAll(src)} holds, the directory is created in
+ * every location through {@code invokeAll}. Otherwise the chain starts from
+ * {@code false}; with more than one location an existence check through
+ * {@link #getFileInfo(String)} may turn it into {@code true}. A
+ * {@code true} input short-circuits the last step, otherwise "mkdirs" is
+ * invoked on the first location and, on IOException, retried sequentially
+ * on the locations returned by {@code checkFaultTolerantRetry}.
+ *
+ * @param src directory to create.
+ * @param masked permission forwarded to the remote call.
+ * @param createParent forwarded to the remote call.
+ * @return the value of {@code invokeAll} for the all-locations case,
+ * otherwise the placeholder from {@code asyncReturn(Boolean.class)}.
+ * @throws IOException if the operation check or location resolution fails.
+ */
+ @Override
+ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, false);
+ RemoteMethod method = new RemoteMethod("mkdirs",
+ new Class<?>[] {String.class, FsPermission.class, boolean.class},
+ new RemoteParam(), masked, createParent);
+
+ // Create in all locations
+ if (rpcServer.isPathAll(src)) {
+ return rpcClient.invokeAll(locations, method);
+ }
+
+ // Seed the chain with "not created yet".
+ asyncComplete(false);
+ if (locations.size() > 1) {
+ // Check if this directory already exists
+ asyncTry(() -> {
+ getFileInfo(src);
+ asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) fileStatus -> {
+ if (fileStatus != null) {
+ // When existing, the NN doesn't return an exception; return true
+ return true;
+ }
+ return false;
+ });
+ });
+ asyncCatch((ret, ex) -> {
+ // Can't query if this file exists or not.
+ LOG.error("Error getting file info for {} while proxying mkdirs: {}",
+ src, ex.getMessage());
+ return false;
+ }, IOException.class);
+ }
+
+ final RemoteLocation firstLocation = locations.get(0);
+ asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> {
+ if (success) {
+ // The directory already exists, nothing left to create.
+ asyncComplete(true);
+ return;
+ }
+ asyncTry(() -> {
+ rpcClient.invokeSingle(firstLocation, method, Boolean.class);
+ });
+
+ // On failure, retry on the locations chosen by checkFaultTolerantRetry.
+ asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
+ final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+ method, src, ioe, firstLocation, locations);
+ rpcClient.invokeSequential(
+ newLocations, method, Boolean.class, Boolean.TRUE);
+ }, IOException.class);
+ });
+
+ return asyncReturn(Boolean.class);
+ }
+
+ /**
+ * Asynchronous variant of getListing.
+ * <p>
+ * Merges the per-location listings obtained from
+ * {@code getListingInt} with the mount points found under {@code src}:
+ * <ol>
+ * <li>Scan the remote results: a FileNotFoundException is only logged, any
+ * other exception is rethrown unless {@code allowPartialList} is set. The
+ * smallest "last entry name" over all partial listings is remembered.</li>
+ * <li>Add the returned entries to a map sorted by the listing comparator;
+ * while entries remain on the remote side, entries sorting after that
+ * smallest last name are dropped and counted as remaining.</li>
+ * <li>Add a status for each mount point under {@code src}, subject to
+ * {@code shouldAddMountPoint} when a last name is known, and add the mount
+ * points sorting after the last listed key to the remaining count.</li>
+ * <li>Produce {@code null} when there is no listing, no entry and no mount
+ * point list, otherwise a {@code DirectoryListing} with the merged
+ * entries.</li>
+ * </ol>
+ *
+ * @param src path to list.
+ * @param startAfter name to start the listing after.
+ * @param needLocation forwarded to {@code getListingInt}.
+ * @return the placeholder from {@code asyncReturn(DirectoryListing.class)}.
+ * @throws IOException if the operation check fails; the chained steps may
+ * rethrow a remote exception.
+ */
+ @Override
+ public DirectoryListing getListing(
+ String src, byte[] startAfter, boolean needLocation) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+ GetListingComparator comparator = RouterClientProtocol.getComparator();
+ getListingInt(src, startAfter, needLocation);
+ asyncApply((AsyncApplyFunction<List<RemoteResult<RemoteLocation,
DirectoryListing>>, Object>)
+ listings -> {
+ TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator);
+ int totalRemainingEntries = 0;
+ // Single-element array so the nested lambdas can update the counter.
+ final int[] remainingEntries = {0};
+ boolean namenodeListingExists = false;
+ // Check the subcluster listing with the smallest name to make sure
+ // no file is skipped across subclusters
+ byte[] lastName = null;
+ if (listings != null) {
+ for (RemoteResult<RemoteLocation, DirectoryListing> result :
listings) {
+ if (result.hasException()) {
+ IOException ioe = result.getException();
+ if (ioe instanceof FileNotFoundException) {
+ RemoteLocation location = result.getLocation();
+ LOG.debug("Cannot get listing from {}", location);
+ } else if (!allowPartialList) {
+ throw ioe;
+ }
+ } else if (result.getResult() != null) {
+ DirectoryListing listing = result.getResult();
+ totalRemainingEntries += listing.getRemainingEntries();
+ HdfsFileStatus[] partialListing = listing.getPartialListing();
+ int length = partialListing.length;
+ if (length > 0) {
+ HdfsFileStatus lastLocalEntry = partialListing[length-1];
+ byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes();
+ if (lastName == null ||
+ comparator.compare(lastName, lastLocalName) > 0) {
+ lastName = lastLocalName;
+ }
+ }
+ }
+ }
+
+ // Add existing entries
+ for (RemoteResult<RemoteLocation, DirectoryListing> result :
listings) {
+ DirectoryListing listing = result.getResult();
+ if (listing != null) {
+ namenodeListingExists = true;
+ for (HdfsFileStatus file : listing.getPartialListing()) {
+ byte[] filename = file.getLocalNameInBytes();
+ if (totalRemainingEntries > 0 &&
+ comparator.compare(filename, lastName) > 0) {
+ // Discarding entries further than the lastName
+ remainingEntries[0]++;
+ } else {
+ nnListing.put(filename, file);
+ }
+ }
+ remainingEntries[0] += listing.getRemainingEntries();
+ }
+ }
+ }
+
+ // Add mount points at this level in the tree
+ final List<String> children = subclusterResolver.getMountPoints(src);
+ if (children != null) {
+ // Get the dates for each mount point
+ Map<String, Long> dates = getMountPointDates(src);
+ // Effectively-final copy for use inside the lambdas below.
+ byte[] finalLastName = lastName;
+ asyncForEach(children.iterator(), (forEachRun, child) -> {
+ long date = 0;
+ if (dates != null && dates.containsKey(child)) {
+ date = dates.get(child);
+ }
+ Path childPath = new Path(src, child);
+ getMountPointStatus(childPath.toString(), 0, date);
+ asyncApply((ApplyFunction<HdfsFileStatus, Object>) dirStatus -> {
+ // if there is no subcluster path, always add mount point
+ byte[] bChild = DFSUtil.string2Bytes(child);
+ if (finalLastName == null) {
+ nnListing.put(bChild, dirStatus);
+ } else {
+ if (shouldAddMountPoint(bChild,
+ finalLastName, startAfter, remainingEntries[0])) {
+ // This may overwrite existing listing entries with the
mount point
+ // TODO don't add if already there?
+ nnListing.put(bChild, dirStatus);
+ }
+ }
+ return null;
+ });
+ });
+ asyncApply(o -> {
+ // Update the remaining count to include left mount points
+ if (nnListing.size() > 0) {
+ byte[] lastListing = nnListing.lastKey();
+ for (int i = 0; i < children.size(); i++) {
+ byte[] bChild = DFSUtil.string2Bytes(children.get(i));
+ if (comparator.compare(bChild, lastListing) > 0) {
+ remainingEntries[0] += (children.size() - i);
+ break;
+ }
+ }
+ }
+ return null;
+ });
+ }
+ // Feed the "any NN listing seen" flag into the final step.
+ asyncComplete(namenodeListingExists);
+ asyncApply((ApplyFunction<Boolean, Object>) exists -> {
+ if (!exists && nnListing.size() == 0 && children == null) {
+ // NN returns a null object if the directory cannot be found and
has no
+ // listing. If we didn't retrieve any NN listing data, and there
are no
+ // mount points here, return null.
+ return null;
+ }
+
+ // Generate combined listing
+ HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()];
+ combinedData = nnListing.values().toArray(combinedData);
+ return new DirectoryListing(combinedData, remainingEntries[0]);
+ });
+ });
+ return asyncReturn(DirectoryListing.class);
+ }
+
+ /**
+ * Get listing on remote locations.
+ * <p>
+ * The location lookup runs inside {@code asyncTry} (the same pattern as
+ * {@code getFileInfo}), so a {@code RouterResolveException} raised while
+ * resolving {@code src} reaches the catch step below and yields an empty
+ * list instead of escaping to the caller.
+ *
+ * @param src path to list.
+ * @param startAfter name to start the listing after; forwarded to the
+ * remote "getListing" call.
+ * @param needLocation forwarded to the remote "getListing" call.
+ * @return the placeholder from {@code asyncReturn(List.class)}.
+ * @throws IOException declared for the overridden signature.
+ */
+ @Override
+ protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
+     String src, byte[] startAfter, boolean needLocation) throws IOException {
+   asyncTry(() -> {
+     List<RemoteLocation> locations =
+         rpcServer.getLocationsForPath(src, false, false);
+     // Locate the dir and fetch the listing.
+     if (locations.isEmpty()) {
+       asyncComplete(new ArrayList<>());
+       return;
+     }
+     RemoteMethod method = new RemoteMethod("getListing",
+         new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
+         new RemoteParam(), startAfter, needLocation);
+     rpcClient.invokeConcurrent(locations, method, false, -1,
+         DirectoryListing.class);
+   });
+   asyncCatch((CatchFunction<List, RouterResolveException>) (o, e) -> {
+     // Logged once; the former DEBUG line duplicated this exact message.
+     LOG.info("Cannot get locations for {}, {}.", src, e.getMessage());
+     return new ArrayList<>();
+   }, RouterResolveException.class);
+   return asyncReturn(List.class);
+ }
+
+
+ /**
+ * Asynchronous variant of getFileInfo.
+ * <p>
+ * First asks the remote locations of {@code src}: all of them when
+ * {@code rpcServer.isPathAll(src)} holds, otherwise sequentially. A
+ * {@code NoLocationException} or {@code RouterResolveException} raised by
+ * that step is remembered and turned into a {@code null} result so that the
+ * mount point fallback still runs; it is only thrown to the client when the
+ * fallback finds nothing either. Any other IOException is rethrown as is.
+ *
+ * @param src path to query.
+ * @return the placeholder from {@code asyncReturn(HdfsFileStatus.class)}.
+ * @throws IOException if the operation check fails.
+ */
+ @Override
+ public HdfsFileStatus getFileInfo(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // Single-element array so the lambdas below can share the exception.
+   final IOException[] noLocationException = new IOException[1];
+   asyncTry(() -> {
+     final List<RemoteLocation> locations =
+         rpcServer.getLocationsForPath(src, false, false);
+     RemoteMethod method = new RemoteMethod("getFileInfo",
+         new Class<?>[] {String.class}, new RemoteParam());
+     // If it's a directory, we check in all locations
+     if (rpcServer.isPathAll(src)) {
+       getFileInfoAll(locations, method);
+     } else {
+       // Check for file information sequentially
+       rpcClient.invokeSequential(locations, method, HdfsFileStatus.class,
+           null);
+     }
+   });
+   asyncCatch((o, e) -> {
+     if (e instanceof NoLocationException
+         || e instanceof RouterResolveException) {
+       // Defer the failure: continue with a null status so the mount point
+       // check below can still answer for this path.
+       noLocationException[0] = e;
+       return null;
+     }
+     throw e;
+   }, IOException.class);
+
+   asyncApply((AsyncApplyFunction<HdfsFileStatus, Object>) ret -> {
+     // If there is no real path, check mount points
+     if (ret == null) {
+       List<String> children = subclusterResolver.getMountPoints(src);
+       if (children != null && !children.isEmpty()) {
+         Map<String, Long> dates = getMountPointDates(src);
+         long date = 0;
+         if (dates != null && dates.containsKey(src)) {
+           date = dates.get(src);
+         }
+         getMountPointStatus(src, children.size(), date, false);
+       } else if (children != null) {
+         // The src is a mount point, but there are no files or directories
+         getMountPointStatus(src, 0, 0, false);
+       } else {
+         asyncComplete(null);
+       }
+       asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) result -> {
+         // Can't find mount point for path and the path didn't contain any
+         // sub mount points, throw the NoLocationException to client.
+         if (result == null && noLocationException[0] != null) {
+           throw noLocationException[0];
+         }
+
+         return result;
+       });
+     } else {
+       asyncComplete(ret);
+     }
+   });
+
+   return asyncReturn(HdfsFileStatus.class);
+ }
+
+ /**
+ * Asynchronously finds the remote location that holds {@code path}.
+ * <p>
+ * With exactly one resolved location, that location is the result. In all
+ * other cases the locations are probed one at a time with a "getFileInfo"
+ * call; the iteration stops at the first location returning a non-null
+ * status and that location becomes the result.
+ *
+ * @param path path to locate.
+ * @return the placeholder from {@code asyncReturn(RemoteLocation.class)}.
+ * @throws IOException if the operation check or location resolution fails.
+ */
+ @Override
+ public RemoteLocation getFileRemoteLocation(String path) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path,
false, false);
+ if (locations.size() == 1) {
+ asyncComplete(locations.get(0));
+ return asyncReturn(RemoteLocation.class);
+ }
+
+ asyncForEach(locations.iterator(), (forEachRun, location) -> {
+ RemoteMethod method =
+ new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new
RemoteParam());
+ rpcClient.invokeSequential(Collections.singletonList(location), method,
+ HdfsFileStatus.class, null);
+ asyncApply((ApplyFunction<HdfsFileStatus, RemoteLocation>) ret -> {
+ if (ret != null) {
+ // Found the file: stop probing the remaining locations.
+ forEachRun.breakNow();
+ return location;
+ }
+ return null;
+ });
+ });
+
+ return asyncReturn(RemoteLocation.class);
+ }
+
+ /**
+ * Asynchronously builds a directory status describing a mount point.
+ * <p>
+ * Defaults are the default directory permission, the router super user and
+ * super group, {@code childrenNum} and no flags. With a
+ * {@code MountTableResolver}, permission, owner and group come from the
+ * mount table entry (if any) and are then overridden, together with the
+ * children count and flags, by the status returned from
+ * {@code getFileInfoAll} on the entry destinations; an IOException in that
+ * step is logged and otherwise ignored. Without a mount table resolver the
+ * owner and group are taken from the remote user.
+ *
+ * @param name mount point name; a leading "/" is added for the mount table
+ * lookup when missing.
+ * @param childrenNum children count used unless a remote status replaces it.
+ * @param date used as both modification and access time.
+ * @param setPath whether to set the last path component of {@code name} as
+ * the path of the returned status.
+ * @return the placeholder from {@code asyncReturn(HdfsFileStatus.class)}.
+ */
+ @Override
+ public HdfsFileStatus getMountPointStatus(
+ String name, int childrenNum, long date, boolean setPath) {
+ long modTime = date;
+ long accessTime = date;
+ // Single-element arrays let the lambdas below overwrite these defaults.
+ final FsPermission[] permission = new
FsPermission[]{FsPermission.getDirDefault()};
+ final String[] owner = new String[]{this.superUser};
+ final String[] group = new String[]{this.superGroup};
+ final int[] childrenNums = new int[]{childrenNum};
+ final EnumSet<HdfsFileStatus.Flags>[] flags =
+ new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)};
+ asyncComplete(null);
+ if (getSubclusterResolver() instanceof MountTableResolver) {
+ asyncTry(() -> {
+ String mName = name.startsWith("/") ? name : "/" + name;
+ MountTableResolver mountTable = (MountTableResolver)
subclusterResolver;
+ MountTable entry = mountTable.getMountPoint(mName);
+ if (entry != null) {
+ permission[0] = entry.getMode();
+ owner[0] = entry.getOwnerName();
+ group[0] = entry.getGroupName();
+
+ RemoteMethod method = new RemoteMethod("getFileInfo",
+ new Class<?>[] {String.class}, new RemoteParam());
+ getFileInfoAll(
+ entry.getDestinations(), method, mountStatusTimeOut);
+ asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) fInfo -> {
+ if (fInfo != null) {
+ // A real status from the destinations wins over the mount entry.
+ permission[0] = fInfo.getPermission();
+ owner[0] = fInfo.getOwner();
+ group[0] = fInfo.getGroup();
+ childrenNums[0] = fInfo.getChildrenNum();
+ flags[0] = DFSUtil
+ .getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(),
+ fInfo.isSnapshotEnabled(), fInfo.hasAcl());
+ }
+ return fInfo;
+ });
+ }
+ });
+ asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> {
+ LOG.error("Cannot get mount point: {}", e.getMessage());
+ return status;
+ }, IOException.class);
+ } else {
+ try {
+ UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ owner[0] = ugi.getUserName();
+ group[0] = ugi.getPrimaryGroupName();
+ } catch (IOException e) {
+ String msg = "Cannot get remote user: " + e.getMessage();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOG.error(msg);
+ } else {
+ LOG.debug(msg);
+ }
+ }
+ }
+ long inodeId = 0;
+ HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder();
+ // The input status is ignored; the result is built from the values above.
+ asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) status -> {
+ if (setPath) {
+ Path path = new Path(name);
+ String nameStr = path.getName();
+ builder.path(DFSUtil.string2Bytes(nameStr));
+ }
+
+ return builder.isdir(true)
+ .mtime(modTime)
+ .atime(accessTime)
+ .perm(permission[0])
+ .owner(owner[0])
+ .group(group[0])
+ .symlink(new byte[0])
+ .fileId(inodeId)
+ .children(childrenNums[0])
+ .flags(flags[0])
+ .build();
+ });
+ return asyncReturn(HdfsFileStatus.class);
+ }
+
+ /**
+ * Asynchronously queries all the given locations and folds the answers
+ * into one status.
+ * <p>
+ * Locations are visited in list order. The first non-directory status is
+ * returned as is. Otherwise the first directory status is passed to
+ * {@code updateMountPointStatus} together with the sum of the children
+ * counts of all statuses seen. The result is {@code null} when no location
+ * returned a status.
+ *
+ * @param locations locations to query.
+ * @param method remote method to invoke on every location.
+ * @param timeOutMs timeout forwarded to {@code invokeConcurrent}.
+ * @return the placeholder from {@code asyncReturn(HdfsFileStatus.class)}.
+ * @throws IOException if the concurrent invocation cannot be issued.
+ */
+ @Override
+ protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+     final RemoteMethod method, long timeOutMs) throws IOException {
+
+   asyncComplete(null);
+   // Get the file info from everybody
+   rpcClient.invokeConcurrent(
+       locations, method, false, false, timeOutMs, HdfsFileStatus.class);
+   asyncApply(res -> {
+     Map<RemoteLocation, HdfsFileStatus> statusByLocation =
+         (Map<RemoteLocation, HdfsFileStatus>) res;
+     HdfsFileStatus firstDirectory = null;
+     int totalChildren = 0;
+     for (RemoteLocation location : locations) {
+       HdfsFileStatus status = statusByLocation.get(location);
+       if (status == null) {
+         continue;
+       }
+       totalChildren += status.getChildrenNum();
+       if (!status.isDirectory()) {
+         // We return the first file
+         return status;
+       }
+       if (firstDirectory == null) {
+         firstDirectory = status;
+       }
+     }
+     if (firstDirectory == null) {
+       return null;
+     }
+     return updateMountPointStatus(firstDirectory, totalChildren);
+   });
+   return asyncReturn(HdfsFileStatus.class);
+ }
+
+ /**
+ * Asynchronous variant of recoverLease: delegates to the parent
+ * implementation and discards its direct return value.
+ *
+ * @param src path whose lease is recovered.
+ * @param clientName name of the client requesting the recovery.
+ * @return the placeholder from {@code asyncReturn(boolean.class)}.
+ * @throws IOException if the parent implementation fails.
+ */
+ @Override
+ public boolean recoverLease(String src, String clientName) throws
IOException {
+ super.recoverLease(src, clientName);
+ return asyncReturn(boolean.class);
+ }
+
+ /**
+ * Asynchronously sums the "getStats" arrays of all namespaces.
+ * <p>
+ * The result has {@code STATS_ARRAY_LENGTH} slots. Each slot is the sum of
+ * the non-negative values reported at that index; negative values and
+ * indexes beyond a namespace's array are skipped.
+ *
+ * @return the placeholder from {@code asyncReturn(long[].class)}.
+ * @throws IOException if the operation check or the invocation fails.
+ */
+ @Override
+ public long[] getStats() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   RemoteMethod method = new RemoteMethod("getStats");
+   Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
+   asyncApply(o -> {
+     Map<FederationNamespaceInfo, long[]> statsByNamespace =
+         (Map<FederationNamespaceInfo, long[]>) o;
+     long[] totals = new long[STATS_ARRAY_LENGTH];
+     for (long[] nsStats : statsByNamespace.values()) {
+       for (int idx = 0; idx < totals.length && idx < nsStats.length; idx++) {
+         long value = nsStats[idx];
+         if (value < 0) {
+           continue;
+         }
+         totals[idx] += value;
+       }
+     }
+     return totals;
+   });
+   return asyncReturn(long[].class);
+ }
+
+ /**
+ * Asynchronously merges the replicated block stats of all namespaces
+ * through {@code ReplicatedBlockStats.merge}.
+ *
+ * @return the placeholder from
+ * {@code asyncReturn(ReplicatedBlockStats.class)}.
+ * @throws IOException if the operation check or the invocation fails.
+ */
+ @Override
+ public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   RemoteMethod method = new RemoteMethod("getReplicatedBlockStats");
+   Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(
+       nss, method, true, false, ReplicatedBlockStats.class);
+   asyncApply(o -> {
+     Map<FederationNamespaceInfo, ReplicatedBlockStats> statsByNamespace =
+         (Map<FederationNamespaceInfo, ReplicatedBlockStats>) o;
+     Collection<ReplicatedBlockStats> perNamespace =
+         statsByNamespace.values();
+     return ReplicatedBlockStats.merge(perNamespace);
+   });
+   return asyncReturn(ReplicatedBlockStats.class);
+ }
+
+ /**
+ * Asynchronous variant of getDatanodeReport: after the operation check it
+ * delegates to {@code rpcServer.getDatanodeReportAsync(type, true, 0)}.
+ *
+ * @param type kind of datanodes to report.
+ * @return whatever {@code getDatanodeReportAsync} returns.
+ * @throws IOException if the operation check or the delegate fails.
+ */
+ @Override
+ public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType
type)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+ return rpcServer.getDatanodeReportAsync(type, true, 0);
+ }
+
+ @Override
+ public DatanodeStorageReport[] getDatanodeStorageReport(
+ HdfsConstants.DatanodeReportType type) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+ rpcServer.getDatanodeStorageReportMapAsync(type);
+ asyncApply((ApplyFunction< Map<String, DatanodeStorageReport[]>,
DatanodeStorageReport[]>)
+ dnSubcluster -> mergeDtanodeStorageReport(dnSubcluster));
+ return asyncReturn(DatanodeStorageReport[].class);
+ }
+
+ /**
+ * Same as the single-argument overload, but forwards
+ * {@code requireResponse} and {@code timeOutMs} to
+ * {@code getDatanodeStorageReportMapAsync}.
+ *
+ * @param type kind of datanodes to report.
+ * @param requireResponse forwarded to the report map request.
+ * @param timeOutMs forwarded to the report map request.
+ * @return the placeholder from
+ * {@code asyncReturn(DatanodeStorageReport[].class)}.
+ * @throws IOException if the operation check or the request fails.
+ */
+ public DatanodeStorageReport[] getDatanodeStorageReport(
+     HdfsConstants.DatanodeReportType type, boolean requireResponse,
+     long timeOutMs) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   rpcServer.getDatanodeStorageReportMapAsync(
+       type, requireResponse, timeOutMs);
+   ApplyFunction<Map<String, DatanodeStorageReport[]>, DatanodeStorageReport[]>
+       mergeReports =
+           reportsBySubcluster -> mergeDtanodeStorageReport(reportsBySubcluster);
+   asyncApply(mergeReports);
+   return asyncReturn(DatanodeStorageReport[].class);
+ }
+
+ /**
+ * Asynchronously applies the safe mode action to every namespace.
+ * <p>
+ * The combined answer is {@code true} only when every namespace answered
+ * {@code true}.
+ *
+ * @param action safe mode action forwarded to each namespace.
+ * @param isChecked forwarded to each namespace; its negation is passed as
+ * the fourth argument of {@code invokeConcurrent}.
+ * @return the placeholder from {@code asyncReturn(Boolean.class)}.
+ * @throws IOException if the operation check or the invocation fails.
+ */
+ @Override
+ public boolean setSafeMode(HdfsConstants.SafeModeAction action,
+     boolean isChecked) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // Set safe mode in all the name spaces
+   RemoteMethod method = new RemoteMethod("setSafeMode",
+       new Class<?>[] {HdfsConstants.SafeModeAction.class, boolean.class},
+       action, isChecked);
+   Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(nss, method, true, !isChecked, Boolean.class);
+
+   asyncApply(o -> {
+     Map<FederationNamespaceInfo, Boolean> safeModeByNamespace =
+         (Map<FederationNamespaceInfo, Boolean>) o;
+     // We only report true if all the name space are in safe mode
+     boolean everyNamespaceSafe = true;
+     for (boolean inSafeMode : safeModeByNamespace.values()) {
+       if (!inSafeMode) {
+         everyNamespaceSafe = false;
+       }
+     }
+     return everyNamespaceSafe;
+   });
+   return asyncReturn(Boolean.class);
+ }
+
+ /**
+ * Asynchronously asks every namespace to save its namespace.
+ * <p>
+ * The combined answer is {@code false} as soon as one namespace answered
+ * {@code false}, otherwise {@code true}.
+ *
+ * @param timeWindow forwarded to the remote "saveNamespace" call.
+ * @param txGap forwarded to the remote "saveNamespace" call.
+ * @return the placeholder from {@code asyncReturn(Boolean.class)}.
+ * @throws IOException if the operation check or the invocation fails.
+ */
+ @Override
+ public boolean saveNamespace(long timeWindow, long txGap)
+     throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   RemoteMethod method = new RemoteMethod("saveNamespace",
+       new Class<?>[] {long.class, long.class}, timeWindow, txGap);
+   final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
+
+   asyncApply(o -> {
+     Map<FederationNamespaceInfo, Boolean> savedByNamespace =
+         (Map<FederationNamespaceInfo, Boolean>) o;
+     for (boolean saved : savedByNamespace.values()) {
+       if (!saved) {
+         return false;
+       }
+     }
+     return true;
+   });
+   return asyncReturn(Boolean.class);
+ }
+
+ /**
+ * Asynchronously rolls the edit log of every namespace and keeps the
+ * largest transaction id returned (0 when none is positive).
+ *
+ * @return the placeholder from {@code asyncReturn(long.class)}.
+ * @throws IOException if the operation check or the invocation fails.
+ */
+ @Override
+ public long rollEdits() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
+   final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(nss, method, true, false, long.class);
+   asyncApply(o -> {
+     Map<FederationNamespaceInfo, Long> txidByNamespace =
+         (Map<FederationNamespaceInfo, Long>) o;
+     // Return the maximum txid
+     long maxTxid = 0;
+     for (long candidate : txidByNamespace.values()) {
+       maxTxid = Math.max(maxTxid, candidate);
+     }
+     return maxTxid;
+   });
+   return asyncReturn(long.class);
+ }
+
+ /**
+ * Asynchronously forwards "restoreFailedStorage" to every namespace.
+ * <p>
+ * The combined answer is {@code false} as soon as one namespace answered
+ * {@code false}, otherwise {@code true}.
+ *
+ * @param arg argument forwarded to the remote call.
+ * @return the placeholder from {@code asyncReturn(boolean.class)}.
+ * @throws IOException if the operation check or the invocation fails.
+ */
+ @Override
+ public boolean restoreFailedStorage(String arg) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   RemoteMethod method = new RemoteMethod("restoreFailedStorage",
+       new Class<?>[] {String.class}, arg);
+   final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
+   asyncApply(o -> {
+     Map<FederationNamespaceInfo, Boolean> restoredByNamespace =
+         (Map<FederationNamespaceInfo, Boolean>) o;
+     for (boolean restored : restoredByNamespace.values()) {
+       if (!restored) {
+         return false;
+       }
+     }
+     return true;
+   });
+   return asyncReturn(boolean.class);
+ }
+
+ @Override
+ public RollingUpgradeInfo rollingUpgrade(HdfsConstants.RollingUpgradeAction
action)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ RemoteMethod method = new RemoteMethod("rollingUpgrade",
+ new Class<?>[] {HdfsConstants.RollingUpgradeAction.class}, action);
+ final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+ rpcClient.invokeConcurrent(
+ nss, method, true, false, RollingUpgradeInfo.class);
+ asyncApply(o -> {
+ Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
+ (Map<FederationNamespaceInfo, RollingUpgradeInfo>) o;
+ // Return the first rolling upgrade info
+ RollingUpgradeInfo info = null;
+ for (RollingUpgradeInfo infoNs : ret.values()) {
+ if (info == null && infoNs != null) {
+ info = infoNs;
+ }
+ }
+ return info;
+ });
+ return asyncReturn(RollingUpgradeInfo.class);
+ }
+
+ /**
+ * Asynchronously aggregates the content summaries of all locations of
+ * {@code path}.
+ * <p>
+ * A FileNotFoundException from a location is remembered; any other
+ * exception is rethrown unless {@code allowPartialList} is set. When no
+ * summary was collected and a FileNotFoundException was seen, the last one
+ * seen is thrown. Otherwise the collected summaries are combined with
+ * {@code aggregateContentSummary}.
+ *
+ * @param path path to summarize.
+ * @return the placeholder from {@code asyncReturn(ContentSummary.class)}.
+ * @throws IOException if the operation check, the location lookup or the
+ * invocation fails.
+ */
+ @Override
+ public ContentSummary getContentSummary(String path) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // Get the summaries from regular files
+   final Collection<ContentSummary> summaries = new ArrayList<>();
+   final List<RemoteLocation> locations = getLocationsForContentSummary(path);
+   final RemoteMethod method = new RemoteMethod("getContentSummary",
+       new Class<?>[] {String.class}, new RemoteParam());
+   rpcClient.invokeConcurrent(
+       locations, method, false, -1, ContentSummary.class);
+
+   asyncApply(o -> {
+     final List<RemoteResult<RemoteLocation, ContentSummary>> remoteResults =
+         (List<RemoteResult<RemoteLocation, ContentSummary>>) o;
+
+     FileNotFoundException lastNotFound = null;
+     for (RemoteResult<RemoteLocation, ContentSummary> remote
+         : remoteResults) {
+       if (!remote.hasException()) {
+         if (remote.getResult() != null) {
+           summaries.add(remote.getResult());
+         }
+         continue;
+       }
+       IOException failure = remote.getException();
+       if (failure instanceof FileNotFoundException) {
+         lastNotFound = (FileNotFoundException) failure;
+       } else if (!allowPartialList) {
+         throw failure;
+       }
+     }
+
+     // Throw original exception if no original nor mount points
+     if (lastNotFound != null && summaries.isEmpty()) {
+       throw lastNotFound;
+     }
+     return aggregateContentSummary(summaries);
+   });
+
+   return asyncReturn(ContentSummary.class);
+ }
+
+ /**
+ * Asynchronously reads the current edit log transaction id of every
+ * namespace and keeps the largest one (0 when none is positive).
+ *
+ * @return the placeholder from {@code asyncReturn(long.class)}.
+ * @throws IOException if the operation check or the invocation fails.
+ */
+ @Override
+ public long getCurrentEditLogTxid() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   RemoteMethod method = new RemoteMethod(
+       "getCurrentEditLogTxid", new Class<?>[] {});
+   final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(nss, method, true, false, long.class);
+
+   asyncApply(o -> {
+     Map<FederationNamespaceInfo, Long> txidByNamespace =
+         (Map<FederationNamespaceInfo, Long>) o;
+     // Return the maximum txid
+     long maxTxid = 0;
+     for (long candidate : txidByNamespace.values()) {
+       maxTxid = Math.max(maxTxid, candidate);
+     }
+     return maxTxid;
+   });
+   return asyncReturn(long.class);
+ }
+
+ /**
+ * Asynchronously sends "msync" to the namespaces for which
+ * {@code rpcClient.isNamespaceObserverReadEligible} holds. When no
+ * namespace qualifies, the call completes immediately with {@code null}.
+ *
+ * @throws IOException if the operation check or the invocation fails.
+ */
+ @Override
+ public void msync() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+   // Only msync to nameservices with observer reads enabled.
+   Set<FederationNamespaceInfo> allNamespaces =
+       namenodeResolver.getNamespaces();
+   RemoteMethod method = new RemoteMethod("msync");
+   Set<FederationNamespaceInfo> observerReadNamespaces = allNamespaces
+       .stream()
+       .filter(ns ->
+           rpcClient.isNamespaceObserverReadEligible(ns.getNameserviceId()))
+       .collect(Collectors.toSet());
+   if (!observerReadNamespaces.isEmpty()) {
+     rpcClient.invokeConcurrent(observerReadNamespaces, method);
+     return;
+   }
+   asyncCompleteWith(CompletableFuture.completedFuture(null));
+ }
+
+ @Override
+ public boolean setReplication(String src, short replication)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("setReplication",
+ new Class<?>[] {String.class, short.class}, new RemoteParam(),
+ replication);
+ if (rpcServer.isInvokeConcurrent(src)) {
+ rpcClient.invokeConcurrent(locations, method, Boolean.class);
+ asyncApply(o -> {
+ Map<RemoteLocation, Boolean> results = (Map<RemoteLocation, Boolean>)
o;
+ return !results.containsValue(false);
+ });
+ } else {
+ rpcClient.invokeSequential(locations, method, Boolean.class,
+ Boolean.TRUE);
+ }
+ return asyncReturn(boolean.class);
+ }
+
+ /**
+ * Checks if the path is a directory and is supposed to be present in all
+ * subclusters.
+ * <p>
+ * Asynchronous variant: when {@code rpcServer.isPathAll(src)} holds, a
+ * sequential "getFileInfo" call is issued and its status is mapped to
+ * {@code isDirectory()} ({@code false} when no status is returned). In all
+ * other cases, including an {@code UnresolvedPathException} caught here,
+ * the call completes with {@code false}.
+ *
+ * @param src the source path
+ * @return the placeholder from {@code asyncReturn(Boolean.class)}; the
+ * computed value is true if the path is directory and is supposed to be
+ * present in all subclusters else false in all other scenarios.
+ * @throws IOException if unable to get the file status.
+ */
+ @Override
+ public boolean isMultiDestDirectory(String src) throws IOException {
+ try {
+ if (rpcServer.isPathAll(src)) {
+ List<RemoteLocation> locations;
+ locations = rpcServer.getLocationsForPath(src, false, false);
+ RemoteMethod method = new RemoteMethod("getFileInfo",
+ new Class<?>[] {String.class}, new RemoteParam());
+ rpcClient.invokeSequential(locations,
+ method, HdfsFileStatus.class, null);
+ // Chain the directory check onto the future of the call just issued.
+ CompletableFuture<Object> completableFuture = getCompletableFuture();
+ completableFuture = completableFuture.thenApply(o -> {
+ HdfsFileStatus fileStatus = (HdfsFileStatus) o;
+ if (fileStatus != null) {
+ return fileStatus.isDirectory();
+ } else {
+ LOG.debug("The destination {} doesn't exist.", src);
+ }
+ return false;
+ });
+ asyncCompleteWith(completableFuture);
+ return asyncReturn(Boolean.class);
+ }
+ } catch (UnresolvedPathException e) {
+ LOG.debug("The destination {} is a symlink.", src);
+ }
+ asyncCompleteWith(CompletableFuture.completedFuture(false));
+ return asyncReturn(Boolean.class);
+ }
+}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
index cc25516d59f..51a3a1b9c28 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
@@ -56,6 +56,7 @@ public class RouterAsyncProtocolTestBase {
private FileSystem routerFs;
private RouterRpcServer routerRpcServer;
private RouterRpcServer routerAsyncRpcServer;
+ protected static final String TEST_DIR_PATH = "/testdir";
@BeforeClass
public static void setUpCluster() throws Exception {
@@ -114,19 +115,20 @@ public class RouterAsyncProtocolTestBase {
routerRpcServer.getRouterStateIdContext());
routerAsyncRpcServer = Mockito.spy(routerRpcServer);
Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
+ Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true);
// Create mock locations
MockResolver resolver = (MockResolver)
router.getRouter().getSubclusterResolver();
resolver.addLocation("/", ns0, "/");
FsPermission permission = new FsPermission("705");
- routerFs.mkdirs(new Path("/testdir"), permission);
+ routerFs.mkdirs(new Path(TEST_DIR_PATH), permission);
}
@After
public void tearDown() throws IOException {
// clear client context
CallerContext.setCurrent(null);
- boolean delete = routerFs.delete(new Path("/testdir"));
+ boolean delete = routerFs.delete(new Path(TEST_DIR_PATH));
assertTrue(delete);
if (routerFs != null) {
routerFs.close();
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java
new file mode 100644
index 00000000000..96f957f93df
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncClientProtocol.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.util.Lists;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.crypto.CryptoProtocolVersion.ENCRYPTION_ZONES;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+import static org.apache.hadoop.fs.permission.FsAction.NONE;
+import static org.apache.hadoop.fs.permission.FsAction.READ;
+import static org.apache.hadoop.fs.permission.FsAction.READ_WRITE;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
+import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Used to test the functionality of {@link RouterAsyncClientProtocol}.
+ *
+ * <p>Each asynchronous call is followed by {@code syncReturn}, which the tests use to
+ * obtain that call's result before asserting on it.
+ */
+public class TestRouterAsyncClientProtocol extends RouterAsyncProtocolTestBase {
+  /** Block size requested when creating the test file and expected in its status. */
+  private static final long TEST_BLOCK_SIZE = 128 * 1024 * 1024L;
+  /** Symlink created by the test; note that it lies outside {@code TEST_DIR_PATH}. */
+  private static final String LINK_PATH = "/link/link.file";
+
+  /** Protocol under test, built on the async RPC server supplied by the base class. */
+  private RouterAsyncClientProtocol asyncClientProtocol;
+  /** Synchronous protocol whose results serve as the expected values. */
+  private RouterClientProtocol clientProtocol;
+  /** Directory that {@code testClientProtocolRpc} creates and finally deletes. */
+  private final String testPath = TEST_DIR_PATH + "/test";
+
+  /**
+   * Builds the asynchronous protocol under test and its synchronous counterpart.
+   *
+   * @throws IOException declared by the method; propagated from protocol construction.
+   */
+  @Before
+  public void setup() throws IOException {
+    asyncClientProtocol =
+        new RouterAsyncClientProtocol(getRouterConf(), getRouterAsyncRpcServer());
+    clientProtocol = new RouterClientProtocol(getRouterConf(), getRouterRpcServer());
+  }
+
+  /**
+   * Checks that the asynchronous {@code getServerDefaults} reports the same block size,
+   * replication, checksum type and default storage policy id as the synchronous call.
+   */
+  @Test
+  public void testGetServerDefaults() throws Exception {
+    FsServerDefaults expected = clientProtocol.getServerDefaults();
+    asyncClientProtocol.getServerDefaults();
+    FsServerDefaults actual = syncReturn(FsServerDefaults.class);
+    assertNotNull(actual);
+    assertEquals(expected.getBlockSize(), actual.getBlockSize());
+    assertEquals(expected.getReplication(), actual.getReplication());
+    assertEquals(expected.getChecksumType(), actual.getChecksumType());
+    assertEquals(expected.getDefaultStoragePolicyId(), actual.getDefaultStoragePolicyId());
+  }
+
+  /**
+   * Drives one directory through a sequence of asynchronous client RPCs (mkdirs,
+   * setPermission, getFileInfo, setAcl, setOwner, create, getFileRemoteLocation,
+   * getListing, getDatanodeReport, createSymlink, getFileLinkInfo, rename and delete)
+   * and checks the result of each step.
+   */
+  @Test
+  public void testClientProtocolRpc() throws Exception {
+    final FsPermission allAccess = new FsPermission(ALL, ALL, ALL);
+    final FsPermission restricted = new FsPermission(READ_WRITE, READ, NONE);
+    final String createdFile = testPath + "/testCreate.file";
+
+    asyncClientProtocol.mkdirs(testPath, allAccess, false);
+    Boolean success = syncReturn(Boolean.class);
+    assertTrue(success);
+
+    asyncClientProtocol.setPermission(testPath, restricted);
+    syncReturn(Void.class);
+
+    asyncClientProtocol.getFileInfo(testPath);
+    HdfsFileStatus hdfsFileStatus = syncReturn(HdfsFileStatus.class);
+    assertNotNull(hdfsFileStatus);
+    // Expected value goes first so a failure message labels the two permissions correctly.
+    assertEquals(restricted, hdfsFileStatus.getPermission());
+
+    List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "tmpUser", ALL));
+    asyncClientProtocol.setAcl(testPath, aclSpec);
+    syncReturn(Void.class);
+    asyncClientProtocol.setOwner(testPath, "tmpUser", "tmpUserGroup");
+    syncReturn(Void.class);
+
+    asyncClientProtocol.getFileInfo(testPath);
+    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
+    assertNotNull(hdfsFileStatus);
+    assertEquals("tmpUser", hdfsFileStatus.getOwner());
+    assertEquals("tmpUserGroup", hdfsFileStatus.getGroup());
+
+    asyncClientProtocol.create(createdFile,
+        allAccess, "testAsyncClient",
+        new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)),
+        false, (short) 1, TEST_BLOCK_SIZE,
+        new CryptoProtocolVersion[]{ENCRYPTION_ZONES},
+        null, null);
+    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
+    assertNotNull(hdfsFileStatus);
+    assertTrue(hdfsFileStatus.isFile());
+    assertEquals(TEST_BLOCK_SIZE, hdfsFileStatus.getBlockSize());
+
+    asyncClientProtocol.getFileRemoteLocation(testPath);
+    RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
+    assertNotNull(remoteLocation);
+    assertEquals(getNs0(), remoteLocation.getNameserviceId());
+    assertEquals(testPath, remoteLocation.getSrc());
+
+    asyncClientProtocol.getListing(testPath, new byte[1], true);
+    DirectoryListing directoryListing = syncReturn(DirectoryListing.class);
+    assertNotNull(directoryListing);
+    assertEquals(1, directoryListing.getPartialListing().length);
+
+    asyncClientProtocol.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
+    assertNotNull(datanodeInfos);
+    assertEquals(3, datanodeInfos.length);
+
+    // NOTE(review): LINK_PATH is outside TEST_DIR_PATH and is not removed by this test;
+    // confirm whether /link should also be cleaned up.
+    asyncClientProtocol.createSymlink(createdFile, LINK_PATH, allAccess, true);
+    syncReturn(Void.class);
+
+    asyncClientProtocol.getFileLinkInfo(LINK_PATH);
+    hdfsFileStatus = syncReturn(HdfsFileStatus.class);
+    assertNotNull(hdfsFileStatus);
+    assertEquals("testCreate.file", hdfsFileStatus.getSymlink().getName());
+
+    asyncClientProtocol.rename(createdFile, testPath + "/testRename.file");
+    success = syncReturn(Boolean.class);
+    assertTrue(success);
+
+    asyncClientProtocol.delete(testPath, true);
+    success = syncReturn(Boolean.class);
+    assertTrue(success);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]