This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0a0b055888 [core] Introduce DataFilePathFactories to unify cache factories
0a0b055888 is described below
commit 0a0b055888923e5868d15bf237d882716b4b0b43
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jan 17 14:40:41 2025 +0800
[core] Introduce DataFilePathFactories to unify cache factories
---
.../iceberg/AbstractIcebergCommitCallback.java | 10 ++----
.../apache/paimon/operation/FileDeletionBase.java | 19 +++-------
.../paimon/operation/FileStoreCommitImpl.java | 10 ++----
.../org/apache/paimon/operation/TagDeletion.java | 11 ++----
.../apache/paimon/table/sink/TableCommitImpl.java | 13 ++-----
.../apache/paimon/utils/DataFilePathFactories.java | 42 ++++++++++++++++++++++
.../test/java/org/apache/paimon/TestFileStore.java | 17 +++------
.../paimon/flink/sink/RewriteFileIndexSink.java | 12 +++----
8 files changed, 67 insertions(+), 67 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
index 9301dab3eb..71e5b57fbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
@@ -55,6 +55,7 @@ import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
@@ -447,15 +448,10 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
             Map<String, BinaryRow> removedFiles,
             Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
         boolean isAddOnly = true;
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new DataFilePathFactories(fileStorePathFactory);
         for (ManifestEntry entry : manifestEntries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    fileStorePathFactory.createDataFilePathFactory(
-                                            entry.partition(), entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             String path = dataFilePathFactory.toPath(entry).toString();
             switch (entry.kind()) {
                 case ADD:
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 1a02924507..def9052021 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileDeletionThreadPool;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
@@ -217,15 +218,10 @@ public abstract class FileDeletionBase<T extends Snapshot> {
             List<ExpireFileEntry> dataFileEntries) {
         // we cannot delete a data file directly when we meet a DELETE entry, because that
         // file might be upgraded
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new DataFilePathFactories(pathFactory);
         for (ExpireFileEntry entry : dataFileEntries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    pathFactory.createDataFilePathFactory(
-                                            entry.partition(), entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             Path dataFilePath = dataFilePathFactory.toPath(entry);
             switch (entry.kind()) {
                 case ADD:
@@ -267,15 +263,10 @@ public abstract class FileDeletionBase<T extends Snapshot> {
     private void deleteAddedDataFiles(List<ExpireFileEntry> manifestEntries) {
         List<Path> dataFileToDelete = new ArrayList<>();
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new DataFilePathFactories(pathFactory);
         for (ExpireFileEntry entry : manifestEntries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    pathFactory.createDataFilePathFactory(
-                                            entry.partition(), entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             if (entry.kind() == FileKind.ADD) {
                 dataFileToDelete.add(dataFilePathFactory.toPath(entry));
                 recordDeletionBuckets(entry);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7a264a41bf..af7e450e8e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -52,6 +52,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
@@ -570,14 +571,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     @Override
     public void abort(List<CommitMessage> commitMessages) {
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new HashMap<>();
+        DataFilePathFactories factories = new DataFilePathFactories(pathFactory);
         for (CommitMessage message : commitMessages) {
-            DataFilePathFactory pathFactory =
-                    factoryMap.computeIfAbsent(
-                            Pair.of(message.partition(), message.bucket()),
-                            k ->
-                                    this.pathFactory.createDataFilePathFactory(
-                                            k.getKey(), k.getValue()));
+            DataFilePathFactory pathFactory = factories.get(message.partition(), message.bucket());
             CommitMessageImpl commitMessage = (CommitMessageImpl) message;
             List<DataFileMeta> toDelete = new ArrayList<>();
             toDelete.addAll(commitMessage.newFilesIncrement().newFiles());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 532b902c18..bac941de6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -28,8 +28,8 @@ import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,15 +80,10 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
         }
         Set<Path> dataFileToDelete = new HashSet<>();
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new DataFilePathFactories(pathFactory);
         for (ExpireFileEntry entry : manifestEntries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    pathFactory.createDataFilePathFactory(
-                                            entry.partition(), entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             if (!skipper.test(entry)) {
                 dataFileToDelete.add(dataFilePathFactory.toPath(entry));
                 for (String file : entry.extraFiles()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 9f965892f4..44c89ae06b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
@@ -31,8 +30,8 @@ import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -246,19 +245,13 @@ public class TableCommitImpl implements InnerTableCommit {
     private void checkFilesExistence(List<ManifestCommittable> committables) {
         List<Path> files = new ArrayList<>();
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new HashMap<>();
+        DataFilePathFactories factories = new DataFilePathFactories(commit.pathFactory());
         PathFactory indexFileFactory = commit.pathFactory().indexFileFactory();
         for (ManifestCommittable committable : committables) {
             for (CommitMessage message : committable.fileCommittables()) {
                 CommitMessageImpl msg = (CommitMessageImpl) message;
                 DataFilePathFactory pathFactory =
-                        factoryMap.computeIfAbsent(
-                                Pair.of(message.partition(), message.bucket()),
-                                k ->
-                                        commit.pathFactory()
-                                                .createDataFilePathFactory(
-                                                        k.getKey(), k.getValue()));
-
+                        factories.get(message.partition(), message.bucket());
                 Consumer<DataFileMeta> collector = f -> files.addAll(f.collectFiles(pathFactory));
                 msg.newFilesIncrement().newFiles().forEach(collector);
                 msg.newFilesIncrement().changelogFiles().forEach(collector);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DataFilePathFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/DataFilePathFactories.java
new file mode 100644
index 0000000000..77f5dce995
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/DataFilePathFactories.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFilePathFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Cache for {@link DataFilePathFactory}s. */
+public class DataFilePathFactories {
+
+    private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> cache = new HashMap<>();
+ private final FileStorePathFactory pathFactory;
+
+ public DataFilePathFactories(FileStorePathFactory pathFactory) {
+ this.pathFactory = pathFactory;
+ }
+
+ public DataFilePathFactory get(BinaryRow partition, int bucket) {
+ return cache.computeIfAbsent(
+ Pair.of(partition, bucket),
+                k -> pathFactory.createDataFilePathFactory(k.getKey(), k.getValue()));
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 48fa81c525..62f2c38e13 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -58,6 +58,7 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.RecordWriter;
@@ -645,16 +646,11 @@ public class TestFileStore extends KeyValueFileStore {
                         .flatMap(m -> manifestFile.read(m.fileName()).stream())
                         .collect(Collectors.toList());
         entries = new ArrayList<>(FileEntry.mergeEntries(entries));
-        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
+        DataFilePathFactories factories = new DataFilePathFactories(pathFactory);
         for (ManifestEntry entry : entries) {
-            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
             DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            bucket,
-                            b ->
-                                    pathFactory.createDataFilePathFactory(
-                                            entry.partition(), entry.bucket()));
+                    factories.get(entry.partition(), entry.bucket());
             result.add(dataFilePathFactory.toPath(entry));
         }
@@ -674,13 +670,8 @@ public class TestFileStore extends KeyValueFileStore {
                 if (entry.kind() == FileKind.DELETE
                         && entry.file().fileSource().orElse(FileSource.APPEND)
                                 == FileSource.APPEND) {
-                    Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
                     DataFilePathFactory dataFilePathFactory =
-                            dataFilePathFactoryMap.computeIfAbsent(
-                                    bucket,
-                                    b ->
-                                            pathFactory.createDataFilePathFactory(
-                                                    entry.partition(), entry.bucket()));
+                            factories.get(entry.partition(), entry.bucket());
                     result.add(dataFilePathFactory.toPath(entry));
                 }
             }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index 99061d4b82..bd7ae4a824 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -42,8 +42,8 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Pair;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -161,7 +161,7 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
         private final FileIndexOptions fileIndexOptions;
         private final FileIO fileIO;
         private final FileStorePathFactory pathFactory;
-        private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap;
+        private final DataFilePathFactories pathFactories;
         private final SchemaCache schemaInfoCache;
         private final long sizeInMeta;
@@ -170,7 +170,7 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
             this.fileIndexOptions = table.coreOptions().indexColumnsOptions();
             this.fileIO = table.fileIO();
             this.pathFactory = table.store().pathFactory();
-            this.dataFilePathFactoryMap = new HashMap<>();
+            this.pathFactories = new DataFilePathFactories(pathFactory);
             this.schemaInfoCache =
                     new SchemaCache(fileIndexOptions, new SchemaManager(fileIO, table.location()));
             this.sizeInMeta = table.coreOptions().fileIndexInManifestThreshold();
@@ -178,11 +178,7 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
         public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFileMeta)
                 throws IOException {
-            DataFilePathFactory dataFilePathFactory =
-                    dataFilePathFactoryMap.computeIfAbsent(
-                            Pair.of(partition, bucket),
-                            p -> pathFactory.createDataFilePathFactory(partition, bucket));
-
+            DataFilePathFactory dataFilePathFactory = pathFactories.get(partition, bucket);
             SchemaInfo schemaInfo = schemaInfoCache.schemaInfo(dataFileMeta.schemaId());
             List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
             List<String> indexFiles =