This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 5aee0e0c0a2 HDFS-17134. RBF: Fix duplicate results of getListing through Router. (#5900). Contributed by Shuyan Zhang.
5aee0e0c0a2 is described below

commit 5aee0e0c0a2c1d4181e92a20b311885a5c552121
Author: zhangshuyan <81411509+zhangshuy...@users.noreply.github.com>
AuthorDate: Tue Aug 1 17:52:54 2023 +0800

    HDFS-17134. RBF: Fix duplicate results of getListing through Router. (#5900). Contributed by Shuyan Zhang.

    Reviewed-by: Ayush Saxena <ayushsax...@apache.org>
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../federation/router/RouterClientProtocol.java   | 56 ++++++++++++++--------
 .../server/federation/router/TestRouterRpc.java   | 41 ++++++++++++++++
 2 files changed, 77 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 20945cf3de1..ba0abc11e02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
@@ -101,10 +102,11 @@ import org.apache.hadoop.classification.VisibleForTesting;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -819,6 +821,20 @@ public class RouterClientProtocol implements ClientProtocol {
     }
   }
 
+  /**
+   * For {@link #getListing(String,byte[],boolean) GetListing} to sort results.
+   */
+  private static class GetListingComparator
+      implements Comparator<byte[]>, Serializable {
+    @Override
+    public int compare(byte[] o1, byte[] o2) {
+      return DFSUtilClient.compareBytes(o1, o2);
+    }
+  }
+
+  private static GetListingComparator comparator =
+      new GetListingComparator();
+
   @Override
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException {
@@ -826,13 +842,13 @@
 
     List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
         getListingInt(src, startAfter, needLocation);
-    TreeMap<String, HdfsFileStatus> nnListing = new TreeMap<>();
+    TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator);
     int totalRemainingEntries = 0;
     int remainingEntries = 0;
     boolean namenodeListingExists = false;
     // Check the subcluster listing with the smallest name to make sure
     // no file is skipped across subclusters
-    String lastName = null;
+    byte[] lastName = null;
     if (listings != null) {
       for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
         if (result.hasException()) {
@@ -850,8 +866,9 @@
           int length = partialListing.length;
           if (length > 0) {
             HdfsFileStatus lastLocalEntry = partialListing[length-1];
-            String lastLocalName = lastLocalEntry.getLocalName();
-            if (lastName == null || lastName.compareTo(lastLocalName) > 0) {
+            byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes();
+            if (lastName == null ||
+                comparator.compare(lastName, lastLocalName) > 0) {
               lastName = lastLocalName;
             }
           }
@@ -864,9 +881,9 @@
       if (listing != null) {
         namenodeListingExists = true;
         for (HdfsFileStatus file : listing.getPartialListing()) {
-          String filename = file.getLocalName();
+          byte[] filename = file.getLocalNameInBytes();
           if (totalRemainingEntries > 0 &&
-              filename.compareTo(lastName) > 0) {
+              comparator.compare(filename, lastName) > 0) {
             // Discarding entries further than the lastName
             remainingEntries++;
           } else {
@@ -880,10 +897,6 @@
 
     // Add mount points at this level in the tree
     final List<String> children = subclusterResolver.getMountPoints(src);
-    // Sort the list as the entries from subcluster are also sorted
-    if (children != null) {
-      Collections.sort(children);
-    }
     if (children != null) {
       // Get the dates for each mount point
       Map<String, Long> dates = getMountPointDates(src);
@@ -899,22 +912,24 @@
             getMountPointStatus(childPath.toString(), 0, date);
         // if there is no subcluster path, always add mount point
+        byte[] bChild = DFSUtil.string2Bytes(child);
         if (lastName == null) {
-          nnListing.put(child, dirStatus);
+          nnListing.put(bChild, dirStatus);
         } else {
-          if (shouldAddMountPoint(child,
+          if (shouldAddMountPoint(bChild,
               lastName, startAfter, remainingEntries)) {
             // This may overwrite existing listing entries with the mount point
             // TODO don't add if already there?
-            nnListing.put(child, dirStatus);
+            nnListing.put(bChild, dirStatus);
           }
         }
       }
       // Update the remaining count to include left mount points
       if (nnListing.size() > 0) {
-        String lastListing = nnListing.lastKey();
+        byte[] lastListing = nnListing.lastKey();
         for (int i = 0; i < children.size(); i++) {
-          if (children.get(i).compareTo(lastListing) > 0) {
+          byte[] bChild = DFSUtil.string2Bytes(children.get(i));
+          if (comparator.compare(bChild, lastListing) > 0) {
             remainingEntries += (children.size() - i);
             break;
           }
         }
@@ -2320,13 +2335,14 @@
    * @return
    */
   private static boolean shouldAddMountPoint(
-      String mountPoint, String lastEntry, byte[] startAfter,
+      byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
       int remainingEntries) {
-    if (mountPoint.compareTo(DFSUtil.bytes2String(startAfter)) > 0 &&
-        mountPoint.compareTo(lastEntry) <= 0) {
+    if (comparator.compare(mountPoint, startAfter) > 0 &&
+        comparator.compare(mountPoint, lastEntry) <= 0) {
       return true;
     }
-    if (remainingEntries == 0 && mountPoint.compareTo(lastEntry) >= 0) {
+    if (remainingEntries == 0 &&
+        comparator.compare(mountPoint, lastEntry) >= 0) {
       return true;
     }
     return false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index d3d34216190..d44b40b0523 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -145,6 +145,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
 
 /**
@@ -2278,4 +2280,43 @@
     long proxyOpAfterWithReEnable = federationRPCMetrics.getProxyOps();
     assertEquals(proxyOpAfterWithDisable + 2, proxyOpAfterWithReEnable);
   }
+
+  @Test
+  public void testGetListingOrder() throws Exception {
+    String ns1 = getCluster().getNameservices().get(1);
+    String destBasePath = cluster.getNamenodeTestDirectoryForNS(ns1);
+    final String testPath1 = destBasePath + "/ßtestGetListingOrder";
+    final String testPath2 = destBasePath + "/%testGetListingOrder";
+    final FileSystem fileSystem1 = getCluster().
+        getNamenode(ns1, null).getFileSystem();
+
+    try {
+      // Create the test file in ns1.
+      createFile(fileSystem1, testPath1, 32);
+      createFile(fileSystem1, testPath2, 32);
+
+      NamenodeContext nn = cluster.getNamenode(ns1, null);
+      FileStatus[] fileStatuses =
+          nn.getFileSystem().listStatus(new Path(destBasePath));
+      List<String> requiredPaths = Arrays.stream(fileStatuses)
+          .map(fileStatus -> fileStatus.getPath().getName())
+          .collect(Collectors.toList());
+      Iterator<String> requiredPathsIterator = requiredPaths.iterator();
+
+      // Fetch listing.
+      DirectoryListing listing =
+          routerProtocol.getListing(cluster.getFederatedTestDirectoryForNS(ns1),
+              HdfsFileStatus.EMPTY_NAME, false);
+      assertEquals(requiredPaths.size(), listing.getPartialListing().length);
+      // Match each path returned and verify order returned.
+      for (HdfsFileStatus f : listing.getPartialListing()) {
+        String fileName = requiredPathsIterator.next();
+        String currentFile = f.getFullPath(new Path("/")).getName();
+        assertEquals(currentFile, fileName);
+      }
+    } finally {
+      fileSystem1.delete(new Path(testPath1), true);
+      fileSystem1.delete(new Path(testPath2), true);
+    }
+  }
 }
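
Why the switch to a byte[] comparator fixes the duplicates: a NameNode sorts directory entries, and interprets startAfter, by the unsigned UTF-8 bytes of each local name (the order DFSUtilClient.compareBytes implements), while the Router previously merged the per-subcluster results in a TreeMap<String, HdfsFileStatus>, where String.compareTo orders UTF-16 code units. The two orderings disagree for some characters, so the lastName cutoff computed by the Router could diverge from the order the NameNodes paginate by, letting an entry be returned again in a later listing batch. The standalone sketch below is illustrative only and not part of the commit: ListingOrderDemo and compareUtf8Bytes are made-up names, and the byte comparison merely mimics what DFSUtilClient.compareBytes does using plain JDK calls. It prints a pair of names on which the two orderings disagree.

import java.nio.charset.StandardCharsets;

// Illustrative sketch (not part of the commit): String order and
// UTF-8 byte order can rank the same pair of file names differently.
public class ListingOrderDemo {

  // Unsigned lexicographic comparison of the UTF-8 encodings of a and b,
  // i.e. the order NameNodes use for directory entries.
  static int compareUtf8Bytes(String a, String b) {
    byte[] ba = a.getBytes(StandardCharsets.UTF_8);
    byte[] bb = b.getBytes(StandardCharsets.UTF_8);
    int n = Math.min(ba.length, bb.length);
    for (int i = 0; i < n; i++) {
      int cmp = (ba[i] & 0xff) - (bb[i] & 0xff); // treat bytes as unsigned
      if (cmp != 0) {
        return cmp;
      }
    }
    return ba.length - bb.length;
  }

  public static void main(String[] args) {
    // U+FF5E (fullwidth tilde) is a single UTF-16 code unit, 0xFF5E.
    // U+10000 is a surrogate pair whose first code unit is 0xD800.
    String fullwidthTilde = "\uFF5E";
    String supplementary = new String(Character.toChars(0x10000));

    // String order: 0xFF5E > 0xD800, so the tilde sorts after.
    System.out.println(fullwidthTilde.compareTo(supplementary) > 0); // true

    // UTF-8 byte order: EF BD 9E < F0 90 80 80, so the tilde sorts before.
    System.out.println(compareUtf8Bytes(fullwidthTilde, supplementary) < 0); // true
  }
}

Independently of the ordering semantics, the explicit comparator is needed for the TreeMap to work at all: byte[] does not implement Comparable, so a TreeMap<byte[], HdfsFileStatus> relying on natural ordering would throw ClassCastException on the first put. Making the comparator Serializable follows the TreeMap recommendation that comparators used in serializable sorted collections be serializable themselves.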