This is an automated email from the ASF dual-hosted git repository.

ZanderXu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 215a1edea61 HDFS-17910. [ARR] Support async listOpenFiles for routers (#8454)
215a1edea61 is described below

commit 215a1edea61add4983cf7c58c9ebfc518adf0d6b
Author: Felix Nguyen <[email protected]>
AuthorDate: Wed May 6 10:28:00 2026 +0800

    HDFS-17910. [ARR] Support async listOpenFiles for routers (#8454)
---
 .../federation/router/RouterClientProtocol.java    | 20 +++++++++++++--
 .../router/async/RouterAsyncClientProtocol.java    | 21 +++++++++++++++
 .../federation/router/TestRouterListOpenFiles.java | 30 +++++++++++++++++++---
 3 files changed, 66 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index aac04a97849..5d822f2e6d5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -22,8 +22,8 @@
 import static 
org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.BatchedRemoteIterator;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -1986,7 +1986,23 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long 
prevId,
             prevId, openFilesTypes, new RemoteParam());
     Map<RemoteLocation, BatchedEntries> results =
         rpcClient.invokeConcurrent(locations, method, true, false, -1, 
BatchedEntries.class);
+    return mergeAndSortOpenFileListResults(results);
+  }
 
+  /**
+   * Merges the invocation results of listOpenFiles from downstream namespaces.
+   * To ensure no entries are skipped for the next call iteration, trims off 
all entries with
+   * <pre>
+   *   id > min([max([entry.id for entry in entries]) for entries per 
namespace])
+   * </pre>
+   * then sorts the filtered results by id, in ascending order.
+   * @param results invocation results of listOpenFiles from downstream 
namespaces
+   * @return {@link BatchedListEntries} object of merged entries
+   * @throws IOException when one file appears in different namespaces,
+   *                     and the path cannot resolve to a mount point
+   */
+  protected BatchedListEntries<OpenFileEntry> mergeAndSortOpenFileListResults(
+      Map<RemoteLocation, BatchedEntries> results) throws IOException {
     // Get the largest inodeIds for each namespace, and the smallest inodeId 
of them
     // then ignore all entries above this id to keep a consistent prevId for 
the next listOpenFiles
     long minOfMax = Long.MAX_VALUE;
@@ -2040,7 +2056,7 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long 
prevId,
     }
     List<OpenFileEntry> entryList = new ArrayList<>(routerEntries.values());
     entryList.sort(Comparator.comparingLong(OpenFileEntry::getId));
-    return new BatchedRemoteIterator.BatchedListEntries<>(entryList, hasMore);
+    return new BatchedListEntries<>(entryList, hasMore);
   }
 
   @Override
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
index b64ebb31f96..b421c8f39eb 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
@@ -20,6 +20,8 @@
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -34,6 +36,8 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -839,6 +843,23 @@ public ReplicatedBlockStats getReplicatedBlockStats() 
throws IOException {
     return asyncReturn(ReplicatedBlockStats.class);
   }
 
+  @Override
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+      EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path) 
throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, 
false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, 
EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    rpcClient.invokeConcurrent(locations, method, true, false, -1, 
BatchedEntries.class);
+
+    asyncApply(o -> {
+      Map<RemoteLocation, BatchedEntries> results = (Map<RemoteLocation, 
BatchedEntries>) o;
+      return mergeAndSortOpenFileListResults(results);
+    });
+    return asyncReturn(BatchedListEntries.class);
+  }
+
   @Override
   public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType 
type)
       throws IOException {
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
index e96c7c757fe..300e2451a8f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
@@ -28,9 +28,10 @@
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BatchedRemoteIterator;
@@ -49,10 +50,14 @@
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 
 import static 
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@MethodSource("getParameters")
+@ParameterizedClass
 public class TestRouterListOpenFiles {
   final private static String TEST_DESTINATION_PATH = 
"/TestRouterListOpenFilesDst";
   final private static int NUM_SUBCLUSTERS = 2;
@@ -63,12 +68,22 @@ public class TestRouterListOpenFiles {
   private static DFSClient client0;
   private static DFSClient client1;
   private static DFSClient routerClient;
+  private final boolean useAsync;
 
-  @BeforeAll
-  public static void setup() throws Exception {
+  public TestRouterListOpenFiles(boolean useAsyncFlag) throws Exception {
+    this.useAsync = useAsyncFlag;
+    setup(useAsyncFlag);
+  }
+
+  public static Object[] getParameters() {
+    return new Object[] {true, false};
+  }
+
+  public void setup(boolean useAsyncFlag) throws Exception {
     cluster = new StateStoreDFSCluster(false, NUM_SUBCLUSTERS,
         MultipleDestinationMountTableResolver.class);
     Configuration conf = new 
RouterConfigBuilder().stateStore().heartbeat().admin().rpc().build();
+    conf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, useAsyncFlag);
     conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns0,ns1");
     conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, true);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 
BATCH_SIZE);
@@ -120,6 +135,9 @@ public void testSingleDestination() throws Exception {
     BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
         routerProtocol.listOpenFiles(0, 
EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
             testPath);
+    if (useAsync) {
+      result = syncReturn(BatchedRemoteIterator.BatchedEntries.class);
+    }
     // Should list only the entry on ns0
     assertEquals(1, result.size());
     assertEquals(testPath + "/file0", result.get(0).getFilePath());
@@ -138,6 +156,9 @@ public void testMultipleDestinations() throws Exception {
     BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
         routerProtocol.listOpenFiles(0, 
EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
             testPath);
+    if (useAsync) {
+      result = syncReturn(BatchedRemoteIterator.BatchedEntries.class);
+    }
     // Should list both entries on ns0 and ns1
     assertEquals(2, result.size());
     assertEquals(testPath + "/file0", result.get(0).getFilePath());
@@ -157,6 +178,9 @@ public void testMultipleDestinations() throws Exception {
     result =
         routerProtocol.listOpenFiles(0, 
EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
             testPath);
+    if (useAsync) {
+      result = syncReturn(BatchedRemoteIterator.BatchedEntries.class);
+    }
     // Should list one file only
     assertEquals(1, result.size());
     assertEquals(routerClient.getFileInfo(TEST_DESTINATION_PATH + 
"/file2").getFileId(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to