This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cf23382ec [core] Refactor ExternalPathProvider to generate file in DataFilePathFactory
5cf23382ec is described below

commit 5cf23382ecf6f09d2662c26f0f3729658b2b959d
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jan 6 16:48:38 2025 +0800

    [core] Refactor ExternalPathProvider to generate file in DataFilePathFactory
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   1 -
 .../paimon/fs/DataFileExternalPathProvider.java    |  82 ---------
 .../org/apache/paimon/fs/ExternalPathProvider.java |  51 ++++++
 .../paimon/fs/TableExternalPathProvider.java       | 192 --------------------
 .../paimon/fs/TableExternalPathProviderTest.java   | 194 ---------------------
 .../java/org/apache/paimon/AbstractFileStore.java  |  42 ++++-
 .../iceberg/manifest/IcebergManifestFile.java      |   1 -
 .../org/apache/paimon/io/DataFilePathFactory.java  |  25 ++-
 .../apache/paimon/io/KeyValueDataFileWriter.java   |   5 +-
 .../paimon/io/KeyValueFileWriterFactory.java       |  25 +--
 .../org/apache/paimon/io/RowDataFileWriter.java    |   5 +-
 .../org/apache/paimon/io/SingleFileWriter.java     |   5 +-
 .../paimon/io/StatsCollectingSingleFileWriter.java |   5 +-
 .../org/apache/paimon/manifest/ManifestFile.java   |   1 -
 .../apache/paimon/utils/FileStorePathFactory.java  |  22 +--
 15 files changed, 130 insertions(+), 526 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8c59cdcd41..002def9f44 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2213,7 +2213,6 @@ public class CoreOptions implements Serializable {
         return options.get(DATA_FILE_EXTERNAL_PATHS);
     }
 
