This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cfb0075629 [core] Optimized iterative list implementations for FileIO (#4952)
cfb0075629 is described below
commit cfb007562914123f6f7ac17253e0cdf397a52a1a
Author: Xiaoguang Zhu <[email protected]>
AuthorDate: Tue Jan 21 12:39:41 2025 +0800
[core] Optimized iterative list implementations for FileIO (#4952)
---
.../org/apache/paimon/fs/hadoop/HadoopFileIO.java | 24 +++++++++++++
.../apache/paimon/fs/FileIOBehaviorTestBase.java | 39 ++++++++++++++++++++++
.../apache/paimon/oss/HadoopCompliantFileIO.java | 24 +++++++++++++
.../apache/paimon/s3/HadoopCompliantFileIO.java | 24 +++++++++++++
4 files changed, 111 insertions(+)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index 0a8d64a73b..f17c587fe9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.hadoop.SerializableConfiguration;
import org.apache.paimon.utils.FunctionWithException;
@@ -104,6 +105,29 @@ public class HadoopFileIO implements FileIO {
return statuses;
}
+ @Override
+ public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
+ throws IOException {
+ org.apache.hadoop.fs.Path hadoopPath = path(path);
+ org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
+ getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
+ return new RemoteIterator<FileStatus>() {
+ @Override
+ public boolean hasNext() throws IOException {
+ return hadoopIter.hasNext();
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
+ return new HadoopFileStatus(hadoopStatus);
+ }
+
+ @Override
+ public void close() throws IOException {}
+ };
+ }
+
@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
index 106dee38e3..7ca28b937d 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
@@ -25,6 +25,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import static org.assertj.core.api.Assertions.assertThat;
@@ -85,6 +87,43 @@ public abstract class FileIOBehaviorTestBase {
assertThat(fs.exists(new Path(basePath, randomName()))).isFalse();
}
+ // --- list files
+
+ @Test
+ void testListFilesIterativeNonRecursive() throws IOException {
+ Path fileA = createRandomFileInDirectory(basePath);
+ Path dirB = new Path(basePath, randomName());
+ fs.mkdirs(dirB);
+ Path fileBC = createRandomFileInDirectory(dirB);
+
+ List<FileStatus> allFiles = new ArrayList<>();
+ try (RemoteIterator<FileStatus> iter = fs.listFilesIterative(basePath, false)) {
+ while (iter.hasNext()) {
+ allFiles.add(iter.next());
+ }
+ }
+ assertThat(allFiles.size()).isEqualTo(1);
+ assertThat(allFiles.get(0).getPath()).isEqualTo(fileA);
+ }
+
+ @Test
+ void testListFilesIterativeRecursive() throws IOException {
+ Path fileA = createRandomFileInDirectory(basePath);
+ Path dirB = new Path(basePath, randomName());
+ fs.mkdirs(dirB);
+ Path fileBC = createRandomFileInDirectory(dirB);
+
+ List<FileStatus> allFiles = new ArrayList<>();
+ try (RemoteIterator<FileStatus> iter = fs.listFilesIterative(basePath, true)) {
+ while (iter.hasNext()) {
+ allFiles.add(iter.next());
+ }
+ }
+ assertThat(allFiles.size()).isEqualTo(2);
+ assertThat(allFiles.stream().filter(f -> f.getPath().equals(fileA)).count()).isEqualTo(1);
+ assertThat(allFiles.stream().filter(f -> f.getPath().equals(fileBC)).count()).isEqualTo(1);
+ }
+
// --- delete
@Test
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
index 67027eabad..8faa73d694 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -77,6 +78,29 @@ public abstract class HadoopCompliantFileIO implements FileIO {
return statuses;
}
+ @Override
+ public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
+ throws IOException {
+ org.apache.hadoop.fs.Path hadoopPath = path(path);
+ org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
+ getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
+ return new RemoteIterator<FileStatus>() {
+ @Override
+ public boolean hasNext() throws IOException {
+ return hadoopIter.hasNext();
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
+ return new HadoopFileStatus(hadoopStatus);
+ }
+
+ @Override
+ public void close() throws IOException {}
+ };
+ }
+
@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
index 80f3df5820..1bac9087cb 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -77,6 +78,29 @@ public abstract class HadoopCompliantFileIO implements FileIO {
return statuses;
}
+ @Override
+ public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
+ throws IOException {
+ org.apache.hadoop.fs.Path hadoopPath = path(path);
+ org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
+ getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
+ return new RemoteIterator<FileStatus>() {
+ @Override
+ public boolean hasNext() throws IOException {
+ return hadoopIter.hasNext();
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
+ return new HadoopFileStatus(hadoopStatus);
+ }
+
+ @Override
+ public void close() throws IOException {}
+ };
+ }
+
@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);