This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e6c1ad7005 [core] Refactor ExternalPathProviders abstraction
e6c1ad7005 is described below
commit e6c1ad7005b2673885a8d3579e8c30d6964c838e
Author: JingsongLi <[email protected]>
AuthorDate: Sun Dec 28 20:37:28 2025 +0800
[core] Refactor ExternalPathProviders abstraction
---
.../fs/EntropyInjectExternalPathProvider.java | 10 ++-
.../org/apache/paimon/fs/ExternalPathProvider.java | 44 ++++++------
...er.java => RoundRobinExternalPathProvider.java} | 14 ++--
.../paimon/io/ChainReadDataFilePathFactory.java | 4 +-
.../paimon/table/format/FormatTableFileWriter.java | 16 ++++-
.../apache/paimon/utils/FileStorePathFactory.java | 79 ++++------------------
6 files changed, 68 insertions(+), 99 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
index 5c1548fa6d..da44099793 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
@@ -26,7 +26,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
/** Provider for entropy inject external data paths. */
-public class EntropyInjectExternalPathProvider extends ExternalPathProvider {
+public class EntropyInjectExternalPathProvider implements ExternalPathProvider {
private static final HashFunction HASH_FUNC = Hashing.murmur3_32();
private static final int HASH_BINARY_STRING_BITS = 20;
@@ -35,9 +35,15 @@ public class EntropyInjectExternalPathProvider extends ExternalPathProvider {
// Will create DEPTH many dirs from the entropy
private static final int ENTROPY_DIR_DEPTH = 3;
+ private final List<Path> externalTablePaths;
+ private final Path relativeBucketPath;
+
+ private int position;
+
public EntropyInjectExternalPathProvider(
List<Path> externalTablePaths, Path relativeBucketPath) {
- super(externalTablePaths, relativeBucketPath);
+ this.externalTablePaths = externalTablePaths;
+ this.relativeBucketPath = relativeBucketPath;
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
index 9d95b1d7e1..65fb5deb74 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
@@ -18,34 +18,34 @@
package org.apache.paimon.fs;
+import org.apache.paimon.CoreOptions.ExternalPathStrategy;
+
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
/** Provider for external data paths. */
-public class ExternalPathProvider implements Serializable {
-
- protected final List<Path> externalTablePaths;
- protected final Path relativeBucketPath;
+public interface ExternalPathProvider extends Serializable {
- protected int position;
-
- public ExternalPathProvider(List<Path> externalTablePaths, Path relativeBucketPath) {
- this.externalTablePaths = externalTablePaths;
- this.relativeBucketPath = relativeBucketPath;
- this.position = ThreadLocalRandom.current().nextInt(externalTablePaths.size());
- }
+ Path getNextExternalDataPath(String fileName);
- /**
- * Get the next external data path.
- *
- * @return the next external data path
- */
- public Path getNextExternalDataPath(String fileName) {
- position++;
- if (position == externalTablePaths.size()) {
- position = 0;
+ @Nullable
+ static ExternalPathProvider create(
+ ExternalPathStrategy strategy, List<Path> externalTablePaths, Path relativeBucketPath) {
+ switch (strategy) {
+ case ENTROPY_INJECT:
+ return new EntropyInjectExternalPathProvider(
+ externalTablePaths, relativeBucketPath);
+ case SPECIFIC_FS:
+ // specific fs can use round-robin with only one path
+ case ROUND_ROBIN:
+ return new RoundRobinExternalPathProvider(externalTablePaths, relativeBucketPath);
+ case NONE:
+ return null;
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot support create external path provider for: " + strategy);
}
- return new Path(new Path(externalTablePaths.get(position), relativeBucketPath), fileName);
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/RoundRobinExternalPathProvider.java
similarity index 80%
copy from paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
copy to paimon-common/src/main/java/org/apache/paimon/fs/RoundRobinExternalPathProvider.java
index 9d95b1d7e1..e9ef9fa431 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/RoundRobinExternalPathProvider.java
@@ -18,19 +18,18 @@
package org.apache.paimon.fs;
-import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-/** Provider for external data paths. */
-public class ExternalPathProvider implements Serializable {
+/** Provider for the round-robin external data paths. */
+public class RoundRobinExternalPathProvider implements ExternalPathProvider {
- protected final List<Path> externalTablePaths;
- protected final Path relativeBucketPath;
+ private final List<Path> externalTablePaths;
+ private final Path relativeBucketPath;
- protected int position;
+ private int position;
- public ExternalPathProvider(List<Path> externalTablePaths, Path relativeBucketPath) {
+ public RoundRobinExternalPathProvider(List<Path> externalTablePaths, Path relativeBucketPath) {
this.externalTablePaths = externalTablePaths;
this.relativeBucketPath = relativeBucketPath;
this.position = ThreadLocalRandom.current().nextInt(externalTablePaths.size());
@@ -41,6 +40,7 @@ public class ExternalPathProvider implements Serializable {
*
* @return the next external data path
*/
+ @Override
public Path getNextExternalDataPath(String fileName) {
position++;
if (position == externalTablePaths.size()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java
index d3987951ea..9c6ebc95b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/ChainReadDataFilePathFactory.java
@@ -18,8 +18,8 @@
package org.apache.paimon.io;
-import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.RoundRobinExternalPathProvider;
import org.apache.paimon.manifest.FileEntry;
import javax.annotation.Nullable;
@@ -38,7 +38,7 @@ public class ChainReadDataFilePathFactory extends DataFilePathFactory {
String changelogFilePrefix,
boolean fileSuffixIncludeCompression,
String fileCompression,
- @Nullable ExternalPathProvider externalPathProvider,
+ @Nullable RoundRobinExternalPathProvider externalPathProvider,
ChainReadContext chainReadContext) {
super(
parent,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
index fc8951036d..4edb603523 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.RowType;
@@ -30,10 +31,12 @@ import org.apache.paimon.utils.FileStorePathFactory;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.paimon.format.FileFormat.fileFormat;
+import static org.apache.paimon.utils.PartitionPathUtils.generatePartitionPathUtil;
/** File writer for format table. */
public class FormatTableFileWriter {
@@ -99,12 +102,21 @@ public class FormatTableFileWriter {
}
private FormatTableRecordWriter createWriter(BinaryRow partition) {
+ Path parent = pathFactory.root();
+ if (partition.getFieldCount() > 0) {
+ LinkedHashMap<String, String> partValues =
+ pathFactory.partitionComputer().generatePartValues(partition);
+ parent =
+ new Path(
+ parent,
+ generatePartitionPathUtil(
+ partValues, options.formatTablePartitionOnlyValueInPath()));
+ }
return new FormatTableRecordWriter(
fileIO,
fileFormat,
options.targetFileSize(false),
- pathFactory.createFormatTableDataFilePathFactory(
- partition, options.formatTablePartitionOnlyValueInPath()),
+ pathFactory.createDataFilePathFactory(parent, null),
writeRowType,
options.formatTableFileCompression());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index d95fecae42..37ed1b1fa7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -19,9 +19,9 @@
package org.apache.paimon.utils;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.ExternalPathStrategy;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexInDataFileDirPathFactory;
@@ -80,7 +80,7 @@ public class FileStorePathFactory {
private final AtomicInteger indexFileCount;
private final AtomicInteger statsFileCount;
private final List<Path> externalPaths;
- private final CoreOptions.ExternalPathStrategy strategy;
+ private final ExternalPathStrategy strategy;
public FileStorePathFactory(
Path root,
@@ -94,7 +94,7 @@ public class FileStorePathFactory {
String fileCompression,
@Nullable String dataFilePathDirectory,
List<Path> externalPaths,
- CoreOptions.ExternalPathStrategy strategy,
+ ExternalPathStrategy strategy,
boolean indexFileInDataFileDir) {
this.root = root;
this.dataFilePathDirectory = dataFilePathDirectory;
@@ -168,14 +168,20 @@ public class FileStorePathFactory {
}
public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {
+ return createDataFilePathFactory(
+ bucketPath(partition, bucket), createExternalPathProvider(partition, bucket));
+ }
+
+ public DataFilePathFactory createDataFilePathFactory(
+ Path parent, @Nullable ExternalPathProvider externalPathProvider) {
return new DataFilePathFactory(
- bucketPath(partition, bucket),
+ parent,
formatIdentifier,
dataFilePrefix,
changelogFilePrefix,
fileSuffixIncludeCompression,
fileCompression,
- createExternalPathProvider(partition, bucket));
+ externalPathProvider);
}
public ChainReadDataFilePathFactory createChainReadDataFilePathFactory(
@@ -194,64 +200,13 @@ public class FileStorePathFactory {
chainReadContext);
}
- public DataFilePathFactory createFormatTableDataFilePathFactory(
- BinaryRow partition, boolean onlyValue) {
- return new DataFilePathFactory(
- partitionPath(partition, onlyValue),
- formatIdentifier,
- dataFilePrefix,
- changelogFilePrefix,
- fileSuffixIncludeCompression,
- fileCompression,
- createExternalPartitionPathProvider(partition));
- }
-
- private ExternalPathProvider createExternalPartitionPathProvider(
- BinaryRow partition, boolean onlyValue) {
- if (externalPaths == null || externalPaths.isEmpty()) {
- return null;
- }
-
- return new ExternalPathProvider(externalPaths, partitionPath(partition, onlyValue));
- }
-
- private ExternalPathProvider createExternalPartitionPathProvider(BinaryRow partition) {
- if (externalPaths == null || externalPaths.isEmpty()) {
- return null;
- }
-
- return new ExternalPathProvider(externalPaths, partitionPath(partition));
- }
-
- private Path partitionPath(BinaryRow partition, boolean onlyValue) {
- Path relativeBucketPath = null;
- String partitionPath = getPartitionString(partition, onlyValue);
- if (!partitionPath.isEmpty()) {
- relativeBucketPath = new Path(partitionPath);
- }
- if (dataFilePathDirectory != null) {
- relativeBucketPath =
- relativeBucketPath != null
- ? new Path(dataFilePathDirectory, relativeBucketPath)
- : new Path(dataFilePathDirectory);
- }
- return relativeBucketPath != null ? new Path(root, relativeBucketPath) : root;
- }
-
- public Path partitionPath(BinaryRow partition) {
- return partitionPath(partition, false);
- }
-
@Nullable
private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int bucket) {
if (externalPaths == null || externalPaths.isEmpty()) {
return null;
}
- if (strategy == CoreOptions.ExternalPathStrategy.ENTROPY_INJECT) {
- return new EntropyInjectExternalPathProvider(
- externalPaths, relativeBucketPath(partition, bucket));
- }
- return new ExternalPathProvider(externalPaths, relativeBucketPath(partition, bucket));
+ return ExternalPathProvider.create(
+ strategy, externalPaths, relativeBucketPath(partition, bucket));
}
public List<Path> getExternalPaths() {
@@ -286,12 +241,8 @@ public class FileStorePathFactory {
partition, "Partition row data is null. This is unexpected.")));
}
- public String getPartitionString(BinaryRow partition, boolean onlyValue) {
- return PartitionPathUtils.generatePartitionPathUtil(
- partitionComputer.generatePartValues(
- Preconditions.checkNotNull(
- partition, "Partition row data is null. This is unexpected.")),
- onlyValue);
+ public InternalRowPartitionComputer partitionComputer() {
+ return partitionComputer;
}
// @TODO, need to be changed