This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b152608f3f [common] A FileIO API to list files iteratively (#4834)
b152608f3f is described below

commit b152608f3f3bc4f4356fb7db10a0be6a54d8924c
Author: Xiaoguang Zhu <[email protected]>
AuthorDate: Fri Jan 17 13:52:34 2025 +0800

    [common] A FileIO API to list files iteratively (#4834)
---
 .../src/main/java/org/apache/paimon/fs/FileIO.java | 72 ++++++++++++++++++++++
 .../java/org/apache/paimon/fs/RemoteIterator.java  | 49 +++++++++++++++
 .../test/java/org/apache/paimon/fs/FileIOTest.java | 44 +++++++++++--
 3 files changed, 160 insertions(+), 5 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index fe2e31b8db..5ba16acfca 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -39,12 +39,15 @@ import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -104,6 +107,75 @@ public interface FileIO extends Serializable {
      */
     FileStatus[] listStatus(Path path) throws IOException;
 
+    /**
+     * Lists the statuses of the files under the given directory and returns them as an array.
+     *
+     * @param path directory to list
+     * @param recursive {@code true} to also list files in subdirectories, {@code false} to list
+     *     only the files located directly in {@code path}
+     * @return the statuses yielded by {@link #listFilesIterative}, in iteration order
+     */
+    default FileStatus[] listFiles(Path path, boolean recursive) throws IOException {
+        List<FileStatus> collected = new ArrayList<>();
+        try (RemoteIterator<FileStatus> remaining = listFilesIterative(path, recursive)) {
+            while (remaining.hasNext()) {
+                collected.add(remaining.next());
+            }
+        }
+        return collected.toArray(new FileStatus[0]);
+    }
+
+    /**
+     * List the statuses of the files iteratively in the given path if the path is a directory.
+     *
+     * @param path given path
+     * @param recursive if set to <code>true</code> will recursively list files in subdirectories,
+     *     otherwise only files in the current directory will be listed
+     * @return a {@link RemoteIterator} over {@link FileStatus} of the files in the given path
+     */
+    default RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
+            throws IOException {
+        return new RemoteIterator<FileStatus>() {
+            // files of already listed directories that have not been handed out yet
+            private final Queue<FileStatus> files = new LinkedList<>();
+            // directories still to be listed, consumed in FIFO order starting with the root
+            private final Queue<Path> pendingDirs =
+                    new LinkedList<>(Collections.singletonList(path));
+
+            @Override
+            public boolean hasNext() throws IOException {
+                fillFiles();
+                return !files.isEmpty();
+            }
+
+            @Override
+            public FileStatus next() throws IOException {
+                fillFiles();
+                return files.remove();
+            }
+
+            /**
+             * Lists pending directories until a file is buffered or no directory is left.
+             * Listing only one directory per call would end the iteration too early whenever
+             * that directory is empty or contains nothing but subdirectories.
+             */
+            private void fillFiles() throws IOException {
+                while (files.isEmpty() && !pendingDirs.isEmpty()) {
+                    for (FileStatus status : listStatus(pendingDirs.remove())) {
+                        if (!status.isDir()) {
+                            files.add(status);
+                        } else if (recursive) {
+                            pendingDirs.add(status.getPath());
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws IOException {}
+        };
+    }
+
     /**
      * List the statuses of the directories in the given path if the path is a directory.
      *
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java b/paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java
new file mode 100644
index 0000000000..c23de0c199
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/RemoteIterator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** An iterator that lazily lists remote entries; close it once iteration is finished. */
+public interface RemoteIterator<E> extends Closeable {
+
+    /**
+     * Checks if there are more entries to be iterated.
+     *
+     * @return whether there are more entries to be iterated
+     * @throws IOException if lazily listing the entries fails
+     */
+    boolean hasNext() throws IOException;
+
+    /**
+     * Gets the next entry to be iterated.
+     *
+     * @return the next entry
+     * @throws IOException if lazily listing the entries fails
+     */
+    E next() throws IOException;
+
+    /**
+     * Closes the iterator and its associated resources.
+     *
+     * @throws IOException if closing the iterator fails
+     */
+    void close() throws IOException;
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
index 8dbcf41857..367bce3837 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
@@ -35,6 +35,8 @@ import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
@@ -134,7 +136,39 @@ public class FileIOTest {
         assertThat(exception.get()).isNull();
     }
 
-    /** A {@link FileIO} on local filesystem to test the default copy implementation. */
+    @Test
+    public void testListFiles() throws Exception {
+        Path root = new Path(tempDir.toUri());
+        Path fileA = new Path(tempDir.resolve("a").toUri());
+        Path dirB = new Path(tempDir.resolve("b").toUri());
+        Path fileBC = new Path(tempDir.resolve("b/c").toUri());
+
+        // layout created under the temp dir:
+        //   a      (file)
+        //   b/     (directory)
+        //   b/c    (file)
+        FileIO fileIO = new LocalFileIO();
+        fileIO.writeFile(fileA, "fileA", false);
+        fileIO.mkdirs(dirB);
+        fileIO.writeFile(fileBC, "fileBC", false);
+
+        // non-recursive listing: "a" is the only file directly in the top level directory
+        FileStatus[] topLevel = fileIO.listFiles(root, false);
+        assertThat(topLevel.length).isEqualTo(1);
+        assertThat(topLevel[0].getPath()).isEqualTo(fileA);
+
+        // recursive listing: "a" and "b/c" are listed, the directory "b" itself is omitted
+        FileStatus[] allFiles = fileIO.listFiles(root, true);
+        assertThat(allFiles.length).isEqualTo(2);
+
+        // sort a copy by path so the checks do not depend on the listing order
+        FileStatus[] sorted = allFiles.clone();
+        Arrays.sort(sorted, Comparator.comparing(FileStatus::getPath));
+        assertThat(sorted[0].getPath()).isEqualTo(fileA);
+        assertThat(sorted[1].getPath()).isEqualTo(fileBC);
+    }
+
+    /** A {@link FileIO} on local filesystem to test various default implementations. */
     private static class DummyFileIO implements FileIO {
         private static final ReentrantLock RENAME_LOCK = new ReentrantLock();
 
@@ -169,13 +203,13 @@ public class FileIOTest {
         }
 
         @Override
-        public FileStatus getFileStatus(Path path) {
-            throw new UnsupportedOperationException();
+        public FileStatus getFileStatus(Path path) throws IOException {
+            return new LocalFileIO().getFileStatus(path);
         }
 
         @Override
-        public FileStatus[] listStatus(Path path) {
-            throw new UnsupportedOperationException();
+        public FileStatus[] listStatus(Path path) throws IOException {
+            return new LocalFileIO().listStatus(path);
         }
 
         @Override

Reply via email to