This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a0b055888 [core] Introduce DataFilePathFactories to unify cache factories
0a0b055888 is described below

commit 0a0b055888923e5868d15bf237d882716b4b0b43
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jan 17 14:40:41 2025 +0800

    [core] Introduce DataFilePathFactories to unify cache factories
---
 .../iceberg/AbstractIcebergCommitCallback.java     | 10 ++----
 .../apache/paimon/operation/FileDeletionBase.java  | 19 +++-------
 .../paimon/operation/FileStoreCommitImpl.java      | 10 ++----
 .../org/apache/paimon/operation/TagDeletion.java   | 11 ++----
 .../apache/paimon/table/sink/TableCommitImpl.java  | 13 ++-----
 .../apache/paimon/utils/DataFilePathFactories.java | 42 ++++++++++++++++++++++
 .../test/java/org/apache/paimon/TestFileStore.java | 17 +++------
 .../paimon/flink/sink/RewriteFileIndexSink.java    | 12 +++----
 8 files changed, 67 insertions(+), 67 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
index 9301dab3eb..71e5b57fbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
@@ -55,6 +55,7 @@ import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.ManifestReadThreadPool;
 import org.apache.paimon.utils.Pair;
@@ -447,15 +448,10 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
             Map<String, BinaryRow> removedFiles,
             Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
         boolean isAddOnly = true;
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> 
dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new 
DataFilePathFactories(fileStorePathFactory);
         for (ManifestEntry entry : manifestEntries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), 
entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    
fileStorePathFactory.createDataFilePathFactory(
-                                            entry.partition(), 
entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             String path = dataFilePathFactory.toPath(entry).toString();
             switch (entry.kind()) {
                 case ADD:
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 1a02924507..def9052021 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileDeletionThreadPool;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
@@ -217,15 +218,10 @@ public abstract class FileDeletionBase<T extends Snapshot> {
             List<ExpireFileEntry> dataFileEntries) {
         // we cannot delete a data file directly when we meet a DELETE entry, 
because that
         // file might be upgraded
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> 
dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new 
DataFilePathFactories(pathFactory);
         for (ExpireFileEntry entry : dataFileEntries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), 
entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    pathFactory.createDataFilePathFactory(
-                                            entry.partition(), 
entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             Path dataFilePath = dataFilePathFactory.toPath(entry);
             switch (entry.kind()) {
                 case ADD:
@@ -267,15 +263,10 @@ public abstract class FileDeletionBase<T extends Snapshot> {
 
     private void deleteAddedDataFiles(List<ExpireFileEntry> manifestEntries) {
         List<Path> dataFileToDelete = new ArrayList<>();
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> 
dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new 
DataFilePathFactories(pathFactory);
         for (ExpireFileEntry entry : manifestEntries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), 
entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    pathFactory.createDataFilePathFactory(
-                                            entry.partition(), 
entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             if (entry.kind() == FileKind.ADD) {
                 dataFileToDelete.add(dataFilePathFactory.toPath(entry));
                 recordDeletionBuckets(entry);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7a264a41bf..af7e450e8e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -52,6 +52,7 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Pair;
@@ -570,14 +571,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @Override
     public void abort(List<CommitMessage> commitMessages) {
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new 
HashMap<>();
+        DataFilePathFactories factories = new 
DataFilePathFactories(pathFactory);
         for (CommitMessage message : commitMessages) {
-            DataFilePathFactory pathFactory =
-                    factoryMap.computeIfAbsent(
-                            Pair.of(message.partition(), message.bucket()),
-                            k ->
-                                    this.pathFactory.createDataFilePathFactory(
-                                            k.getKey(), k.getValue()));
+            DataFilePathFactory pathFactory = 
factories.get(message.partition(), message.bucket());
             CommitMessageImpl commitMessage = (CommitMessageImpl) message;
             List<DataFileMeta> toDelete = new ArrayList<>();
             toDelete.addAll(commitMessage.newFilesIncrement().newFiles());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 532b902c18..bac941de6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -28,8 +28,8 @@ import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,15 +80,10 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
         }
 
         Set<Path> dataFileToDelete = new HashSet<>();
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> 
dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new 
DataFilePathFactories(pathFactory);
         for (ExpireFileEntry entry : manifestEntries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), 
entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    pathFactory.createDataFilePathFactory(
-                                            entry.partition(), 
entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             if (!skipper.test(entry)) {
                 dataFileToDelete.add(dataFilePathFactory.toPath(entry));
                 for (String file : entry.extraFiles()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 9f965892f4..44c89ae06b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.sink;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
@@ -31,8 +30,8 @@ import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -246,19 +245,13 @@ public class TableCommitImpl implements InnerTableCommit {
 
     private void checkFilesExistence(List<ManifestCommittable> committables) {
         List<Path> files = new ArrayList<>();
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new 
HashMap<>();
+        DataFilePathFactories factories = new 
DataFilePathFactories(commit.pathFactory());
         PathFactory indexFileFactory = commit.pathFactory().indexFileFactory();
         for (ManifestCommittable committable : committables) {
             for (CommitMessage message : committable.fileCommittables()) {
                 CommitMessageImpl msg = (CommitMessageImpl) message;
                 DataFilePathFactory pathFactory =
-                        factoryMap.computeIfAbsent(
-                                Pair.of(message.partition(), message.bucket()),
-                                k ->
-                                        commit.pathFactory()
-                                                .createDataFilePathFactory(
-                                                        k.getKey(), 
k.getValue()));
-
+                        factories.get(message.partition(), message.bucket());
                 Consumer<DataFileMeta> collector = f -> 
files.addAll(f.collectFiles(pathFactory));
                 msg.newFilesIncrement().newFiles().forEach(collector);
                 msg.newFilesIncrement().changelogFiles().forEach(collector);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DataFilePathFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/DataFilePathFactories.java
new file mode 100644
index 0000000000..77f5dce995
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/DataFilePathFactories.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFilePathFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Cache of {@link DataFilePathFactory}s, one per (partition, bucket) pair, created on first use. */
+public class DataFilePathFactories {
+    // Factories created so far, keyed by (partition, bucket); a plain HashMap with no locking here.
+    private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> cache = 
+new HashMap<>();
+    private final FileStorePathFactory pathFactory;
+    /** Creates an empty cache; missing entries are built through the given {@code pathFactory}. */
+    public DataFilePathFactories(FileStorePathFactory pathFactory) {
+        this.pathFactory = pathFactory;
+    }
+    /** Returns the factory cached for the given partition and bucket, creating it if absent. */
+    public DataFilePathFactory get(BinaryRow partition, int bucket) {
+        return cache.computeIfAbsent(
+                Pair.of(partition, bucket),
+                k -> pathFactory.createDataFilePathFactory(k.getKey(), 
+k.getValue()));
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 48fa81c525..62f2c38e13 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -58,6 +58,7 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.RecordWriter;
@@ -645,16 +646,11 @@ public class TestFileStore extends KeyValueFileStore {
                         .flatMap(m -> manifestFile.read(m.fileName()).stream())
                         .collect(Collectors.toList());
         entries = new ArrayList<>(FileEntry.mergeEntries(entries));
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> 
dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new 
DataFilePathFactories(pathFactory);
 
         for (ManifestEntry entry : entries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), 
entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    pathFactory.createDataFilePathFactory(
-                                            entry.partition(), 
entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             result.add(dataFilePathFactory.toPath(entry));
         }
 
@@ -674,13 +670,8 @@ public class TestFileStore extends KeyValueFileStore {
                 if (entry.kind() == FileKind.DELETE
                         && entry.file().fileSource().orElse(FileSource.APPEND)
                                 == FileSource.APPEND) {
-                    Pair<BinaryRow, Integer> bucket = 
Pair.of(entry.partition(), entry.bucket());
                     DataFilePathFactory dataFilePathFactory =
-                            dataFilePathFactoryMap.computeIfAbsent(
-                                    bucket,
-                                    b ->
-                                            
pathFactory.createDataFilePathFactory(
-                                                    entry.partition(), 
entry.bucket()));
+                            factories.get(entry.partition(), entry.bucket());
                     result.add(dataFilePathFactory.toPath(entry));
                 }
             }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index 99061d4b82..bd7ae4a824 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -42,8 +42,8 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Pair;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -161,7 +161,7 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
         private final FileIndexOptions fileIndexOptions;
         private final FileIO fileIO;
         private final FileStorePathFactory pathFactory;
-        private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> 
dataFilePathFactoryMap;
+        private final DataFilePathFactories pathFactories;
         private final SchemaCache schemaInfoCache;
         private final long sizeInMeta;
 
@@ -170,7 +170,7 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
             this.fileIndexOptions = table.coreOptions().indexColumnsOptions();
             this.fileIO = table.fileIO();
             this.pathFactory = table.store().pathFactory();
-            this.dataFilePathFactoryMap = new HashMap<>();
+            this.pathFactories = new DataFilePathFactories(pathFactory);
             this.schemaInfoCache =
                     new SchemaCache(fileIndexOptions, new 
SchemaManager(fileIO, table.location()));
             this.sizeInMeta = 
table.coreOptions().fileIndexInManifestThreshold();
@@ -178,11 +178,7 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
 
         public DataFileMeta process(BinaryRow partition, int bucket, 
DataFileMeta dataFileMeta)
                 throws IOException {
-            DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            Pair.of(partition, bucket),
-                            p -> 
pathFactory.createDataFilePathFactory(partition, bucket));
-
+            DataFilePathFactory dataFilePathFactory = 
pathFactories.get(partition, bucket);
             SchemaInfo schemaInfo = 
schemaInfoCache.schemaInfo(dataFileMeta.schemaId());
             List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
             List<String> indexFiles =

Reply via email to