-    @Nullable
     public ExternalPathStrategy externalPathStrategy() {
         return options.get(DATA_FILE_EXTERNAL_PATHS_STRATEGY);
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java
deleted file mode 100644
index 71e6f78184..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.fs;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.Objects;
-import java.util.Optional;
-
-/** Provider for external data paths. */
-public class DataFileExternalPathProvider implements Serializable {
-    @Nullable private final TableExternalPathProvider 
tableExternalPathProvider;
-    private final Path relativeBucketPath;
-
-    public DataFileExternalPathProvider(
-            @Nullable TableExternalPathProvider tableExternalPathProvider,
-            Path relativeBucketPath) {
-        this.tableExternalPathProvider = tableExternalPathProvider;
-        this.relativeBucketPath = relativeBucketPath;
-    }
-
-    /**
-     * Get the next external data path.
-     *
-     * @return the next external data path
-     */
-    public Optional<Path> getNextExternalDataPath() {
-        return Optional.ofNullable(tableExternalPathProvider)
-                .flatMap(TableExternalPathProvider::getNextExternalPath)
-                .map(path -> new Path(path, relativeBucketPath));
-    }
-
-    public boolean externalPathExists() {
-        return tableExternalPathProvider != null && 
tableExternalPathProvider.externalPathExists();
-    }
-
-    @Override
-    public final boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof DataFileExternalPathProvider)) {
-            return false;
-        }
-
-        DataFileExternalPathProvider that = (DataFileExternalPathProvider) o;
-        return Objects.equals(tableExternalPathProvider, 
that.tableExternalPathProvider)
-                && Objects.equals(relativeBucketPath, that.relativeBucketPath);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(tableExternalPathProvider, relativeBucketPath);
-    }
-
-    @Override
-    public String toString() {
-        return "DataFileExternalPathProvider{"
-                + " externalPathProvider="
-                + tableExternalPathProvider
-                + ", relativeBucketPath="
-                + relativeBucketPath
-                + "}";
-    }
-}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
new file mode 100644
index 0000000000..720773d64f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Provider for external data paths. */
+public class ExternalPathProvider implements Serializable {
+
+    private final List<Path> externalTablePaths;
+    private final Path relativeBucketPath;
+
+    private int position;
+
+    public ExternalPathProvider(List<Path> externalTablePaths, Path relativeBucketPath) {
+        this.externalTablePaths = externalTablePaths;
+        this.relativeBucketPath = relativeBucketPath;
+        this.position = ThreadLocalRandom.current().nextInt(externalTablePaths.size());
+    }
+
+    /**
+     * Get the next external data path.
+     *
+     * @return the next external data path
+     */
+    public Path getNextExternalDataPath(String fileName) {
+        position++;
+        if (position == externalTablePaths.size()) {
+            position = 0;
+        }
+        return new Path(new Path(externalTablePaths.get(position), relativeBucketPath), fileName);
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java
deleted file mode 100644
index 8ae94e22b7..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.fs;
-
-import org.apache.paimon.CoreOptions.ExternalPathStrategy;
-import org.apache.paimon.annotation.VisibleForTesting;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Random;
-
-/** Provider for external paths. */
-public class TableExternalPathProvider implements Serializable {
-    private final Map<String, Path> externalPathsMap;
-    private final List<Path> externalPathsList;
-
-    private final ExternalPathStrategy externalPathStrategy;
-    private final String externalSpecificFS;
-    private int currentIndex = 0;
-    private boolean externalPathExists;
-
-    public TableExternalPathProvider(
-            String externalPaths,
-            ExternalPathStrategy externalPathStrategy,
-            String externalSpecificFS) {
-        this.externalPathsMap = new HashMap<>();
-        this.externalPathsList = new ArrayList<>();
-        this.externalPathStrategy = externalPathStrategy;
-        if (externalSpecificFS != null) {
-            this.externalSpecificFS = externalSpecificFS.toLowerCase();
-        } else {
-            this.externalSpecificFS = null;
-        }
-        initExternalPaths(externalPaths);
-        if (!externalPathsList.isEmpty()) {
-            this.currentIndex = new Random().nextInt(externalPathsList.size());
-        }
-    }
-
-    private void initExternalPaths(String externalPaths) {
-        if (externalPaths == null) {
-            return;
-        }
-
-        String[] tmpArray = externalPaths.split(",");
-        for (String s : tmpArray) {
-            Path path = new Path(s.trim());
-            String scheme = path.toUri().getScheme();
-            if (scheme == null) {
-                throw new IllegalArgumentException("scheme should not be null: 
" + path);
-            }
-            scheme = scheme.toLowerCase();
-            externalPathsMap.put(scheme, path);
-            externalPathsList.add(path);
-        }
-
-        if (externalPathStrategy != null
-                && 
externalPathStrategy.equals(ExternalPathStrategy.SPECIFIC_FS)) {
-            if (externalSpecificFS == null) {
-                throw new IllegalArgumentException("external specific fs 
should not be null: ");
-            }
-
-            if (!externalPathsMap.containsKey(externalSpecificFS)) {
-                throw new IllegalArgumentException(
-                        "external specific fs not found: " + 
externalSpecificFS);
-            }
-        }
-
-        if (!externalPathsMap.isEmpty()
-                && !externalPathsList.isEmpty()
-                && externalPathStrategy != ExternalPathStrategy.NONE) {
-            externalPathExists = true;
-        }
-    }
-
-    /**
-     * Get the next external path.
-     *
-     * @return the next external path
-     */
-    public Optional<Path> getNextExternalPath() {
-        if (externalPathsMap == null || externalPathsMap.isEmpty()) {
-            return Optional.empty();
-        }
-
-        switch (externalPathStrategy) {
-            case NONE:
-                return Optional.empty();
-            case SPECIFIC_FS:
-                return getSpecificFSExternalPath();
-            case ROUND_ROBIN:
-                return getRoundRobinPath();
-            default:
-                return Optional.empty();
-        }
-    }
-
-    private Optional<Path> getSpecificFSExternalPath() {
-        if (!externalPathsMap.containsKey(externalSpecificFS)) {
-            return Optional.empty();
-        }
-        return Optional.of(externalPathsMap.get(externalSpecificFS));
-    }
-
-    private Optional<Path> getRoundRobinPath() {
-        currentIndex = (currentIndex + 1) % externalPathsList.size();
-        return Optional.of(externalPathsList.get(currentIndex));
-    }
-
-    public boolean externalPathExists() {
-        return externalPathExists;
-    }
-
-    @VisibleForTesting
-    public Map<String, Path> getExternalPathsMap() {
-        return externalPathsMap;
-    }
-
-    @VisibleForTesting
-    public List<Path> getExternalPathsList() {
-        return externalPathsList;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        TableExternalPathProvider that = (TableExternalPathProvider) o;
-        return currentIndex == that.currentIndex
-                && externalPathExists == that.externalPathExists
-                && externalPathsMap.equals(that.externalPathsMap)
-                && externalPathsList.equals(that.externalPathsList)
-                && externalPathStrategy == that.externalPathStrategy
-                && Objects.equals(externalSpecificFS, that.externalSpecificFS);
-    }
-
-    @Override
-    public String toString() {
-        return "ExternalPathProvider{"
-                + " externalPathsMap="
-                + externalPathsMap
-                + ", externalPathsList="
-                + externalPathsList
-                + ", externalPathStrategy="
-                + externalPathStrategy
-                + ", externalSpecificFS='"
-                + externalSpecificFS
-                + '\''
-                + ", currentIndex="
-                + currentIndex
-                + ", externalPathExists="
-                + externalPathExists
-                + "}";
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-                externalPathsMap,
-                externalPathsList,
-                externalPathStrategy,
-                externalSpecificFS,
-                currentIndex,
-                externalPathExists);
-    }
-}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java
deleted file mode 100644
index 0a444768cf..0000000000
--- a/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.fs;
-
-import org.apache.paimon.CoreOptions.ExternalPathStrategy;
-
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link TableExternalPathProvider}. */
-public class TableExternalPathProviderTest {
-    private TableExternalPathProvider provider;
-
-    @BeforeEach
-    public void setUp() {
-        provider =
-                new TableExternalPathProvider(
-                        "oss://bucket1/path1,s3://bucket2/path2",
-                        ExternalPathStrategy.ROUND_ROBIN,
-                        null);
-    }
-
-    @Test
-    public void testInitExternalPaths() {
-        assertThat(provider.externalPathExists()).isTrue();
-        assertThat(provider.getExternalPathsMap().size()).isEqualTo(2);
-        assertThat(provider.getExternalPathsList().size()).isEqualTo(2);
-    }
-
-    @Test
-    public void testGetNextExternalPathRoundRobinSimple() {
-        String path1 = "s3://bucket2/path2";
-        String path2 = "oss://bucket1/path1";
-        List<String> expectedPaths = new ArrayList<String>();
-        expectedPaths.add(path1);
-        expectedPaths.add(path2);
-        String externalPaths = path1 + "," + path2;
-        provider =
-                new TableExternalPathProvider(
-                        externalPaths, ExternalPathStrategy.ROUND_ROBIN, null);
-
-        // Collect the paths returned by getNextExternalPath
-        List<String> actualPaths = new ArrayList<>();
-        int expectIndex = 0;
-        for (int i = 0; i < 6; i++) { // Collect more paths to ensure all are 
covered
-            Optional<Path> path = provider.getNextExternalPath();
-            assertThat(path.isPresent()).isTrue();
-            actualPaths.add(path.get().toString());
-            if (i == 0) {
-                if (path.get().toString().equals(path1)) {
-                    expectIndex = 0;
-                } else if (path.get().toString().equals(path2)) {
-                    expectIndex = 1;
-                }
-            }
-
-            expectIndex = (expectIndex) % expectedPaths.size();
-            
assertThat(path.get().toString().equals(expectedPaths.get(expectIndex))).isTrue();
-            expectIndex++;
-        }
-
-        // Check that all expected paths are present in the actual paths
-        for (String expectedPath : expectedPaths) {
-            assertThat(actualPaths).contains(expectedPath);
-        }
-    }
-
-    @Test
-    public void testGetNextExternalPathRoundRobinComplex() {
-        List<String> expectedPathsList = new ArrayList<String>();
-        for (int i = 0; i < 20; i++) {
-            if (i % 2 == 0) {
-                expectedPathsList.add("oss://bucket1/path" + i);
-            } else {
-                expectedPathsList.add("s3://bucket2/path" + i);
-            }
-        }
-        String externalPaths = String.join(",", expectedPathsList);
-        provider =
-                new TableExternalPathProvider(
-                        externalPaths, ExternalPathStrategy.ROUND_ROBIN, null);
-
-        // Collect the paths returned by getNextExternalPath
-        List<String> actualPaths = new ArrayList<>();
-        int expectIndex = 0;
-        for (int i = 0; i < 40; i++) { // Collect more paths to ensure all are 
covered
-            Optional<Path> path = provider.getNextExternalPath();
-            assertThat(path.isPresent()).isTrue();
-            actualPaths.add(path.get().toString());
-            if (i == 0) {
-                for (int j = 0; j < expectedPathsList.size(); j++) {
-                    if 
(path.get().toString().equals(expectedPathsList.get(j))) {
-                        expectIndex = j;
-                        break;
-                    }
-                }
-            }
-            expectIndex = (expectIndex) % expectedPathsList.size();
-            
assertThat(path.get().toString().equals(expectedPathsList.get(expectIndex))).isTrue();
-            expectIndex++;
-        }
-
-        // Check that all expected paths are present in the actual paths
-        for (String expectedPath : expectedPathsList) {
-            assertThat(actualPaths).contains(expectedPath);
-        }
-    }
-
-    @Test
-    public void testGetNextExternalPathSpecificFS() {
-        provider =
-                new TableExternalPathProvider(
-                        "oss://bucket1/path1,s3://bucket2/path2",
-                        ExternalPathStrategy.SPECIFIC_FS,
-                        "OSS");
-
-        Optional<Path> path = provider.getNextExternalPath();
-        assertThat(path.isPresent()).isTrue();
-        assertThat(path.get().toString()).isEqualTo("oss://bucket1/path1");
-    }
-
-    @Test
-    public void testGetNextExternalPathNone() {
-        provider =
-                new TableExternalPathProvider(
-                        "oss://bucket1/path1,s3://bucket2/path2", 
ExternalPathStrategy.NONE, "OSS");
-
-        Optional<Path> path = provider.getNextExternalPath();
-        assertThat(path.isPresent()).isFalse();
-    }
-
-    @Test
-    public void testUnsupportedExternalPath() {
-        Assertions.assertThatThrownBy(
-                        () -> {
-                            new TableExternalPathProvider(
-                                    "hdfs://bucket1/path1",
-                                    ExternalPathStrategy.SPECIFIC_FS,
-                                    "oss");
-                        })
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    public void testUnsupportedExternalSpecificFS() {
-        assertThatThrownBy(
-                        () -> {
-                            provider =
-                                    new TableExternalPathProvider(
-                                            "oss://bucket1/path1",
-                                            ExternalPathStrategy.SPECIFIC_FS,
-                                            "S3");
-                        })
-                .satisfies(
-                        anyCauseMatches(
-                                IllegalArgumentException.class,
-                                "external specific fs not found: s3"));
-    }
-
-    @Test
-    public void testExternalSpecificFSNull() {
-        Assertions.assertThatThrownBy(
-                        () -> {
-                            new TableExternalPathProvider(
-                                    "oss://bucket1/path1", 
ExternalPathStrategy.SPECIFIC_FS, null);
-                        })
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 655d75edfa..8b24ec2a54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.TableExternalPathProvider;
 import org.apache.paimon.index.HashIndexFile;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
@@ -66,6 +65,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /**
  * Base {@link FileStore} implementation.
  *
@@ -122,15 +123,42 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.fileSuffixIncludeCompression(),
                 options.fileCompression(),
                 options.dataFilePathDirectory(),
-                getExternalPathProvider());
+                createExternalPaths());
     }
 
-    private TableExternalPathProvider getExternalPathProvider() {
+    private List<Path> createExternalPaths() {
         String externalPaths = options.dataFileExternalPaths();
-        ExternalPathStrategy externalPathStrategy = 
options.externalPathStrategy();
-        String externalSpecificFS = options.externalSpecificFS();
-        return new TableExternalPathProvider(
-                externalPaths, externalPathStrategy, externalSpecificFS);
+        ExternalPathStrategy strategy = options.externalPathStrategy();
+        if (externalPaths == null
+                || externalPaths.isEmpty()
+                || strategy == ExternalPathStrategy.NONE) {
+            return Collections.emptyList();
+        }
+
+        String specificFS = options.externalSpecificFS();
+
+        List<Path> paths = new ArrayList<>();
+        for (String pathString : externalPaths.split(",")) {
+            Path path = new Path(pathString.trim());
+            String scheme = path.toUri().getScheme();
+            if (scheme == null) {
+                throw new IllegalArgumentException("scheme should not be null: " + path);
+            }
+
+            if (strategy == ExternalPathStrategy.SPECIFIC_FS) {
+                checkArgument(
+                        specificFS != null,
+                        "External path specificFS should not be null when strategy is specificFS.");
+                if (scheme.equalsIgnoreCase(specificFS)) {
+                    paths.add(path);
+                }
+            } else {
+                paths.add(path);
+            }
+        }
+
+        checkArgument(!paths.isEmpty(), "External paths should not be empty");
+        return paths;
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index d4cfc0b5ec..5955da6220 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -154,7 +154,6 @@ public class IcebergManifestFile extends ObjectsFile<IcebergManifestEntry> {
                     path,
                     serializer::toRow,
                     fileCompression,
-                    false,
                     false);
             this.partitionStatsCollector = new SimpleStatsCollector(partitionType);
             this.sequenceNumber = sequenceNumber;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index be32afa644..a70a795ef0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.fs.DataFileExternalPathProvider;
+import org.apache.paimon.fs.ExternalPathProvider;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.FileEntry;
 
@@ -45,8 +45,7 @@ public class DataFilePathFactory {
     private final String changelogFilePrefix;
     private final boolean fileSuffixIncludeCompression;
     private final String fileCompression;
-    @Nullable private final DataFileExternalPathProvider 
dataFileExternalPathProvider;
-    private final boolean isExternalPath;
+    @Nullable private final ExternalPathProvider externalPathProvider;
 
     public DataFilePathFactory(
             Path parent,
@@ -55,7 +54,7 @@ public class DataFilePathFactory {
             String changelogFilePrefix,
             boolean fileSuffixIncludeCompression,
             String fileCompression,
-            @Nullable DataFileExternalPathProvider 
dataFileExternalPathProvider) {
+            @Nullable ExternalPathProvider externalPathProvider) {
         this.parent = parent;
         this.uuid = UUID.randomUUID().toString();
         this.pathCount = new AtomicInteger(0);
@@ -64,12 +63,7 @@ public class DataFilePathFactory {
         this.changelogFilePrefix = changelogFilePrefix;
         this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
         this.fileCompression = fileCompression;
-        this.dataFileExternalPathProvider = dataFileExternalPathProvider;
-        if (dataFileExternalPathProvider != null) {
-            this.isExternalPath = 
dataFileExternalPathProvider.externalPathExists();
-        } else {
-            this.isExternalPath = false;
-        }
+        this.externalPathProvider = externalPathProvider;
     }
 
     public Path newPath() {
@@ -85,10 +79,11 @@ public class DataFilePathFactory {
     }
 
     public Path newPath(String prefix) {
-        return Optional.ofNullable(dataFileExternalPathProvider)
-                .flatMap(DataFileExternalPathProvider::getNextExternalDataPath)
-                .map(path -> new Path(path, newFileName(prefix)))
-                .orElseGet(() -> new Path(parent, newFileName(prefix)));
+        String fileName = newFileName(prefix);
+        if (externalPathProvider != null) {
+            return externalPathProvider.getNextExternalDataPath(fileName);
+        }
+        return new Path(parent, fileName);
     }
 
     private String newFileName(String prefix) {
@@ -148,7 +143,7 @@ public class DataFilePathFactory {
     }
 
     public boolean isExternalPath() {
-        return isExternalPath;
+        return externalPathProvider != null;
     }
 
     @VisibleForTesting
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 359855260e..3c7f6b45bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -66,6 +66,7 @@ public abstract class KeyValueDataFileWriter
     private final int level;
 
     private final SimpleStatsConverter keyStatsConverter;
+    private final boolean isExternalPath;
     private final SimpleStatsConverter valueStatsConverter;
     private final InternalRowSerializer keySerializer;
     private final FileSource fileSource;
@@ -103,8 +104,7 @@ public abstract class KeyValueDataFileWriter
                 compression,
                 StatsCollectorFactories.createStatsFactories(
                         options, writeRowType.getFieldNames(), 
keyType.getFieldNames()),
-                options.asyncFileWrite(),
-                isExternalPath);
+                options.asyncFileWrite());
 
         this.keyType = keyType;
         this.valueType = valueType;
@@ -112,6 +112,7 @@ public abstract class KeyValueDataFileWriter
         this.level = level;
 
         this.keyStatsConverter = new SimpleStatsConverter(keyType);
+        this.isExternalPath = isExternalPath;
         this.valueStatsConverter = new SimpleStatsConverter(valueType, options.statsDenseStore());
         this.keySerializer = new InternalRowSerializer(keyType);
         this.fileSource = fileSource;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index feee724389..53d7dcb68d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -93,23 +93,24 @@ public class KeyValueFileWriterFactory {
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(
             int level, FileSource fileSource) {
         return new RollingFileWriter<>(
-                () ->
-                        createDataFileWriter(
-                                formatContext.pathFactory(level).newPath(),
-                                level,
-                                fileSource,
-                                formatContext.pathFactory(level).isExternalPath()),
+                () -> {
+                    DataFilePathFactory pathFactory = formatContext.pathFactory(level);
+                    return createDataFileWriter(
+                            pathFactory.newPath(), level, fileSource, pathFactory.isExternalPath());
+                },
                 suggestedFileSize);
     }
 
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
         return new RollingFileWriter<>(
-                () ->
-                        createDataFileWriter(
-                                formatContext.pathFactory(level).newChangelogPath(),
-                                level,
-                                FileSource.APPEND,
-                                formatContext.pathFactory(level).isExternalPath()),
+                () -> {
+                    DataFilePathFactory pathFactory = formatContext.pathFactory(level);
+                    return createDataFileWriter(
+                            pathFactory.newChangelogPath(),
+                            level,
+                            FileSource.APPEND,
+                            pathFactory.isExternalPath());
+                },
                 suggestedFileSize);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 97af4d067a..25906e2dfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -49,6 +49,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
 
     private final long schemaId;
     private final LongCounter seqNumCounter;
+    private final boolean isExternalPath;
     private final SimpleStatsConverter statsArraySerializer;
     @Nullable private final DataFileIndexWriter dataFileIndexWriter;
     private final FileSource fileSource;
@@ -77,10 +78,10 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
                 simpleStatsExtractor,
                 fileCompression,
                 statsCollectors,
-                asyncFileWrite,
-                isExternalPath);
+                asyncFileWrite);
         this.schemaId = schemaId;
         this.seqNumCounter = seqNumCounter;
+        this.isExternalPath = isExternalPath;
         this.statsArraySerializer = new SimpleStatsConverter(writeSchema, statsDenseStore);
         this.dataFileIndexWriter =
                 DataFileIndexWriter.create(
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index 7a3cc4143b..f303e85978 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -54,7 +54,6 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
 
     private long recordCount;
     protected boolean closed;
-    protected boolean isExternalPath;
 
     public SingleFileWriter(
             FileIO fileIO,
@@ -62,8 +61,7 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
             Path path,
             Function<T, InternalRow> converter,
             String compression,
-            boolean asyncWrite,
-            boolean isExternalPath) {
+            boolean asyncWrite) {
         this.fileIO = fileIO;
         this.path = path;
         this.converter = converter;
@@ -86,7 +84,6 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
 
         this.recordCount = 0;
         this.closed = false;
-        this.isExternalPath = isExternalPath;
     }
 
     public Path path() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index 0ab2de8c2b..67a3fa6d1a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -59,9 +59,8 @@ public abstract class StatsCollectingSingleFileWriter<T, R> extends SingleFileWr
             @Nullable SimpleStatsExtractor simpleStatsExtractor,
             String compression,
             SimpleColStatsCollector.Factory[] statsCollectors,
-            boolean asyncWrite,
-            boolean isExternalPath) {
-        super(fileIO, factory, path, converter, compression, asyncWrite, isExternalPath);
+            boolean asyncWrite) {
+        super(fileIO, factory, path, converter, compression, asyncWrite);
         this.simpleStatsExtractor = simpleStatsExtractor;
         if (this.simpleStatsExtractor == null) {
             this.simpleStatsCollector = new SimpleStatsCollector(writeSchema, statsCollectors);
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 34bf77345a..61b3e8a517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -132,7 +132,6 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
                     path,
                     serializer::toRow,
                     fileCompression,
-                    false,
                     false);
             this.partitionStatsCollector = new SimpleStatsCollector(partitionType);
             this.partitionStatsSerializer = new SimpleStatsConverter(partitionType);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 7a9a7c4dbe..81b6307a5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -20,9 +20,8 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.fs.DataFileExternalPathProvider;
+import org.apache.paimon.fs.ExternalPathProvider;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.TableExternalPathProvider;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.types.RowType;
 
@@ -57,7 +56,7 @@ public class FileStorePathFactory {
     private final AtomicInteger indexManifestCount;
     private final AtomicInteger indexFileCount;
     private final AtomicInteger statsFileCount;
-    @Nullable private final TableExternalPathProvider tableExternalPathProvider;
+    private final List<Path> externalPaths;
 
     public FileStorePathFactory(
             Path root,
@@ -70,7 +69,7 @@ public class FileStorePathFactory {
             boolean fileSuffixIncludeCompression,
             String fileCompression,
             @Nullable String dataFilePathDirectory,
-            @Nullable TableExternalPathProvider tableExternalPathProvider) {
+            List<Path> externalPaths) {
         this.root = root;
         this.dataFilePathDirectory = dataFilePathDirectory;
         this.uuid = UUID.randomUUID().toString();
@@ -88,7 +87,7 @@ public class FileStorePathFactory {
         this.indexManifestCount = new AtomicInteger(0);
         this.indexFileCount = new AtomicInteger(0);
         this.statsFileCount = new AtomicInteger(0);
-        this.tableExternalPathProvider = tableExternalPathProvider;
+        this.externalPaths = externalPaths;
     }
 
     public Path root() {
@@ -133,13 +132,16 @@ public class FileStorePathFactory {
                 changelogFilePrefix,
                 fileSuffixIncludeCompression,
                 fileCompression,
-                getDataFileExternalPathProvider(
-                        tableExternalPathProvider, relativeBucketPath(partition, bucket)));
+                createExternalPathProvider(partition, bucket));
     }
 
-    private DataFileExternalPathProvider getDataFileExternalPathProvider(
-            TableExternalPathProvider tableExternalPathProvider, Path relativeBucketPath) {
-        return new DataFileExternalPathProvider(tableExternalPathProvider, relativeBucketPath);
+    @Nullable
+    private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int bucket) {
+        if (externalPaths == null || externalPaths.isEmpty()) {
+            return null;
+        }
+
+        return new ExternalPathProvider(externalPaths, relativeBucketPath(partition, bucket));
     }
 
     public Path bucketPath(BinaryRow partition, int bucket) {


Reply via email to