This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fddae2cb7 Make S3PinotFS listFiles return directories when non-recursive (#14073)
2fddae2cb7 is described below

commit 2fddae2cb799bb86441f4f89f40aa5a7c93ba7c6
Author: William Gan <[email protected]>
AuthorDate: Mon Oct 7 18:12:10 2024 -0700

    Make S3PinotFS listFiles return directories when non-recursive (#14073)
---
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 27 +++++++++---
 .../pinot/plugin/filesystem/S3PinotFSTest.java     | 50 ++++++++++++----------
 2 files changed, 50 insertions(+), 27 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index d1129156a9..d603ef2ac7 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -55,6 +55,7 @@ import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CommonPrefix;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
 import software.amazon.awssdk.services.s3.model.CompletedPart;
@@ -501,11 +502,11 @@ public class S3PinotFS extends BasePinotFS {
       throws IOException {
     ImmutableList.Builder<String> builder = ImmutableList.builder();
     visitFiles(fileUri, recursive, s3Object -> {
-      // TODO: Looks like S3PinotFS filters out directories, inconsistent with the other implementations.
-      // Only add files and not directories
       if (!s3Object.key().equals(fileUri.getPath()) && !s3Object.key().endsWith(DELIMITER)) {
         builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object));
       }
+    }, commonPrefix -> {
+      builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(commonPrefix));
     });
     String[] listedFiles = builder.build().toArray(new String[0]);
     LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
@@ -524,6 +525,11 @@ public class S3PinotFS extends BasePinotFS {
             .setIsDirectory(s3Object.key().endsWith(DELIMITER));
         listBuilder.add(fileBuilder.build());
       }
+    }, commonPrefix -> {
+      FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
+          .setFilePath(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(commonPrefix))
+          .setIsDirectory(true);
+      listBuilder.add(fileBuilder.build());
     });
     ImmutableList<FileMetadata> listedFiles = listBuilder.build();
     LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive);
@@ -538,8 +544,15 @@ public class S3PinotFS extends BasePinotFS {
     return fileKey;
   }
 
-  private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object> visitor)
-      throws IOException {
+  private static String getNormalizedFileKey(CommonPrefix commonPrefix) {
+    String prefix = commonPrefix.prefix();
+    return prefix.substring(0, prefix.length() - 1);
+  }
+
+  private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object> objectVisitor,
+      // S3 has a concept of CommonPrefixes which act like subdirectories:
+      // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CommonPrefix.html
+      @Nullable Consumer<CommonPrefix> commonPrefixVisitor) throws IOException {
     try {
       String continuationToken = null;
       boolean isDone = false;
@@ -561,7 +574,11 @@ public class S3PinotFS extends BasePinotFS {
         ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
         LOGGER.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
         List<S3Object> filesReturned = listObjectsV2Response.contents();
-        filesReturned.forEach(visitor);
+        filesReturned.forEach(objectVisitor);
+        if (!recursive && listObjectsV2Response.hasCommonPrefixes() && commonPrefixVisitor != null) {
+          List<CommonPrefix> dirsReturned = listObjectsV2Response.commonPrefixes();
+          dirsReturned.forEach(commonPrefixVisitor);
+        }
         isDone = !listObjectsV2Response.isTruncated();
         continuationToken = listObjectsV2Response.nextContinuationToken();
       }
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
index 00ff8d03f9..0ea2b656b3 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
@@ -161,23 +161,21 @@ public class S3PinotFSTest {
       throws Exception {
     String folder = "list-files";
     String[] originalFiles = new String[]{"a-list-2.txt", "b-list-2.txt", "c-list-2.txt"};
+    List<String> expectedFileNames = new ArrayList<>();
 
     for (String fileName : originalFiles) {
       createEmptyFile(folder, fileName);
+      expectedFileNames.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + fileName));
     }
-    // Files in sub folders should be skipped.
-    createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
-    createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");
-
-    String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
-    Assert.assertEquals(actualFiles.length, originalFiles.length);
 
-    actualFiles = Arrays.stream(actualFiles).filter(x -> x.contains("list-2")).toArray(String[]::new);
-    Assert.assertEquals(actualFiles.length, originalFiles.length);
+    String[] subfolders = new String[]{"subfolder1", "subfolder2"};
+    for (String subfolder : subfolders) {
+      createEmptyFile(folder + DELIMITER + subfolder, "a-sub-file.txt");
+      expectedFileNames.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + subfolder));
+    }
 
-    Assert.assertTrue(Arrays.equals(Arrays.stream(originalFiles)
-            .map(fileName -> String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + fileName)).toArray(),
-        actualFiles));
+    String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
+    Assert.assertEquals(actualFiles, expectedFileNames.toArray());
   }
 
   @Test
@@ -207,22 +205,30 @@ public class S3PinotFSTest {
       throws Exception {
     String folder = "list-files-with-md";
     String[] originalFiles = new String[]{"a-list-2.txt", "b-list-2.txt", "c-list-2.txt"};
+    List<String> expectedFilePaths = new ArrayList<>();
+    List<Boolean> expectedIsDirectories = new ArrayList<>();
+
     for (String fileName : originalFiles) {
       createEmptyFile(folder, fileName);
+      expectedFilePaths.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + fileName));
+      expectedIsDirectories.add(false);
+    }
+
+    String[] subfolders = new String[]{"subfolder1", "subfolder2"};
+    for (String subfolder : subfolders) {
+      createEmptyFile(folder + DELIMITER + subfolder, "a-sub-file.txt");
+      expectedFilePaths.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + subfolder));
+      expectedIsDirectories.add(true);
     }
-    // Files in sub folders should be skipped.
-    createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
-    createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");
+
     List<FileMetadata> actualFiles =
         _s3PinotFS.listFilesWithMetadata(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
-    Assert.assertEquals(actualFiles.size(), originalFiles.length);
-    List<String> actualFilePaths =
-        actualFiles.stream().map(FileMetadata::getFilePath).filter(fp -> fp.contains("list-2"))
-            .collect(Collectors.toList());
-    Assert.assertEquals(actualFilePaths.size(), originalFiles.length);
-    Assert.assertEquals(Arrays.stream(originalFiles)
-        .map(fileName -> String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + fileName))
-        .collect(Collectors.toList()), actualFilePaths);
+
+    List<String> actualFilePaths = actualFiles.stream().map(FileMetadata::getFilePath).collect(Collectors.toList());
+    List<Boolean> actualIsDirectories = actualFiles.stream().map(FileMetadata::isDirectory)
+        .collect(Collectors.toList());
+    Assert.assertEquals(actualFilePaths, expectedFilePaths);
+    Assert.assertEquals(actualIsDirectories, expectedIsDirectories);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to