This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5aee0e0c0a2 HDFS-17134. RBF: Fix duplicate results of getListing through Router. (#5900). Contributed by Shuyan Zhang.
5aee0e0c0a2 is described below

commit 5aee0e0c0a2c1d4181e92a20b311885a5c552121
Author: zhangshuyan <81411509+zhangshuy...@users.noreply.github.com>
AuthorDate: Tue Aug 1 17:52:54 2023 +0800

    HDFS-17134. RBF: Fix duplicate results of getListing through Router. (#5900). Contributed by Shuyan Zhang.
    
    Reviewed-by: Ayush Saxena <ayushsax...@apache.org>
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../federation/router/RouterClientProtocol.java    | 56 ++++++++++++++--------
 .../server/federation/router/TestRouterRpc.java    | 41 ++++++++++++++++
 2 files changed, 77 insertions(+), 20 deletions(-)
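
At a glance, the patch changes the Router's merge of per-subcluster directory listings from a TreeMap<String, HdfsFileStatus> keyed by java.lang.String to a TreeMap<byte[], HdfsFileStatus> keyed by the raw local-name bytes and ordered by DFSUtilClient.compareBytes, so the lastName cut-off and the mount-point insertion use the same ordering that the NameNodes return; per the JIRA title, the String-vs-bytes ordering mismatch is what produced duplicate getListing results through the Router. Below is a minimal standalone sketch of the keying idea, illustrative only and not taken from the patch: it substitutes java.util.Arrays.compare (Java 9+) for DFSUtilClient.compareBytes and reuses the file names from the new test.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

public class ListingMergeSketch {
  public static void main(String[] args) {
    // byte[] has no natural ordering, so a TreeMap keyed by byte[] needs an
    // explicit Comparator; the patch supplies GetListingComparator, which
    // delegates to DFSUtilClient.compareBytes. Arrays.compare (lexicographic
    // over signed bytes) stands in for it in this sketch.
    TreeMap<byte[], String> merged = new TreeMap<>(Arrays::compare);

    // Names borrowed from testGetListingOrder in the patch.
    merged.put("ßtestGetListingOrder".getBytes(StandardCharsets.UTF_8), "from ns1");
    merged.put("%testGetListingOrder".getBytes(StandardCharsets.UTF_8), "from ns1");

    // Iteration follows the byte ordering of the UTF-8 encoded names, not
    // String ordering; with this comparator the "ß" entry sorts first,
    // whereas String.compareTo would put "%" first.
    for (byte[] name : merged.keySet()) {
      System.out.println(new String(name, StandardCharsets.UTF_8));
    }
  }
}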

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 20945cf3de1..ba0abc11e02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
@@ -101,10 +102,11 @@ import org.apache.hadoop.classification.VisibleForTesting;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -819,6 +821,20 @@ public class RouterClientProtocol implements ClientProtocol {
     }
   }
 
+  /**
+   * For {@link #getListing(String,byte[],boolean) GetListing} to sort results.
+   */
+  private static class GetListingComparator
+      implements Comparator<byte[]>, Serializable {
+    @Override
+    public int compare(byte[] o1, byte[] o2) {
+      return DFSUtilClient.compareBytes(o1, o2);
+    }
+  }
+
+  private static GetListingComparator comparator =
+      new GetListingComparator();
+
   @Override
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException {
@@ -826,13 +842,13 @@ public class RouterClientProtocol implements ClientProtocol {
 
     List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
         getListingInt(src, startAfter, needLocation);
-    TreeMap<String, HdfsFileStatus> nnListing = new TreeMap<>();
+    TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator);
     int totalRemainingEntries = 0;
     int remainingEntries = 0;
     boolean namenodeListingExists = false;
     // Check the subcluster listing with the smallest name to make sure
     // no file is skipped across subclusters
-    String lastName = null;
+    byte[] lastName = null;
     if (listings != null) {
       for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
         if (result.hasException()) {
@@ -850,8 +866,9 @@ public class RouterClientProtocol implements ClientProtocol {
           int length = partialListing.length;
           if (length > 0) {
             HdfsFileStatus lastLocalEntry = partialListing[length-1];
-            String lastLocalName = lastLocalEntry.getLocalName();
-            if (lastName == null || lastName.compareTo(lastLocalName) > 0) {
+            byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes();
+            if (lastName == null ||
+                comparator.compare(lastName, lastLocalName) > 0) {
               lastName = lastLocalName;
             }
           }
@@ -864,9 +881,9 @@ public class RouterClientProtocol implements ClientProtocol {
         if (listing != null) {
           namenodeListingExists = true;
           for (HdfsFileStatus file : listing.getPartialListing()) {
-            String filename = file.getLocalName();
+            byte[] filename = file.getLocalNameInBytes();
             if (totalRemainingEntries > 0 &&
-                filename.compareTo(lastName) > 0) {
+                comparator.compare(filename, lastName) > 0) {
               // Discarding entries further than the lastName
               remainingEntries++;
             } else {
@@ -880,10 +897,6 @@ public class RouterClientProtocol implements ClientProtocol {
 
     // Add mount points at this level in the tree
     final List<String> children = subclusterResolver.getMountPoints(src);
-    // Sort the list as the entries from subcluster are also sorted
-    if (children != null) {
-      Collections.sort(children);
-    }
     if (children != null) {
       // Get the dates for each mount point
       Map<String, Long> dates = getMountPointDates(src);
@@ -899,22 +912,24 @@ public class RouterClientProtocol implements ClientProtocol {
             getMountPointStatus(childPath.toString(), 0, date);
 
         // if there is no subcluster path, always add mount point
+        byte[] bChild = DFSUtil.string2Bytes(child);
         if (lastName == null) {
-          nnListing.put(child, dirStatus);
+          nnListing.put(bChild, dirStatus);
         } else {
-          if (shouldAddMountPoint(child,
+          if (shouldAddMountPoint(bChild,
                 lastName, startAfter, remainingEntries)) {
             // This may overwrite existing listing entries with the mount point
             // TODO don't add if already there?
-            nnListing.put(child, dirStatus);
+            nnListing.put(bChild, dirStatus);
           }
         }
       }
       // Update the remaining count to include left mount points
       if (nnListing.size() > 0) {
-        String lastListing = nnListing.lastKey();
+        byte[] lastListing = nnListing.lastKey();
         for (int i = 0; i < children.size(); i++) {
-          if (children.get(i).compareTo(lastListing) > 0) {
+          byte[] bChild = DFSUtil.string2Bytes(children.get(i));
+          if (comparator.compare(bChild, lastListing) > 0) {
             remainingEntries += (children.size() - i);
             break;
           }
@@ -2320,13 +2335,14 @@ public class RouterClientProtocol implements ClientProtocol {
    * @return
    */
   private static boolean shouldAddMountPoint(
-      String mountPoint, String lastEntry, byte[] startAfter,
+      byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
       int remainingEntries) {
-    if (mountPoint.compareTo(DFSUtil.bytes2String(startAfter)) > 0 &&
-        mountPoint.compareTo(lastEntry) <= 0) {
+    if (comparator.compare(mountPoint, startAfter) > 0 &&
+        comparator.compare(mountPoint, lastEntry) <= 0) {
       return true;
     }
-    if (remainingEntries == 0 && mountPoint.compareTo(lastEntry) >= 0) {
+    if (remainingEntries == 0 &&
+        comparator.compare(mountPoint, lastEntry) >= 0) {
       return true;
     }
     return false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index d3d34216190..d44b40b0523 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -145,6 +145,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
 
 /**
@@ -2278,4 +2280,43 @@ public class TestRouterRpc {
     long proxyOpAfterWithReEnable = federationRPCMetrics.getProxyOps();
     assertEquals(proxyOpAfterWithDisable + 2, proxyOpAfterWithReEnable);
   }
+
+  @Test
+  public void testGetListingOrder() throws Exception {
+    String ns1 = getCluster().getNameservices().get(1);
+    String destBasePath =  cluster.getNamenodeTestDirectoryForNS(ns1);
+    final String testPath1 = destBasePath + "/ßtestGetListingOrder";
+    final String testPath2 = destBasePath + "/%testGetListingOrder";
+    final FileSystem fileSystem1 = getCluster().
+        getNamenode(ns1, null).getFileSystem();
+
+    try {
+      // Create the test file in ns1.
+      createFile(fileSystem1, testPath1, 32);
+      createFile(fileSystem1, testPath2, 32);
+
+      NamenodeContext nn = cluster.getNamenode(ns1, null);
+      FileStatus[] fileStatuses =
+          nn.getFileSystem().listStatus(new Path(destBasePath));
+      List<String> requiredPaths = Arrays.stream(fileStatuses)
+          .map(fileStatus -> fileStatus.getPath().getName())
+          .collect(Collectors.toList());
+      Iterator<String> requiredPathsIterator = requiredPaths.iterator();
+
+      // Fetch listing.
+      DirectoryListing listing =
+          routerProtocol.getListing(cluster.getFederatedTestDirectoryForNS(ns1),
+              HdfsFileStatus.EMPTY_NAME, false);
+      assertEquals(requiredPaths.size(), listing.getPartialListing().length);
+      // Match each path returned and verify order returned.
+      for (HdfsFileStatus f : listing.getPartialListing()) {
+        String fileName = requiredPathsIterator.next();
+        String currentFile = f.getFullPath(new Path("/")).getName();
+        assertEquals(currentFile, fileName);
+      }
+    } finally {
+      fileSystem1.delete(new Path(testPath1), true);
+      fileSystem1.delete(new Path(testPath2), true);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
