This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0596a0f333 [core] Add cache in StatsCollectorFactories
0596a0f333 is described below

commit 0596a0f33337c1c5cb0747c11d0f652c68f6f43e
Author: JingsongLi <[email protected]>
AuthorDate: Mon Sep 29 14:08:00 2025 +0800

    [core] Add cache in StatsCollectorFactories
---
 paimon-api/src/main/java/org/apache/paimon/types/BlobType.java | 10 ----------
 .../main/java/org/apache/paimon/append/AppendOnlyWriter.java   |  5 +++--
 .../{MergeAllBatchReader.java => ForceSingleBatchReader.java}  |  4 ++--
 .../src/main/java/org/apache/paimon/io/DataFileMeta.java       |  5 +++--
 .../org/apache/paimon/operation/DataEvolutionSplitRead.java    | 10 +++++-----
 .../java/org/apache/paimon/operation/FileStoreCommitImpl.java  |  2 +-
 .../main/java/org/apache/paimon/schema/SchemaValidation.java   |  3 ++-
 .../paimon/table/source/DataEvolutionSplitGenerator.java       |  4 ++--
 .../java/org/apache/paimon/utils/StatsCollectorFactories.java  |  7 ++++++-
 9 files changed, 24 insertions(+), 26 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index b5ead703f0..7f3fa010c4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -82,14 +82,4 @@ public final class BlobType extends DataType {
 
         return Pair.of(new RowType(normalFields), new RowType(blobFields));
     }
