This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e6c1ad7005 [core] Refactor ExternalPathProviders abstraction
e6c1ad7005 is described below

commit e6c1ad7005b2673885a8d3579e8c30d6964c838e
Author: JingsongLi <[email protected]>
AuthorDate: Sun Dec 28 20:37:28 2025 +0800

    [core] Refactor ExternalPathProviders abstraction
---
 .../fs/EntropyInjectExternalPathProvider.java      | 10 ++-
 .../org/apache/paimon/fs/ExternalPathProvider.java | 44 ++++++------
 ...er.java => RoundRobinExternalPathProvider.java} | 14 ++--
 .../paimon/io/ChainReadDataFilePathFactory.java    |  4 +-
 .../paimon/table/format/FormatTableFileWriter.java | 16 ++++-
 .../apache/paimon/utils/FileStorePathFactory.java  | 79 ++++------------------
 6 files changed, 68 insertions(+), 99 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
index 5c1548fa6d..da44099793 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
@@ -26,7 +26,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 /** Provider for entropy inject external data paths. */
-public class EntropyInjectExternalPathProvider extends ExternalPathProvider {
+public class EntropyInjectExternalPathProvider implements ExternalPathProvider 
{
 
     private static final HashFunction HASH_FUNC = Hashing.murmur3_32();
     private static final int HASH_BINARY_STRING_BITS = 20;
@@ -35,9 +35,15 @@ public class EntropyInjectExternalPathProvider extends 
ExternalPathProvider {
     // Will create DEPTH many dirs from the entropy
     private static final int ENTROPY_DIR_DEPTH = 3;
 
+    private final List<Path> externalTablePaths;
+    private final Path relativeBucketPath;
+
+    private int position;
+
     public EntropyInjectExternalPathProvider(
             List<Path> externalTablePaths, Path relativeBucketPath) {
-        super(externalTablePaths, relativeBucketPath);
+        this.externalTablePaths = externalTablePaths;
+        this.relativeBucketPath = relativeBucketPath;
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
index 9d95b1d7e1..65fb5deb74 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
@@ -18,34 +18,34 @@
 
 package org.apache.paimon.fs;
 
+import org.apache.paimon.CoreOptions.ExternalPathStrategy;
+
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
 
 /** Provider for external data paths. */
-public class ExternalPathProvider implements Serializable {
-
-    protected final List<Path> externalTablePaths;
-    protected final Path relativeBucketPath;
+public interface ExternalPathProvider extends Serializable {
 
-    protected int position;
-
-    public ExternalPathProvider(List<Path> externalTablePaths, Path 
relativeBucketPath) {
-        this.externalTablePaths = externalTablePaths;
-        this.relativeBucketPath = relativeBucketPath;
-        this.position = 
ThreadLocalRandom.current().nextInt(externalTablePaths.size());
-    }
+    Path getNextExternalDataPath(String fileName);
 
-    /**
-     * Get the next external data path.
-     *
-     * @return the next external data path
-     */
-    public Path getNextExternalDataPath(String fileName) {
-        position++;
-        if (position == externalTablePaths.size()) {
-            position = 0;
+    @Nullable
+    static ExternalPathProvider create(
+            ExternalPathStrategy strategy, List<Path> externalTablePaths, Path 
relativeBucketPath) {
+        switch (strategy) {
+            case ENTROPY_INJECT:
+                return new EntropyInjectExternalPathProvider(
+                        externalTablePaths, relativeBucketPath);
+            case SPECIFIC_FS:
+                // specific fs can use round-robin with only one path
+            case ROUND_ROBIN:
+                return new RoundRobinExternalPathProvider(externalTablePaths, 
relativeBucketPath);
+            case NONE:
+                return null;
+            default:
+                throw new UnsupportedOperationException(
+                        "Cannot support create external path provider for: " + 
strategy);
         }
-        return new Path(new Path(externalTablePaths.get(position), 
relativeBucketPath), fileName);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/RoundRobinExternalPathProvider.java
similarity index 80%
copy from 
paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
copy to 
paimon-common/src/main/java/org/apache/paimon/fs/RoundRobinExternalPathProvider.java
index 9d95b1d7e1..e9ef9fa431 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/RoundRobinExternalPathProvider.java
@@ -18,19 +18,18 @@
 
 package org.apache.paimon.fs;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
-/** Provider for external data paths. */
-public class ExternalPathProvider implements Serializable {
+/** Provider for the round-robin external data paths. */
+public class RoundRobinExternalPathProvider implements ExternalPathProvider {
 
-    protected final List<Path> externalTablePaths;
-    protected final Path relativeBucketPath;
+    private final List<Path> externalTablePaths;
+    private final Path relativeBucketPath;
 
-    protected int position;
+    private int position;
 
-    public ExternalPathProvider(List<Path> externalTablePaths, Path 
relativeBucketPath) {
+    public RoundRobinExternalPathProvider(List<Path> externalTablePaths, Path 
relativeBucketPath) {
         this.externalTablePaths = externalTablePaths;
         this.relativeBucketPath = relativeBucketPath;
         this.position = 
ThreadLocalRandom.current().nextInt(externalTablePaths.size());
@@ -41,6 +40,7 @@ public class ExternalPathProvider implements Serializable {
      *
      * @return the next external data path
      */
+    @Override
     public Path getNextExternalDataPath(String fileName) {
         position++;
         if (position == externalTablePaths.size()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java
index d3987951ea..9c6ebc95b7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.io;
 
-import org.apache.paimon.fs.ExternalPathProvider;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.RoundRobinExternalPathProvider;
 import org.apache.paimon.manifest.FileEntry;
 
 import javax.annotation.Nullable;
@@ -38,7 +38,7 @@ public class ChainReadDataFilePathFactory extends 
DataFilePathFactory {
             String changelogFilePrefix,
             boolean fileSuffixIncludeCompression,
             String fileCompression,
-            @Nullable ExternalPathProvider externalPathProvider,
+            @Nullable RoundRobinExternalPathProvider externalPathProvider,
             ChainReadContext chainReadContext) {
         super(
                 parent,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
index fc8951036d..4edb603523 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.RowType;
@@ -30,10 +31,12 @@ import org.apache.paimon.utils.FileStorePathFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.format.FileFormat.fileFormat;
+import static 
org.apache.paimon.utils.PartitionPathUtils.generatePartitionPathUtil;
 
 /** File writer for format table. */
 public class FormatTableFileWriter {
@@ -99,12 +102,21 @@ public class FormatTableFileWriter {
     }
 
     private FormatTableRecordWriter createWriter(BinaryRow partition) {
+        Path parent = pathFactory.root();
+        if (partition.getFieldCount() > 0) {
+            LinkedHashMap<String, String> partValues =
+                    
pathFactory.partitionComputer().generatePartValues(partition);
+            parent =
+                    new Path(
+                            parent,
+                            generatePartitionPathUtil(
+                                    partValues, 
options.formatTablePartitionOnlyValueInPath()));
+        }
         return new FormatTableRecordWriter(
                 fileIO,
                 fileFormat,
                 options.targetFileSize(false),
-                pathFactory.createFormatTableDataFilePathFactory(
-                        partition, 
options.formatTablePartitionOnlyValueInPath()),
+                pathFactory.createDataFilePathFactory(parent, null),
                 writeRowType,
                 options.formatTableFileCompression());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index d95fecae42..37ed1b1fa7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -19,9 +19,9 @@
 package org.apache.paimon.utils;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.ExternalPathStrategy;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
 import org.apache.paimon.fs.ExternalPathProvider;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexInDataFileDirPathFactory;
@@ -80,7 +80,7 @@ public class FileStorePathFactory {
     private final AtomicInteger indexFileCount;
     private final AtomicInteger statsFileCount;
     private final List<Path> externalPaths;
-    private final CoreOptions.ExternalPathStrategy strategy;
+    private final ExternalPathStrategy strategy;
 
     public FileStorePathFactory(
             Path root,
@@ -94,7 +94,7 @@ public class FileStorePathFactory {
             String fileCompression,
             @Nullable String dataFilePathDirectory,
             List<Path> externalPaths,
-            CoreOptions.ExternalPathStrategy strategy,
+            ExternalPathStrategy strategy,
             boolean indexFileInDataFileDir) {
         this.root = root;
         this.dataFilePathDirectory = dataFilePathDirectory;
@@ -168,14 +168,20 @@ public class FileStorePathFactory {
     }
 
     public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, 
int bucket) {
+        return createDataFilePathFactory(
+                bucketPath(partition, bucket), 
createExternalPathProvider(partition, bucket));
+    }
+
+    public DataFilePathFactory createDataFilePathFactory(
+            Path parent, @Nullable ExternalPathProvider externalPathProvider) {
         return new DataFilePathFactory(
-                bucketPath(partition, bucket),
+                parent,
                 formatIdentifier,
                 dataFilePrefix,
                 changelogFilePrefix,
                 fileSuffixIncludeCompression,
                 fileCompression,
-                createExternalPathProvider(partition, bucket));
+                externalPathProvider);
     }
 
     public ChainReadDataFilePathFactory createChainReadDataFilePathFactory(
@@ -194,64 +200,13 @@ public class FileStorePathFactory {
                 chainReadContext);
     }
 
-    public DataFilePathFactory createFormatTableDataFilePathFactory(
-            BinaryRow partition, boolean onlyValue) {
-        return new DataFilePathFactory(
-                partitionPath(partition, onlyValue),
-                formatIdentifier,
-                dataFilePrefix,
-                changelogFilePrefix,
-                fileSuffixIncludeCompression,
-                fileCompression,
-                createExternalPartitionPathProvider(partition));
-    }
-
-    private ExternalPathProvider createExternalPartitionPathProvider(
-            BinaryRow partition, boolean onlyValue) {
-        if (externalPaths == null || externalPaths.isEmpty()) {
-            return null;
-        }
-
-        return new ExternalPathProvider(externalPaths, 
partitionPath(partition, onlyValue));
-    }
-
-    private ExternalPathProvider createExternalPartitionPathProvider(BinaryRow 
partition) {
-        if (externalPaths == null || externalPaths.isEmpty()) {
-            return null;
-        }
-
-        return new ExternalPathProvider(externalPaths, 
partitionPath(partition));
-    }
-
-    private Path partitionPath(BinaryRow partition, boolean onlyValue) {
-        Path relativeBucketPath = null;
-        String partitionPath = getPartitionString(partition, onlyValue);
-        if (!partitionPath.isEmpty()) {
-            relativeBucketPath = new Path(partitionPath);
-        }
-        if (dataFilePathDirectory != null) {
-            relativeBucketPath =
-                    relativeBucketPath != null
-                            ? new Path(dataFilePathDirectory, 
relativeBucketPath)
-                            : new Path(dataFilePathDirectory);
-        }
-        return relativeBucketPath != null ? new Path(root, relativeBucketPath) 
: root;
-    }
-
-    public Path partitionPath(BinaryRow partition) {
-        return partitionPath(partition, false);
-    }
-
     @Nullable
     private ExternalPathProvider createExternalPathProvider(BinaryRow 
partition, int bucket) {
         if (externalPaths == null || externalPaths.isEmpty()) {
             return null;
         }
-        if (strategy == CoreOptions.ExternalPathStrategy.ENTROPY_INJECT) {
-            return new EntropyInjectExternalPathProvider(
-                    externalPaths, relativeBucketPath(partition, bucket));
-        }
-        return new ExternalPathProvider(externalPaths, 
relativeBucketPath(partition, bucket));
+        return ExternalPathProvider.create(
+                strategy, externalPaths, relativeBucketPath(partition, 
bucket));
     }
 
     public List<Path> getExternalPaths() {
@@ -286,12 +241,8 @@ public class FileStorePathFactory {
                                 partition, "Partition row data is null. This 
is unexpected.")));
     }
 
-    public String getPartitionString(BinaryRow partition, boolean onlyValue) {
-        return PartitionPathUtils.generatePartitionPathUtil(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This 
is unexpected.")),
-                onlyValue);
+    public InternalRowPartitionComputer partitionComputer() {
+        return partitionComputer;
     }
 
     // @TODO, need to be changed

Reply via email to