Copilot commented on code in PR #8072:
URL: https://github.com/apache/hadoop/pull/8072#discussion_r2612818506
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1977,8 +1979,68 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    Map<RemoteLocation, BatchedEntries> results =
+        rpcClient.invokeConcurrent(locations, method, true, false, -1, BatchedEntries.class);
+
+    // Get the largest inodeIds for each namespace, and the smallest inodeId of them
+    // then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
+    long minOfMax = Long.MAX_VALUE;
+    for (BatchedEntries nsEntries : results.values()) {
+      // Only need to care about namespaces that still have more files to report
+      if (!nsEntries.hasMore()) {
+        continue;
+      }
+      long max = 0;
+      for (int i = 0; i < nsEntries.size(); i++) {
+        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+      }
+      minOfMax = Math.min(minOfMax, max);
+    }
+    // Concatenate all entries into one result, sorted by inodeId
+    boolean hasMore = false;
+    Map<String, OpenFileEntry> routerEntries = new HashMap<>();
+    Map<String, RemoteLocation> resolvedPaths = new HashMap<>();
+    for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) {
+      BatchedEntries nsEntries = entry.getValue();
+      hasMore |= nsEntries.hasMore();
+      for (int i = 0; i < nsEntries.size(); i++) {
+        OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
+        if (ofe.getId() > minOfMax) {
+          hasMore = true;
+          break;
+        }
+        RemoteLocation remoteLoc = entry.getKey();
+        String routerPath = ofe.getFilePath().replaceFirst(remoteLoc.getDest(), remoteLoc.getSrc());
Review Comment:
The path replacement using `replaceFirst` could be unsafe if the destination
path contains regex special characters. For example, if `remoteLoc.getDest()`
contains characters like `.` or `*`, they will be interpreted as regex patterns
rather than literal strings. Consider using a literal string replacement method
or escaping the pattern with `Pattern.quote()`.
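
For illustration, a minimal standalone sketch (hypothetical paths, not taken from the PR) of how an unquoted destination can mis-rewrite a path, and how `Pattern.quote()` together with `Matcher.quoteReplacement()` avoids it; the latter additionally guards against `$` and `\` in the replacement string:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QuoteDemo {
  public static void main(String[] args) {
    // Hypothetical mount entry: the destination contains '.', a regex
    // metacharacter, so as a pattern it matches more than the literal path.
    String dest = "/ns0/data.v1";
    String src = "/data";
    String filePath = "/ns0/dataXv1/file"; // NOT under dest, but '.' matches 'X'

    // Unsafe: rewrites a path that is not actually under the destination.
    System.out.println(filePath.replaceFirst(dest, src));
    // -> /data/file (wrong)

    // Safe: both pattern and replacement are treated literally.
    System.out.println(filePath.replaceFirst(
        Pattern.quote(dest), Matcher.quoteReplacement(src)));
    // -> /ns0/dataXv1/file (unchanged, correct)
  }
}
```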
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1977,8 +1979,68 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    Map<RemoteLocation, BatchedEntries> results =
+        rpcClient.invokeConcurrent(locations, method, true, false, -1, BatchedEntries.class);
+
+    // Get the largest inodeIds for each namespace, and the smallest inodeId of them
+    // then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
+    long minOfMax = Long.MAX_VALUE;
+    for (BatchedEntries nsEntries : results.values()) {
+      // Only need to care about namespaces that still have more files to report
+      if (!nsEntries.hasMore()) {
+        continue;
+      }
+      long max = 0;
+      for (int i = 0; i < nsEntries.size(); i++) {
+        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+      }
+      minOfMax = Math.min(minOfMax, max);
+    }
+    // Concatenate all entries into one result, sorted by inodeId
+    boolean hasMore = false;
+    Map<String, OpenFileEntry> routerEntries = new HashMap<>();
+    Map<String, RemoteLocation> resolvedPaths = new HashMap<>();
+    for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) {
+      BatchedEntries nsEntries = entry.getValue();
+      hasMore |= nsEntries.hasMore();
+      for (int i = 0; i < nsEntries.size(); i++) {
+        OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
+        if (ofe.getId() > minOfMax) {
+          hasMore = true;
+          break;
+        }
+        RemoteLocation remoteLoc = entry.getKey();
+        String routerPath = ofe.getFilePath().replaceFirst(remoteLoc.getDest(), remoteLoc.getSrc());
+        OpenFileEntry newEntry =
+            new OpenFileEntry(ofe.getId(), routerPath, ofe.getClientName(),
+                ofe.getClientMachine());
+        // An existing file already resolves to the same path.
+        // Resolve according to mount table and keep the best path.
+        if (resolvedPaths.containsKey(routerPath)) {
+          PathLocation pathLoc = subclusterResolver.getDestinationForPath(routerPath);
+          List<String> namespaces = pathLoc.getDestinations().stream().map(
+              RemoteLocation::getNameserviceId).collect(
+              Collectors.toList());
+          int existingIdx = namespaces.indexOf(resolvedPaths.get(routerPath).getNameserviceId());
+          int currentIdx = namespaces.indexOf(remoteLoc.getNameserviceId());
+          if (currentIdx < existingIdx && currentIdx != -1) {
+            routerEntries.put(routerPath, newEntry);
+            resolvedPaths.put(routerPath, remoteLoc);
+          }
Review Comment:
When duplicate file paths are detected (line 2024), the code resolves the
path using `subclusterResolver.getDestinationForPath(routerPath)` to determine
which namespace should take precedence. However, this resolver call is made for
every duplicate, which could be expensive. Consider caching the PathLocation
results or restructuring to avoid repeated lookups for the same path.
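
A possible shape for the caching, sketched as a hypothetical helper (the name `resolveCached` and the per-call cache map are assumptions, not code from the PR; `subclusterResolver` and `PathLocation` are as in the diff). Since `getDestinationForPath` declares `IOException`, a plain get/put is used rather than `Map.computeIfAbsent`:

```java
  // Hypothetical helper: the cache map would be created once at the top of
  // listOpenFiles and passed in, so each distinct router path is resolved
  // against the mount table at most once per call.
  private PathLocation resolveCached(Map<String, PathLocation> cache,
      String routerPath) throws IOException {
    PathLocation loc = cache.get(routerPath);
    if (loc == null) {
      loc = subclusterResolver.getDestinationForPath(routerPath);
      cache.put(routerPath, loc);
    }
    return loc;
  }
```

Since mount-table resolution is deterministic within a single call, the derived list of nameservice IDs could equally be cached per path instead of the `PathLocation` itself.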
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]