This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0596a0f333 [core] Add cache in StatsCollectorFactories
0596a0f333 is described below
commit 0596a0f33337c1c5cb0747c11d0f652c68f6f43e
Author: JingsongLi <[email protected]>
AuthorDate: Mon Sep 29 14:08:00 2025 +0800
[core] Add cache in StatsCollectorFactories
---
paimon-api/src/main/java/org/apache/paimon/types/BlobType.java | 10 ----------
.../main/java/org/apache/paimon/append/AppendOnlyWriter.java | 5 +++--
.../{MergeAllBatchReader.java => ForceSingleBatchReader.java} | 4 ++--
.../src/main/java/org/apache/paimon/io/DataFileMeta.java | 5 +++--
.../org/apache/paimon/operation/DataEvolutionSplitRead.java | 10 +++++-----
.../java/org/apache/paimon/operation/FileStoreCommitImpl.java | 2 +-
.../main/java/org/apache/paimon/schema/SchemaValidation.java | 3 ++-
.../paimon/table/source/DataEvolutionSplitGenerator.java | 4 ++--
.../java/org/apache/paimon/utils/StatsCollectorFactories.java | 7 ++++++-
9 files changed, 24 insertions(+), 26 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index b5ead703f0..7f3fa010c4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -82,14 +82,4 @@ public final class BlobType extends DataType {
return Pair.of(new RowType(normalFields), new RowType(blobFields));
}
-
- public static boolean containsBlobType(RowType rowType) {
- for (DataField field : rowType.getFields()) {
- DataTypeRoot type = field.type().getTypeRoot();
- if (type == DataTypeRoot.BLOB) {
- return true;
- }
- }
- return false;
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 4ad04b2610..4cfceefecc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -40,7 +40,6 @@ import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BatchRecordWriter;
import org.apache.paimon.utils.CommitIncrement;
@@ -61,6 +60,8 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
+
/**
* A {@link RecordWriter} implementation that only accepts records which are
always insert
* operations and don't have any unique keys or sort keys.
@@ -292,7 +293,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     }
private RollingFileWriter<InternalRow, DataFileMeta>
createRollingRowWriter() {
- if (BlobType.containsBlobType(writeSchema)) {
+ if (writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB))) {
return new RollingBlobFileWriter(
fileIO,
schemaId,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
b/paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java
similarity index 95%
rename from
paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
rename to
paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java
index 911eafd3b4..f5a5f7f2e2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java
@@ -46,12 +46,12 @@ import java.io.IOException;
* <p>This reader is commonly used in data evolution scenarios where multiple
file formats or
* schemas need to be read as a unified stream.
*/
-public class MergeAllBatchReader implements RecordReader<InternalRow> {
+public class ForceSingleBatchReader implements RecordReader<InternalRow> {
private final RecordReader<InternalRow> multiBatchReader;
private ConcatBatch batch;
- public MergeAllBatchReader(RecordReader<InternalRow> multiBatchReader) {
+ public ForceSingleBatchReader(RecordReader<InternalRow> multiBatchReader) {
this.multiBatchReader = multiBatchReader;
this.batch = new ConcatBatch(multiBatchReader);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 68ca8b20a0..7f266d4554 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.blob.BlobFileFormatFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
@@ -256,8 +257,8 @@ public interface DataFileMeta {
String fileName();
- default boolean isBlob() {
- return fileName().endsWith(".blob");
+ default boolean isBlobFile() {
+ return fileName().endsWith("." + BlobFileFormatFactory.IDENTIFIER);
}
long fileSize();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index dd2809da7e..ead9fa18e6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -19,7 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.append.MergeAllBatchReader;
+import org.apache.paimon.append.ForceSingleBatchReader;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
@@ -176,7 +176,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                         needMergeFiles,
                         file -> {
                             checkArgument(
-                                    file.isBlob(), "Only blob file need to call this method.");
+                                    file.isBlobFile(), "Only blob file need to call this method.");
return schemaFetcher
.apply(file.schemaId())
.logicalRowType()
@@ -254,7 +254,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
readFields,
false));
fileRecordReaders[i] =
- new MergeAllBatchReader(
+ new ForceSingleBatchReader(
createFileReader(
partition, file, dataFilePathFactory,
formatReaderMapping));
}
@@ -356,7 +356,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
Map<Integer, BlobBunch> blobBunchMap = new HashMap<>();
long rowCount = -1;
for (DataFileMeta file : needMergeFiles) {
- if (file.isBlob()) {
+ if (file.isBlobFile()) {
int fieldId = blobFileToFieldId.apply(file);
final long expectedRowCount = rowCount;
blobBunchMap
@@ -450,7 +450,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
}
         void add(DataFileMeta file) {
-            if (!file.isBlob()) {
+            if (!file.isBlobFile()) {
                 throw new IllegalArgumentException("Only blob file can be added to a blob bunch.");
             }
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 870bd498c1..e311aa0b2e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1217,7 +1217,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     "This is a bug, file source field for row-tracking table must present.");
             if (entry.file().fileSource().get().equals(FileSource.APPEND)
                     && entry.file().firstRowId() == null) {
-                if (entry.file().isBlob()) {
+                if (entry.file().isBlobFile()) {
if (blobStart >= start) {
throw new IllegalStateException(
String.format(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 0521351a44..905d8ad647 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -79,6 +79,7 @@ import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQ
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
import static org.apache.paimon.types.DataTypeRoot.MAP;
import static org.apache.paimon.types.DataTypeRoot.MULTISET;
import static org.apache.paimon.types.DataTypeRoot.ROW;
@@ -651,7 +652,7 @@ public class SchemaValidation {
                     "Data evolution config must disabled with deletion-vectors.enabled");
         }
-        if (BlobType.containsBlobType(schema.logicalRowType())) {
+        if (schema.fields().stream().map(DataField::type).anyMatch(t -> t.is(BLOB))) {
             checkArgument(
                     options.dataEvolutionEnabled(),
                     "Data evolution config must enabled for table with BLOB type column.");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index c2f115d32c..18ae25592a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -86,7 +86,7 @@ public class DataEvolutionSplitGenerator implements
SplitGenerator {
value.firstRowId() == null
? Long.MIN_VALUE
: value.firstRowId())
- .thenComparingInt(f -> f.isBlob() ? 1 : 0)
+ .thenComparingInt(f -> f.isBlobFile() ? 1 : 0)
.thenComparing(
(f1, f2) -> {
// If firstRowId is the same, we should
read the file with
@@ -106,7 +106,7 @@ public class DataEvolutionSplitGenerator implements
SplitGenerator {
splitByRowId.add(Collections.singletonList(file));
continue;
}
- if (!file.isBlob() && firstRowId != lastRowId) {
+ if (!file.isBlobFile() && firstRowId != lastRowId) {
if (!currentSplit.isEmpty()) {
splitByRowId.add(currentSplit);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
index 62bfd0c21b..6d57b7e7c4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
@@ -27,6 +27,8 @@ import org.apache.paimon.table.SpecialFields;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.STATS_MODE_SUFFIX;
@@ -36,13 +38,16 @@ import static org.apache.paimon.options.ConfigOptions.key;
public class StatsCollectorFactories {
private final CoreOptions options;
+ private final Map<List<String>, SimpleColStatsCollector.Factory[]> cache =
+ new ConcurrentHashMap<>();
public StatsCollectorFactories(CoreOptions options) {
this.options = options;
}
     public SimpleColStatsCollector.Factory[] statsCollectors(List<String> fieldNames) {
-        return createStatsFactories(options.statsMode(), options, fieldNames);
+        return cache.computeIfAbsent(
+                fieldNames, k -> createStatsFactories(options.statsMode(), options, fieldNames));
     }
public static SimpleColStatsCollector.Factory[] createStatsFactories(