This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2fddae2cb7 Make S3PinotFS listFiles return directories when
non-recursive (#14073)
2fddae2cb7 is described below
commit 2fddae2cb799bb86441f4f89f40aa5a7c93ba7c6
Author: William Gan <[email protected]>
AuthorDate: Mon Oct 7 18:12:10 2024 -0700
Make S3PinotFS listFiles return directories when non-recursive (#14073)
---
.../apache/pinot/plugin/filesystem/S3PinotFS.java | 27 +++++++++---
.../pinot/plugin/filesystem/S3PinotFSTest.java | 50 ++++++++++++----------
2 files changed, 50 insertions(+), 27 deletions(-)
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index d1129156a9..d603ef2ac7 100644
---
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -55,6 +55,7 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
@@ -501,11 +502,11 @@ public class S3PinotFS extends BasePinotFS {
throws IOException {
ImmutableList.Builder<String> builder = ImmutableList.builder();
visitFiles(fileUri, recursive, s3Object -> {
- // TODO: Looks like S3PinotFS filters out directories, inconsistent with
the other implementations.
- // Only add files and not directories
if (!s3Object.key().equals(fileUri.getPath()) &&
!s3Object.key().endsWith(DELIMITER)) {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER +
getNormalizedFileKey(s3Object));
}
+ }, commonPrefix -> {
+ builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER +
getNormalizedFileKey(commonPrefix));
});
String[] listedFiles = builder.build().toArray(new String[0]);
LOGGER.info("Listed {} files from URI: {}, is recursive: {}",
listedFiles.length, fileUri, recursive);
@@ -524,6 +525,11 @@ public class S3PinotFS extends BasePinotFS {
.setIsDirectory(s3Object.key().endsWith(DELIMITER));
listBuilder.add(fileBuilder.build());
}
+ }, commonPrefix -> {
+ FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
+ .setFilePath(S3_SCHEME + fileUri.getHost() + DELIMITER +
getNormalizedFileKey(commonPrefix))
+ .setIsDirectory(true);
+ listBuilder.add(fileBuilder.build());
});
ImmutableList<FileMetadata> listedFiles = listBuilder.build();
LOGGER.info("Listed {} files from URI: {}, is recursive: {}",
listedFiles.size(), fileUri, recursive);
@@ -538,8 +544,15 @@ public class S3PinotFS extends BasePinotFS {
return fileKey;
}
- private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object>
visitor)
- throws IOException {
+ private static String getNormalizedFileKey(CommonPrefix commonPrefix) {
+ String prefix = commonPrefix.prefix();
+ return prefix.substring(0, prefix.length() - 1);
+ }
+
+ private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object>
objectVisitor,
+ // S3 has a concept of CommonPrefixes which act like subdirectories:
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CommonPrefix.html
+ @Nullable Consumer<CommonPrefix> commonPrefixVisitor) throws IOException
{
try {
String continuationToken = null;
boolean isDone = false;
@@ -561,7 +574,11 @@ public class S3PinotFS extends BasePinotFS {
ListObjectsV2Response listObjectsV2Response =
_s3Client.listObjectsV2(listObjectsV2Request);
LOGGER.debug("Getting ListObjectsV2Response: {}",
listObjectsV2Response);
List<S3Object> filesReturned = listObjectsV2Response.contents();
- filesReturned.forEach(visitor);
+ filesReturned.forEach(objectVisitor);
+ if (!recursive && listObjectsV2Response.hasCommonPrefixes() &&
commonPrefixVisitor != null) {
+ List<CommonPrefix> dirsReturned =
listObjectsV2Response.commonPrefixes();
+ dirsReturned.forEach(commonPrefixVisitor);
+ }
isDone = !listObjectsV2Response.isTruncated();
continuationToken = listObjectsV2Response.nextContinuationToken();
}
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
index 00ff8d03f9..0ea2b656b3 100644
---
a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
@@ -161,23 +161,21 @@ public class S3PinotFSTest {
throws Exception {
String folder = "list-files";
String[] originalFiles = new String[]{"a-list-2.txt", "b-list-2.txt",
"c-list-2.txt"};
+ List<String> expectedFileNames = new ArrayList<>();
for (String fileName : originalFiles) {
createEmptyFile(folder, fileName);
+ expectedFileNames.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folder
+ DELIMITER + fileName));
}
- // Files in sub folders should be skipped.
- createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
- createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");
-
- String[] actualFiles =
_s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET,
folder)), false);
- Assert.assertEquals(actualFiles.length, originalFiles.length);
- actualFiles = Arrays.stream(actualFiles).filter(x ->
x.contains("list-2")).toArray(String[]::new);
- Assert.assertEquals(actualFiles.length, originalFiles.length);
+ String[] subfolders = new String[]{"subfolder1", "subfolder2"};
+ for (String subfolder : subfolders) {
+ createEmptyFile(folder + DELIMITER + subfolder, "a-sub-file.txt");
+ expectedFileNames.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folder
+ DELIMITER + subfolder));
+ }
- Assert.assertTrue(Arrays.equals(Arrays.stream(originalFiles)
- .map(fileName -> String.format(FILE_FORMAT, SCHEME, BUCKET, folder
+ DELIMITER + fileName)).toArray(),
- actualFiles));
+ String[] actualFiles =
_s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET,
folder)), false);
+ Assert.assertEquals(actualFiles, expectedFileNames.toArray());
}
@Test
@@ -207,22 +205,30 @@ public class S3PinotFSTest {
throws Exception {
String folder = "list-files-with-md";
String[] originalFiles = new String[]{"a-list-2.txt", "b-list-2.txt",
"c-list-2.txt"};
+ List<String> expectedFilePaths = new ArrayList<>();
+ List<Boolean> expectedIsDirectories = new ArrayList<>();
+
for (String fileName : originalFiles) {
createEmptyFile(folder, fileName);
+ expectedFilePaths.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folder
+ DELIMITER + fileName));
+ expectedIsDirectories.add(false);
+ }
+
+ String[] subfolders = new String[]{"subfolder1", "subfolder2"};
+ for (String subfolder : subfolders) {
+ createEmptyFile(folder + DELIMITER + subfolder, "a-sub-file.txt");
+ expectedFilePaths.add(String.format(FILE_FORMAT, SCHEME, BUCKET, folder
+ DELIMITER + subfolder));
+ expectedIsDirectories.add(true);
}
- // Files in sub folders should be skipped.
- createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
- createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");
+
List<FileMetadata> actualFiles =
_s3PinotFS.listFilesWithMetadata(URI.create(String.format(FILE_FORMAT,
SCHEME, BUCKET, folder)), false);
- Assert.assertEquals(actualFiles.size(), originalFiles.length);
- List<String> actualFilePaths =
- actualFiles.stream().map(FileMetadata::getFilePath).filter(fp ->
fp.contains("list-2"))
- .collect(Collectors.toList());
- Assert.assertEquals(actualFilePaths.size(), originalFiles.length);
- Assert.assertEquals(Arrays.stream(originalFiles)
- .map(fileName -> String.format(FILE_FORMAT, SCHEME, BUCKET, folder +
DELIMITER + fileName))
- .collect(Collectors.toList()), actualFilePaths);
+
+ List<String> actualFilePaths =
actualFiles.stream().map(FileMetadata::getFilePath).collect(Collectors.toList());
+ List<Boolean> actualIsDirectories =
actualFiles.stream().map(FileMetadata::isDirectory)
+ .collect(Collectors.toList());
+ Assert.assertEquals(actualFilePaths, expectedFilePaths);
+ Assert.assertEquals(actualIsDirectories, expectedIsDirectories);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]