[
https://issues.apache.org/jira/browse/HDFS-17632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18044589#comment-18044589
]
ASF GitHub Bot commented on HDFS-17632:
---------------------------------------
Copilot commented on code in PR #8072:
URL: https://github.com/apache/hadoop/pull/8072#discussion_r2612818513
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1977,8 +1979,68 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    Map<RemoteLocation, BatchedEntries> results =
+        rpcClient.invokeConcurrent(locations, method, true, false, -1, BatchedEntries.class);
+
+    // Get the largest inodeIds for each namespace, and the smallest inodeId of them
+    // then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
+    long minOfMax = Long.MAX_VALUE;
+    for (BatchedEntries nsEntries : results.values()) {
+      // Only need to care about namespaces that still have more files to report
+      if (!nsEntries.hasMore()) {
+        continue;
+      }
+      long max = 0;
+      for (int i = 0; i < nsEntries.size(); i++) {
+        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+      }
+      minOfMax = Math.min(minOfMax, max);
Review Comment:
The logic for calculating `minOfMax` does not handle namespaces that return an empty
batch (size == 0) while still reporting `hasMore() == true`. In that case the loop at
lines 1999-2001 never executes, leaving `max` as 0, and that 0 is then fed into
`Math.min(minOfMax, max)` at line 2002, incorrectly forcing `minOfMax` down to 0.
Consider adding a check to skip namespaces with empty results, or initializing `max`
to a value that cannot distort the minimum.
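For illustration only, one way such a guard could look, sketched against the variable
names used in this hunk (`results`, `nsEntries`, `minOfMax`); a possible shape of the
fix, not the change actually committed in the PR:
```java
    long minOfMax = Long.MAX_VALUE;
    for (BatchedEntries nsEntries : results.values()) {
      // Skip namespaces that are exhausted, and also those that returned an
      // empty batch, so an empty batch with hasMore() == true cannot drag
      // minOfMax down to 0.
      if (!nsEntries.hasMore() || nsEntries.size() == 0) {
        continue;
      }
      long max = 0;
      for (int i = 0; i < nsEntries.size(); i++) {
        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
      }
      minOfMax = Math.min(minOfMax, max);
    }
```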
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java:
##########
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRouterListOpenFiles {
+  final private static String TEST_DESTINATION_PATH = "/TestRouterListOpenFilesDst";
+ final private static int NUM_SUBCLUSTERS = 2;
+ final private static int BATCH_SIZE = 3;
+ private static StateStoreDFSCluster cluster;
+ private static MiniRouterDFSCluster.RouterContext routerContext;
+ private static RouterClientProtocol routerProtocol;
+ private static DFSClient client0;
+ private static DFSClient client1;
+ private static DFSClient routerClient;
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ cluster = new StateStoreDFSCluster(false, NUM_SUBCLUSTERS,
+ MultipleDestinationMountTableResolver.class);
+    Configuration conf = new RouterConfigBuilder().stateStore().heartbeat().admin().rpc().build();
+    conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns0,ns1");
+    conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, true);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
+ cluster.addRouterOverrides(conf);
+ cluster.startCluster(conf);
+ cluster.startRouters();
+ cluster.waitClusterUp();
+ routerContext = cluster.getRandomRouter();
+    routerProtocol = routerContext.getRouterRpcServer().getClientProtocolModule();
+ routerClient = routerContext.getClient();
+ client0 = cluster.getNamenode("ns0", null).getClient();
+ client1 = cluster.getNamenode("ns1", null).getClient();
+ }
+
+ @AfterAll
+ public static void cleanup() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @BeforeEach
+ public void resetInodeId() throws IOException {
+ cluster.getNamenode("ns0",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking(12345);
+ cluster.getNamenode("ns1",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking(12345);
+ // Create 2 dirs with the same name on 2 different nss
+ client0.mkdirs(TEST_DESTINATION_PATH);
+ client1.mkdirs(TEST_DESTINATION_PATH);
+ }
+
+ @AfterEach
+ public void cleanupNamespaces() throws IOException {
+ client0.delete("/", true);
+ client1.delete("/", true);
+ }
+
+ @Test
+ public void testSingleDestination() throws Exception {
+ String testPath = "/" + getMethodName();
+ createMountTableEntry(testPath, Collections.singletonList("ns0"));
+
+ // Open 2 files with different names
+ OutputStream os0 = client0.create(TEST_DESTINATION_PATH + "/file0", true);
+ OutputStream os1 = client1.create(TEST_DESTINATION_PATH + "/file1", true);
+
+    BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
+        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+            testPath);
+ // Should list only the entry on ns0
+ assertEquals(1, result.size());
+ assertEquals(testPath + "/file0", result.get(0).getFilePath());
+ os0.close();
+ os1.close();
+ }
+
+ @Test
+ public void testMultipleDestinations() throws Exception {
+ String testPath = "/" + getMethodName();
+ createMountTableEntry(testPath, cluster.getNameservices());
+
+ // Open 2 files with different names
+ OutputStream os0 = client0.create(TEST_DESTINATION_PATH + "/file0", true);
+ OutputStream os1 = client1.create(TEST_DESTINATION_PATH + "/file1", true);
+    BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
+        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+            testPath);
+ // Should list both entries on ns0 and ns1
+ assertEquals(2, result.size());
+ assertEquals(testPath + "/file0", result.get(0).getFilePath());
+ assertEquals(testPath + "/file1", result.get(1).getFilePath());
+ RemoteIterator<OpenFileEntry> ite = routerClient.listOpenFiles(testPath);
+ while (ite.hasNext()) {
+ OpenFileEntry ofe = ite.next();
+      assertTrue(ofe.getFilePath().equals(testPath + "/file0") || ofe.getFilePath()
+          .equals(testPath + "/file1"));
+ }
+ os0.close();
+ os1.close();
+
+ // Open 2 files with same name
+ os0 = client0.create(TEST_DESTINATION_PATH + "/file2", true);
+ os1 = client1.create(TEST_DESTINATION_PATH + "/file2", true);
+    result =
+        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+            testPath);
+ // Should list one file only
+ assertEquals(1, result.size());
+    assertEquals(routerClient.getFileInfo(TEST_DESTINATION_PATH + "/file2").getFileId(),
Review Comment:
The code retrieves file info using `TEST_DESTINATION_PATH + "/file2"`
instead of `testPath + "/file2"`. This is inconsistent with the rest of the
test which uses `testPath` for router operations. The router client should use
router paths (testPath), not destination paths (TEST_DESTINATION_PATH).
```suggestion
assertEquals(routerClient.getFileInfo(testPath + "/file2").getFileId(),
```
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1977,8 +1979,68 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    Map<RemoteLocation, BatchedEntries> results =
+        rpcClient.invokeConcurrent(locations, method, true, false, -1, BatchedEntries.class);
+
+    // Get the largest inodeIds for each namespace, and the smallest inodeId of them
+    // then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
+    long minOfMax = Long.MAX_VALUE;
+    for (BatchedEntries nsEntries : results.values()) {
+      // Only need to care about namespaces that still have more files to report
+      if (!nsEntries.hasMore()) {
+        continue;
+      }
+      long max = 0;
+      for (int i = 0; i < nsEntries.size(); i++) {
+        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+      }
+      minOfMax = Math.min(minOfMax, max);
+    }
+    // Concatenate all entries into one result, sorted by inodeId
+    boolean hasMore = false;
+    Map<String, OpenFileEntry> routerEntries = new HashMap<>();
+    Map<String, RemoteLocation> resolvedPaths = new HashMap<>();
+    for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) {
+      BatchedEntries nsEntries = entry.getValue();
+      hasMore |= nsEntries.hasMore();
+      for (int i = 0; i < nsEntries.size(); i++) {
+        OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
+        if (ofe.getId() > minOfMax) {
+          hasMore = true;
+          break;
+        }
+        RemoteLocation remoteLoc = entry.getKey();
+        String routerPath = ofe.getFilePath().replaceFirst(remoteLoc.getDest(), remoteLoc.getSrc());
+        OpenFileEntry newEntry =
+            new OpenFileEntry(ofe.getId(), routerPath, ofe.getClientName(),
+                ofe.getClientMachine());
+        // An existing file already resolves to the same path.
+        // Resolve according to mount table and keep the best path.
+        if (resolvedPaths.containsKey(routerPath)) {
+          PathLocation pathLoc = subclusterResolver.getDestinationForPath(routerPath);
+          List<String> namespaces = pathLoc.getDestinations().stream().map(
+              RemoteLocation::getNameserviceId).collect(
+              Collectors.toList());
+          int existingIdx = namespaces.indexOf(resolvedPaths.get(routerPath).getNameserviceId());
+          int currentIdx = namespaces.indexOf(remoteLoc.getNameserviceId());
+          if (currentIdx < existingIdx && currentIdx != -1) {
+            routerEntries.put(routerPath, newEntry);
+            resolvedPaths.put(routerPath, remoteLoc);
+          }
Review Comment:
When duplicate file paths are detected (line 2024), the code resolves the
path using `subclusterResolver.getDestinationForPath(routerPath)` to determine
which namespace should take precedence. However, this resolver call is made for
every duplicate, which could be expensive. Consider caching the PathLocation
results or restructuring to avoid repeated lookups for the same path.
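As a sketch of the caching idea, reusing the names from this hunk (`subclusterResolver`,
`routerPath`, `PathLocation`); this is hypothetical and not the PR's implementation:
```java
    // Declared once, before iterating over the per-namespace results:
    Map<String, PathLocation> resolutionCache = new HashMap<>();

    // Inside the duplicate-handling branch, consult the cache before the resolver:
    PathLocation pathLoc = resolutionCache.get(routerPath);
    if (pathLoc == null) {
      pathLoc = subclusterResolver.getDestinationForPath(routerPath);
      resolutionCache.put(routerPath, pathLoc);
    }
```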
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1977,8 +1979,68 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    Map<RemoteLocation, BatchedEntries> results =
+        rpcClient.invokeConcurrent(locations, method, true, false, -1, BatchedEntries.class);
+
+    // Get the largest inodeIds for each namespace, and the smallest inodeId of them
+    // then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
+    long minOfMax = Long.MAX_VALUE;
+    for (BatchedEntries nsEntries : results.values()) {
+      // Only need to care about namespaces that still have more files to report
+      if (!nsEntries.hasMore()) {
+        continue;
+      }
+      long max = 0;
+      for (int i = 0; i < nsEntries.size(); i++) {
+        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+      }
+      minOfMax = Math.min(minOfMax, max);
+    }
+    // Concatenate all entries into one result, sorted by inodeId
+    boolean hasMore = false;
+    Map<String, OpenFileEntry> routerEntries = new HashMap<>();
+    Map<String, RemoteLocation> resolvedPaths = new HashMap<>();
+    for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) {
+      BatchedEntries nsEntries = entry.getValue();
+      hasMore |= nsEntries.hasMore();
+      for (int i = 0; i < nsEntries.size(); i++) {
+        OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
+        if (ofe.getId() > minOfMax) {
+          hasMore = true;
+          break;
+        }
+        RemoteLocation remoteLoc = entry.getKey();
+        String routerPath = ofe.getFilePath().replaceFirst(remoteLoc.getDest(), remoteLoc.getSrc());
Review Comment:
The path replacement using `replaceFirst` could be unsafe if the destination
path contains regex special characters. For example, if `remoteLoc.getDest()`
contains characters like `.` or `*`, they will be interpreted as regex patterns
rather than literal strings. Consider using a literal string replacement method
or escaping the pattern with `Pattern.quote()`.
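For illustration, two possible literal-replacement shapes, assuming the `remoteLoc` and
`ofe` names from this hunk and `java.util.regex.Pattern`/`Matcher`; neither is necessarily
the fix that was adopted:
```java
    // Option 1: keep replaceFirst but quote both pattern and replacement so
    // characters like '.' or '$' in mount paths are treated literally.
    String routerPath = ofe.getFilePath().replaceFirst(
        Pattern.quote(remoteLoc.getDest()), Matcher.quoteReplacement(remoteLoc.getSrc()));

    // Option 2: avoid regex entirely with a plain prefix swap.
    String dest = remoteLoc.getDest();
    String filePath = ofe.getFilePath();
    String routerPathLiteral = filePath.startsWith(dest)
        ? remoteLoc.getSrc() + filePath.substring(dest.length())
        : filePath;
```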
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java:
##########
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRouterListOpenFiles {
+  final private static String TEST_DESTINATION_PATH = "/TestRouterListOpenFilesDst";
+ final private static int NUM_SUBCLUSTERS = 2;
+ final private static int BATCH_SIZE = 3;
+ private static StateStoreDFSCluster cluster;
+ private static MiniRouterDFSCluster.RouterContext routerContext;
+ private static RouterClientProtocol routerProtocol;
+ private static DFSClient client0;
+ private static DFSClient client1;
+ private static DFSClient routerClient;
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ cluster = new StateStoreDFSCluster(false, NUM_SUBCLUSTERS,
+ MultipleDestinationMountTableResolver.class);
+    Configuration conf = new RouterConfigBuilder().stateStore().heartbeat().admin().rpc().build();
+    conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns0,ns1");
+    conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, true);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
+ cluster.addRouterOverrides(conf);
+ cluster.startCluster(conf);
+ cluster.startRouters();
+ cluster.waitClusterUp();
+ routerContext = cluster.getRandomRouter();
+    routerProtocol = routerContext.getRouterRpcServer().getClientProtocolModule();
+ routerClient = routerContext.getClient();
+ client0 = cluster.getNamenode("ns0", null).getClient();
+ client1 = cluster.getNamenode("ns1", null).getClient();
+ }
+
+ @AfterAll
+ public static void cleanup() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @BeforeEach
+ public void resetInodeId() throws IOException {
+ cluster.getNamenode("ns0",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking(12345);
+ cluster.getNamenode("ns1",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking(12345);
+ // Create 2 dirs with the same name on 2 different nss
+ client0.mkdirs(TEST_DESTINATION_PATH);
+ client1.mkdirs(TEST_DESTINATION_PATH);
+ }
+
+ @AfterEach
+ public void cleanupNamespaces() throws IOException {
+ client0.delete("/", true);
+ client1.delete("/", true);
+ }
+
+ @Test
+ public void testSingleDestination() throws Exception {
+ String testPath = "/" + getMethodName();
+ createMountTableEntry(testPath, Collections.singletonList("ns0"));
+
+ // Open 2 files with different names
+ OutputStream os0 = client0.create(TEST_DESTINATION_PATH + "/file0", true);
+ OutputStream os1 = client1.create(TEST_DESTINATION_PATH + "/file1", true);
+
+    BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
+        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+            testPath);
+ // Should list only the entry on ns0
+ assertEquals(1, result.size());
+ assertEquals(testPath + "/file0", result.get(0).getFilePath());
+ os0.close();
+ os1.close();
+ }
+
+ @Test
+ public void testMultipleDestinations() throws Exception {
+ String testPath = "/" + getMethodName();
+ createMountTableEntry(testPath, cluster.getNameservices());
+
+ // Open 2 files with different names
+ OutputStream os0 = client0.create(TEST_DESTINATION_PATH + "/file0", true);
+ OutputStream os1 = client1.create(TEST_DESTINATION_PATH + "/file1", true);
+    BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
+        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+            testPath);
+ // Should list both entries on ns0 and ns1
+ assertEquals(2, result.size());
+ assertEquals(testPath + "/file0", result.get(0).getFilePath());
+ assertEquals(testPath + "/file1", result.get(1).getFilePath());
+ RemoteIterator<OpenFileEntry> ite = routerClient.listOpenFiles(testPath);
+ while (ite.hasNext()) {
+ OpenFileEntry ofe = ite.next();
+      assertTrue(ofe.getFilePath().equals(testPath + "/file0") || ofe.getFilePath()
+          .equals(testPath + "/file1"));
+ }
+ os0.close();
+ os1.close();
+
+ // Open 2 files with same name
+ os0 = client0.create(TEST_DESTINATION_PATH + "/file2", true);
+ os1 = client1.create(TEST_DESTINATION_PATH + "/file2", true);
+    result =
+        routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+            testPath);
+ // Should list one file only
+ assertEquals(1, result.size());
+    assertEquals(routerClient.getFileInfo(TEST_DESTINATION_PATH + "/file2").getFileId(),
+        result.get(0).getId());
+ ite = routerClient.listOpenFiles(testPath);
+ routerClient.open(testPath + "/file2");
Review Comment:
The call to `routerClient.open(testPath + "/file2")` appears to be redundant
and its result is not used. This line is executed after creating the iterator
but before consuming it. If the intent is to test that the file can be opened,
the opened stream should be captured and closed. Otherwise, this line should be
removed.
```suggestion
```
> RBF: Support listOpenFiles for routers
> --------------------------------------
>
> Key: HDFS-17632
> URL: https://issues.apache.org/jira/browse/HDFS-17632
> Project: Hadoop HDFS
> Issue Type: Improvement
> Components: hdfs, rbf
> Reporter: Felix N
> Assignee: Felix N
> Priority: Major
> Labels: pull-request-available
>
> {code:java}
> @Override
> public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
> EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
> throws IOException {
> rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
> return null;
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)