-
-    public static boolean containsBlobType(RowType rowType) {
-        for (DataField field : rowType.getFields()) {
-            DataTypeRoot type = field.type().getTypeRoot();
-            if (type == DataTypeRoot.BLOB) {
-                return true;
-            }
-        }
-        return false;
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 4ad04b2610..4cfceefecc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -40,7 +40,6 @@ import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BatchRecordWriter;
 import org.apache.paimon.utils.CommitIncrement;
@@ -61,6 +60,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
+
 /**
  * A {@link RecordWriter} implementation that only accepts records which are always insert
  * operations and don't have any unique keys or sort keys.
@@ -292,7 +293,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     }
 
     private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
-        if (BlobType.containsBlobType(writeSchema)) {
+        if (writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB))) {
             return new RollingBlobFileWriter(
                     fileIO,
                     schemaId,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java b/paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java
similarity index 95%
rename from paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
rename to paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java
index 911eafd3b4..f5a5f7f2e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/MergeAllBatchReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java
@@ -46,12 +46,12 @@ import java.io.IOException;
  * <p>This reader is commonly used in data evolution scenarios where multiple file formats or
  * schemas need to be read as a unified stream.
  */
-public class MergeAllBatchReader implements RecordReader<InternalRow> {
+public class ForceSingleBatchReader implements RecordReader<InternalRow> {
 
     private final RecordReader<InternalRow> multiBatchReader;
     private ConcatBatch batch;
 
-    public MergeAllBatchReader(RecordReader<InternalRow> multiBatchReader) {
+    public ForceSingleBatchReader(RecordReader<InternalRow> multiBatchReader) {
         this.multiBatchReader = multiBatchReader;
         this.batch = new ConcatBatch(multiBatchReader);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 68ca8b20a0..7f266d4554 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.blob.BlobFileFormatFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.stats.SimpleStats;
@@ -256,8 +257,8 @@ public interface DataFileMeta {
 
     String fileName();
 
-    default boolean isBlob() {
-        return fileName().endsWith(".blob");
+    default boolean isBlobFile() {
+        return fileName().endsWith("." + BlobFileFormatFactory.IDENTIFIER);
     }
 
     long fileSize();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index dd2809da7e..ead9fa18e6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.append.MergeAllBatchReader;
+import org.apache.paimon.append.ForceSingleBatchReader;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
@@ -176,7 +176,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                         needMergeFiles,
                         file -> {
                             checkArgument(
-                                    file.isBlob(), "Only blob file need to call this method.");
+                                    file.isBlobFile(), "Only blob file need to call this method.");
                             return schemaFetcher
                                     .apply(file.schemaId())
                                     .logicalRowType()
@@ -254,7 +254,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
                                                 readFields,
                                                 false));
                 fileRecordReaders[i] =
-                        new MergeAllBatchReader(
+                        new ForceSingleBatchReader(
                                 createFileReader(
                                         partition, file, dataFilePathFactory, formatReaderMapping));
             }
@@ -356,7 +356,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
         Map<Integer, BlobBunch> blobBunchMap = new HashMap<>();
         long rowCount = -1;
         for (DataFileMeta file : needMergeFiles) {
-            if (file.isBlob()) {
+            if (file.isBlobFile()) {
                 int fieldId = blobFileToFieldId.apply(file);
                 final long expectedRowCount = rowCount;
                 blobBunchMap
@@ -450,7 +450,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
         }
 
         void add(DataFileMeta file) {
-            if (!file.isBlob()) {
+            if (!file.isBlobFile()) {
                 throw new IllegalArgumentException("Only blob file can be added to a blob bunch.");
             }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 870bd498c1..e311aa0b2e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1217,7 +1217,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     "This is a bug, file source field for row-tracking table must present.");
             if (entry.file().fileSource().get().equals(FileSource.APPEND)
                     && entry.file().firstRowId() == null) {
-                if (entry.file().isBlob()) {
+                if (entry.file().isBlobFile()) {
                     if (blobStart >= start) {
                         throw new IllegalStateException(
                                 String.format(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 0521351a44..905d8ad647 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -79,6 +79,7 @@ import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQ
 import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
 import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
 import static org.apache.paimon.types.DataTypeRoot.ARRAY;
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
 import static org.apache.paimon.types.DataTypeRoot.MAP;
 import static org.apache.paimon.types.DataTypeRoot.MULTISET;
 import static org.apache.paimon.types.DataTypeRoot.ROW;
@@ -651,7 +652,7 @@ public class SchemaValidation {
                     "Data evolution config must disabled with deletion-vectors.enabled");
         }
 
-        if (BlobType.containsBlobType(schema.logicalRowType())) {
+        if (schema.fields().stream().map(DataField::type).anyMatch(t -> t.is(BLOB))) {
             checkArgument(
                     options.dataEvolutionEnabled(),
                     "Data evolution config must enabled for table with BLOB type column.");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index c2f115d32c..18ae25592a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -86,7 +86,7 @@ public class DataEvolutionSplitGenerator implements 
SplitGenerator {
                                                 value.firstRowId() == null
                                                         ? Long.MIN_VALUE
                                                         : value.firstRowId())
-                        .thenComparingInt(f -> f.isBlob() ? 1 : 0)
+                        .thenComparingInt(f -> f.isBlobFile() ? 1 : 0)
                         .thenComparing(
                                 (f1, f2) -> {
                                     // If firstRowId is the same, we should read the file with
@@ -106,7 +106,7 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
                 splitByRowId.add(Collections.singletonList(file));
                 continue;
             }
-            if (!file.isBlob() && firstRowId != lastRowId) {
+            if (!file.isBlobFile() && firstRowId != lastRowId) {
                 if (!currentSplit.isEmpty()) {
                     splitByRowId.add(currentSplit);
                 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
index 62bfd0c21b..6d57b7e7c4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
@@ -27,6 +27,8 @@ import org.apache.paimon.table.SpecialFields;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.STATS_MODE_SUFFIX;
@@ -36,13 +38,16 @@ import static org.apache.paimon.options.ConfigOptions.key;
 public class StatsCollectorFactories {
 
     private final CoreOptions options;
+    private final Map<List<String>, SimpleColStatsCollector.Factory[]> cache =
+            new ConcurrentHashMap<>();
 
     public StatsCollectorFactories(CoreOptions options) {
         this.options = options;
     }
 
     public SimpleColStatsCollector.Factory[] statsCollectors(List<String> fieldNames) {
-        return createStatsFactories(options.statsMode(), options, fieldNames);
+        return cache.computeIfAbsent(
+                fieldNames, k -> createStatsFactories(options.statsMode(), options, fieldNames));
     }
 
     public static SimpleColStatsCollector.Factory[] createStatsFactories(

Reply via email to