ZanderXu commented on code in PR #8454:
URL: https://github.com/apache/hadoop/pull/8454#discussion_r3135906015
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1986,7 +1986,23 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
prevId, openFilesTypes, new RemoteParam());
Map<RemoteLocation, BatchedEntries> results =
rpcClient.invokeConcurrent(locations, method, true, false, -1,
BatchedEntries.class);
+ return mergeAndSortOpenFileListResults(results);
+ }
+ /**
+ * Merges the invocation results of listOpenFiles from downstream namespaces.
+ * To ensure no entries are skipped for the next call iteration, trims off all entries with
+ * <pre>
+ * id > min([max([entry.id for entry in entries]) for entries per namespace])
+ * </pre>
+ * then sorts the filtered results by id, in ascending order.
+ * @param results invocation results of listOpenFiles from downstream namespaces
+ * @return {@link BatchedRemoteIterator.BatchedListEntries} object of merged entries
+ * @throws IOException when one file appears in different namespaces,
+ * and the path cannot resolve to a mount point
+ */
+ protected BatchedRemoteIterator.BatchedListEntries<OpenFileEntry> mergeAndSortOpenFileListResults(
Review Comment:
Suggested signature: `BatchedListEntries<OpenFileEntry> mergeOpenFileEntries(Map<RemoteLocation, BatchedEntries> entriesByLocation)`
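The trimming rule in the Javadoc above can be illustrated with a small, Hadoop-free Java sketch. It is not the PR's implementation: `OpenFileMergeSketch`, `OpenFile` (a stand-in for `OpenFileEntry`), and `mergeAndSort` are hypothetical names, namespaces are keyed by plain strings instead of `RemoteLocation`, and the sketch assumes that a namespace returning an empty batch does not constrain the cutoff.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free model of the listOpenFiles merge rule.
// Namespaces are keyed by plain strings instead of RemoteLocation.
final class OpenFileMergeSketch {

  // Stand-in for OpenFileEntry: only the id and path are modeled.
  static final class OpenFile {
    final long id;
    final String path;

    OpenFile(long id, String path) {
      this.id = id;
      this.path = path;
    }
  }

  // Keeps entries whose id is <= min over namespaces of that namespace's
  // max id, then sorts the survivors by id in ascending order.
  static List<OpenFile> mergeAndSort(Map<String, List<OpenFile>> batchesByNamespace) {
    long cutoff = Long.MAX_VALUE;
    for (List<OpenFile> batch : batchesByNamespace.values()) {
      if (batch.isEmpty()) {
        continue; // assumption: an empty batch imposes no limit
      }
      long maxId = Long.MIN_VALUE;
      for (OpenFile f : batch) {
        maxId = Math.max(maxId, f.id);
      }
      cutoff = Math.min(cutoff, maxId);
    }

    List<OpenFile> merged = new ArrayList<>();
    for (List<OpenFile> batch : batchesByNamespace.values()) {
      for (OpenFile f : batch) {
        if (f.id <= cutoff) {
          merged.add(f); // every non-empty batch reached at least this id
        }
      }
    }
    merged.sort(Comparator.comparingLong(f -> f.id));
    return merged;
  }
}
```

With batches `ns1 = {10, 30, 50}` and `ns2 = {20, 40}`, the cutoff is `min(50, 40) = 40`, so the sketch returns ids 10, 20, 30, 40 and holds back 50. Assuming each namespace returns only entries with ids greater than `prevId`, holding 50 back matters: `ns2`'s batch stopped at 40, so it may still have an entry such as 45 that a follow-up call with `prevId = 50` would skip, whereas a follow-up with `prevId = 40` re-fetches 50 from `ns1` along with anything `ns2` has after 40.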
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java:
##########
@@ -839,6 +843,26 @@ public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
return asyncReturn(ReplicatedBlockStats.class);
}
+ @Override
+ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+ EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
+ RemoteMethod method =
+ new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
+ prevId, openFilesTypes, new RemoteParam());
+ // Returns Map<RemoteLocation, BatchedEntries>
+ rpcClient.invokeConcurrent(locations, method, true, false, -1,
+ BatchedEntries.class);
+
+ asyncApply(o -> {
+ Map<RemoteLocation, BatchedEntries> results = (Map<RemoteLocation, BatchedEntries>) o;
+ return mergeAndSortOpenFileListResults(results);
+ });
+ return asyncReturn(BatchedRemoteIterator.BatchedListEntries.class);
Review Comment:
Should this be `return asyncReturn(BatchedEntries.class);`?
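As the diff reads, the async variant keeps the same shape but defers the merge: `invokeConcurrent` starts the per-location calls, `asyncApply` chains the merge onto their combined result, and `asyncReturn` returns from the method while the real value arrives later. As a rough analogue only, the sketch below uses `java.util.concurrent.CompletableFuture` rather than the Router's async utilities; `AsyncMergeSketch`, `listOpenFileIds`, `invokeAll`, `merge`, and `listOpenFilesAsync` are hypothetical names, open files are reduced to bare `Long` ids, and the merge is a compact version of the trimming rule from the `mergeAndSortOpenFileListResults` Javadoc.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Rough CompletableFuture analogue of the async listOpenFiles flow.
// All names are hypothetical; open files are reduced to Long ids.
final class AsyncMergeSketch {

  // Hypothetical per-namespace RPC: completes with that namespace's batch of ids.
  static CompletableFuture<List<Long>> listOpenFileIds(String ns, Map<String, List<Long>> backing) {
    return CompletableFuture.supplyAsync(
        () -> backing.getOrDefault(ns, Collections.<Long>emptyList()));
  }

  // Analogue of invokeConcurrent: fan out, then build the per-namespace map once all calls finish.
  static CompletableFuture<Map<String, List<Long>>> invokeAll(
      List<String> namespaces, Map<String, List<Long>> backing) {
    Map<String, CompletableFuture<List<Long>>> pending = new LinkedHashMap<>();
    for (String ns : namespaces) {
      pending.put(ns, listOpenFileIds(ns, backing));
    }
    return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]))
        .thenApply(ignored -> {
          Map<String, List<Long>> results = new LinkedHashMap<>();
          pending.forEach((ns, future) -> results.put(ns, future.join()));
          return results;
        });
  }

  // Compact trimming rule: keep ids <= min of per-namespace max ids, sorted ascending.
  static List<Long> merge(Map<String, List<Long>> results) {
    long cutoff = results.values().stream()
        .filter(batch -> !batch.isEmpty())
        .mapToLong(batch -> Collections.max(batch))
        .min()
        .orElse(Long.MAX_VALUE);
    return results.values().stream()
        .flatMap(List::stream)
        .filter(id -> id <= cutoff)
        .sorted()
        .collect(Collectors.toList());
  }

  // Analogue of invokeConcurrent + asyncApply: the merge is chained as a continuation.
  static CompletableFuture<List<Long>> listOpenFilesAsync(
      List<String> namespaces, Map<String, List<Long>> backing) {
    return invokeAll(namespaces, backing).thenApply(AsyncMergeSketch::merge);
  }
}
```

For example, with `backing` mapping `ns1` to `[10, 30, 50]` and `ns2` to `[20, 40]`, `listOpenFilesAsync(Arrays.asList("ns1", "ns2"), backing).join()` yields `[10, 20, 30, 40]`. The design point the sketch mirrors is that the merge runs only after every location has answered, without `listOpenFilesAsync` itself blocking while it waits.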
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.