This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 97aa7f2bb [core] add support of stats mode (#1452)
97aa7f2bb is described below

commit 97aa7f2bb94baf6b385e7ffb607e8c01f297c7e3
Author: Aitozi <[email protected]>
AuthorDate: Tue Jul 4 10:47:10 2023 +0800

    [core] add support of stats mode (#1452)
---
 docs/content/how-to/creating-tables.md             |  13 ++
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../java/org/apache/paimon/format/FileFormat.java  |   4 +-
 .../apache/paimon/format/TableStatsCollector.java  |  33 +---
 .../statistics/AbstractFieldStatsCollector.java    |  38 +++++
 .../statistics/CountsFieldStatsCollector.java      |  40 +++++
 .../paimon/statistics/FieldStatsCollector.java     |  70 ++++++++
 .../paimon/statistics/FullFieldStatsCollector.java |  53 ++++++
 .../paimon/statistics/NoneFieldStatsCollector.java |  41 +++++
 .../statistics/TruncateFieldStatsCollector.java    | 127 ++++++++++++++
 ...TableFieldStatsExtractorTestBaseCollector.java} |  21 ++-
 .../paimon/statistics/FieldStatsCollectorTest.java | 188 +++++++++++++++++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |   4 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  31 ++++
 .../org/apache/paimon/append/AppendOnlyWriter.java |   9 +-
 .../apache/paimon/io/KeyValueDataFileWriter.java   |   9 +-
 .../paimon/io/KeyValueFileWriterFactory.java       |  23 ++-
 .../org/apache/paimon/io/RowDataFileWriter.java    |   7 +-
 .../apache/paimon/io/RowDataRollingFileWriter.java |   9 +-
 .../paimon/io/StatsCollectingSingleFileWriter.java |  16 +-
 .../org/apache/paimon/manifest/ManifestFile.java   |  25 ++-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  10 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   6 +-
 .../java/org/apache/paimon/utils/StatsUtils.java   |  60 +++++++
 .../apache/paimon/append/AppendOnlyWriterTest.java |   7 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   6 +-
 .../format/FileStatsExtractingAvroFormat.java      |   6 +-
 .../paimon/io/DataFileTestDataGenerator.java       |  15 +-
 .../paimon/io/KeyValueFileReadWriteTest.java       |   3 +-
 .../apache/paimon/io/RollingFileWriterTest.java    |  21 ++-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |   7 +-
 .../apache/paimon/manifest/ManifestFileTest.java   |   7 +-
 .../paimon/manifest/ManifestTestDataGenerator.java |   9 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |   4 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   6 +-
 .../paimon/stats/TableStatsCollectorTest.java      |  10 +-
 .../paimon/stats/TestTableStatsExtractor.java      |  13 +-
 .../paimon/utils/FieldStatsCollectorUtilsTest.java |  65 +++++++
 .../apache/paimon/format/orc/OrcFileFormat.java    |   6 +-
 .../format/orc/filter/OrcTableStatsExtractor.java  | 125 +++++++++-----
 .../paimon/format/parquet/ParquetFileFormat.java   |   6 +-
 .../format/parquet/ParquetTableStatsExtractor.java |  80 ++++++---
 .../format/orc/OrcTableStatsExtractorTest.java     |   4 +-
 .../parquet/ParquetTableStatsExtractorTest.java    |   4 +-
 44 files changed, 1093 insertions(+), 154 deletions(-)

diff --git a/docs/content/how-to/creating-tables.md 
b/docs/content/how-to/creating-tables.md
index 0f4916c6c..6121991a5 100644
--- a/docs/content/how-to/creating-tables.md
+++ b/docs/content/how-to/creating-tables.md
@@ -235,6 +235,19 @@ The following three types of fields may be defined as 
partition fields in the wa
   if you declare the primary key containing partition field, you can achieve 
the unique effect.
 - CDC op_ts: It cannot be defined as a partition field, unable to know 
previous record timestamp.
 
+### Specify the statistics collector mode
+
+Paimon automatically collects statistics for each data file to speed up 
queries. Four modes are supported:
+
+- `full`: collect the full metrics: `null_count, min, max`
+- `truncate(length)`: `length` can be any positive integer. The default mode is 
`truncate(16)`, which collects the null count and the min/max values truncated 
to a length of 16.
+  This mainly prevents large column values from bloating the manifest file.
+- `counts`: only collect the null count.
+- `none`: disable the metadata stats collection.
+
+The statistics collector mode can also be configured at the field level by 
setting [field.{field_name}.stats.mode]({{< ref 
"maintenance/configurations#coreoptions" >}}).
+
+
 ## Create Table As
 
 Table can be created and populated by the results of a query, for example, we 
have a sql like this: `CREATE TABLE table_b AS SELECT id, name FROM table_a`,
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index c85ad3039..5417737df 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -242,6 +242,12 @@ under the License.
             <td><p>Enum</p></td>
             <td>Specify the merge engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last 
row.</li><li>"partial-update": Partial update non-null 
fields.</li><li>"aggregation": Aggregate fields with same primary 
key.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>metadata.stats.mode</h5></td>
+            <td style="word-wrap: break-word;">"truncate(16)"</td>
+            <td>String</td>
+            <td>The mode of metadata stats collection. none, counts, 
truncate(16), full is available.<br /><ul><li>"none": means disable the 
metadata stats collection.</li></ul><ul><li>"counts" means only collect the 
null count.</li></ul><ul><li>"full": means collect the null count, min/max 
value.</li></ul><ul><li>"truncate(16)": means collect the null count, min/max 
value with truncated length of 16.</li></ul><ul><li>Field level stats mode can 
be specified by field.{field_name}.stats.mo [...]
+        </tr>
         <tr>
             <td><h5>num-levels</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index 015a80ec0..a6ba21c50 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
@@ -75,7 +76,8 @@ public abstract class FileFormat {
         return createReaderFactory(rowType, projection, new ArrayList<>());
     }
 
-    public Optional<TableStatsExtractor> createStatsExtractor(RowType type) {
+    public Optional<TableStatsExtractor> createStatsExtractor(
+            RowType type, FieldStatsCollector[] stats) {
         return Optional.empty();
     }
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java 
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
index f85273be4..0db227389 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
@@ -21,24 +21,21 @@ package org.apache.paimon.format;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 /** Collector to extract statistics of each field from a series of records. */
 public class TableStatsCollector {
 
-    private final Object[] minValues;
-    private final Object[] maxValues;
-    private final long[] nullCounts;
     private final RowDataToObjectArrayConverter converter;
+    private final FieldStatsCollector[] stats;
     private final Serializer<Object>[] fieldSerializers;
 
-    public TableStatsCollector(RowType rowType) {
+    public TableStatsCollector(RowType rowType, FieldStatsCollector[] stats) {
         int numFields = rowType.getFieldCount();
-        this.minValues = new Object[numFields];
-        this.maxValues = new Object[numFields];
-        this.nullCounts = new long[numFields];
         this.converter = new RowDataToObjectArrayConverter(rowType);
+        this.stats = stats;
         this.fieldSerializers = new Serializer[numFields];
         for (int i = 0; i < numFields; i++) {
             fieldSerializers[i] = 
InternalSerializers.create(rowType.getTypeAt(i));
@@ -54,30 +51,16 @@ public class TableStatsCollector {
     public void collect(InternalRow row) {
         Object[] objects = converter.convert(row);
         for (int i = 0; i < row.getFieldCount(); i++) {
+            FieldStatsCollector collector = stats[i];
             Object obj = objects[i];
-            if (obj == null) {
-                nullCounts[i]++;
-                continue;
-            }
-
-            // TODO use comparator for not comparable types and extract this 
logic to a util class
-            if (!(obj instanceof Comparable)) {
-                continue;
-            }
-            Comparable<Object> c = (Comparable<Object>) obj;
-            if (minValues[i] == null || c.compareTo(minValues[i]) < 0) {
-                minValues[i] = fieldSerializers[i].copy(c);
-            }
-            if (maxValues[i] == null || c.compareTo(maxValues[i]) > 0) {
-                maxValues[i] = fieldSerializers[i].copy(c);
-            }
+            collector.collect(obj, fieldSerializers[i]);
         }
     }
 
     public FieldStats[] extract() {
-        FieldStats[] stats = new FieldStats[nullCounts.length];
+        FieldStats[] stats = new FieldStats[this.stats.length];
         for (int i = 0; i < stats.length; i++) {
-            stats[i] = new FieldStats(minValues[i], maxValues[i], 
nullCounts[i]);
+            stats[i] = this.stats[i].result();
         }
         return stats;
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/statistics/AbstractFieldStatsCollector.java
 
b/paimon-common/src/main/java/org/apache/paimon/statistics/AbstractFieldStatsCollector.java
new file mode 100644
index 000000000..b51efed6f
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/statistics/AbstractFieldStatsCollector.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.format.FieldStats;
+
+/** Abstract base stats collector. */
+public abstract class AbstractFieldStatsCollector implements 
FieldStatsCollector {
+
+    protected Object minValue;
+
+    protected Object maxValue;
+
+    protected long nullCount;
+
+    @Override
+    public FieldStats result() {
+        return new FieldStats(minValue, maxValue, nullCount);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/statistics/CountsFieldStatsCollector.java
 
b/paimon-common/src/main/java/org/apache/paimon/statistics/CountsFieldStatsCollector.java
new file mode 100644
index 000000000..1bab248b1
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/statistics/CountsFieldStatsCollector.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+
+/** The counts stats collector, which will only report null count stats. */
+public class CountsFieldStatsCollector extends AbstractFieldStatsCollector {
+
+    @Override
+    public void collect(Object field, Serializer<Object> serializer) {
+        if (field == null) {
+            nullCount++;
+        }
+    }
+
+    @Override
+    public FieldStats convert(FieldStats source) {
+        return new FieldStats(null, null, source.nullCount());
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
 
b/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
new file mode 100644
index 000000000..d77a2d266
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
@@ -0,0 +1,70 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+
+import java.util.regex.Matcher;
+
+import static 
org.apache.paimon.statistics.TruncateFieldStatsCollector.TRUNCATE_PATTERN;
+
+/** Collector of the stats of a single field; implementations define the stats mode. */
+public interface FieldStatsCollector {
+
+    /**
+     * Collect stats from the field.
+     *
+     * @param field The target field object.
+     * @param fieldSerializer The serializer of the field object.
+     */
+    void collect(Object field, Serializer<Object> fieldSerializer);
+
+    /** @return The collected field stats. */
+    FieldStats result();
+
+    /**
+     * Convert the field stats according to the strategy.
+     *
+     * @param source The source field stats, extracted from the file.
+     * @return The converted field stats.
+     */
+    FieldStats convert(FieldStats source);
+
+    static FieldStatsCollector from(String option) {
+        String upper = option.toUpperCase();
+        switch (upper) {
+            case "NONE":
+                return new NoneFieldStatsCollector();
+            case "FULL":
+                return new FullFieldStatsCollector();
+            case "COUNTS":
+                return new CountsFieldStatsCollector();
+            default:
+                Matcher matcher = TRUNCATE_PATTERN.matcher(upper);
+                if (matcher.matches()) {
+                    String length = matcher.group(1);
+                    return new 
TruncateFieldStatsCollector(Integer.parseInt(length));
+                }
+                throw new IllegalArgumentException("Unexpected option: " + 
option);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/statistics/FullFieldStatsCollector.java
 
b/paimon-common/src/main/java/org/apache/paimon/statistics/FullFieldStatsCollector.java
new file mode 100644
index 000000000..9db97a70d
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/statistics/FullFieldStatsCollector.java
@@ -0,0 +1,53 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+
+/** The full stats collector which will report null count, min value, max 
value if available. */
+public class FullFieldStatsCollector extends AbstractFieldStatsCollector {
+
+    @Override
+    public void collect(Object field, Serializer<Object> fieldSerializer) {
+        if (field == null) {
+            nullCount++;
+            return;
+        }
+
+        // TODO use comparator for not comparable types and extract this logic 
to a util class
+        if (!(field instanceof Comparable)) {
+            return;
+        }
+        Comparable<Object> c = (Comparable<Object>) field;
+        if (minValue == null || c.compareTo(minValue) < 0) {
+            minValue = fieldSerializer.copy(c);
+        }
+        if (maxValue == null || c.compareTo(maxValue) > 0) {
+            maxValue = fieldSerializer.copy(c);
+        }
+    }
+
+    @Override
+    public FieldStats convert(FieldStats source) {
+        return source;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/statistics/NoneFieldStatsCollector.java
 
b/paimon-common/src/main/java/org/apache/paimon/statistics/NoneFieldStatsCollector.java
new file mode 100644
index 000000000..23fc6ffd0
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/statistics/NoneFieldStatsCollector.java
@@ -0,0 +1,41 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+
+/** The none stats collector which reports nothing. */
+public class NoneFieldStatsCollector extends AbstractFieldStatsCollector {
+
+    @Override
+    public void collect(Object field, Serializer<Object> fieldSerializer) {}
+
+    @Override
+    public FieldStats result() {
+        return new FieldStats(null, null, null);
+    }
+
+    @Override
+    public FieldStats convert(FieldStats source) {
+        return new FieldStats(null, null, null);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/statistics/TruncateFieldStatsCollector.java
 
b/paimon-common/src/main/java/org/apache/paimon/statistics/TruncateFieldStatsCollector.java
new file mode 100644
index 000000000..bae77bed6
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/statistics/TruncateFieldStatsCollector.java
@@ -0,0 +1,127 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.regex.Pattern;
+
+/**
+ * The truncate stats collector which will report null count, truncated 
min/max value. Currently,
+ * truncation only performs on the {@link BinaryString} value.
+ */
+public class TruncateFieldStatsCollector extends AbstractFieldStatsCollector {
+
+    public static final Pattern TRUNCATE_PATTERN = 
Pattern.compile("TRUNCATE\\((\\d+)\\)");
+
+    private final int length;
+
+    public TruncateFieldStatsCollector(int length) {
+        Preconditions.checkArgument(length > 0, "Truncate length should larger 
than zero.");
+        this.length = length;
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    @Override
+    public void collect(Object field, Serializer<Object> fieldSerializer) {
+        if (field == null) {
+            nullCount++;
+            return;
+        }
+
+        // TODO use comparator for not comparable types and extract this logic 
to a util class
+        if (!(field instanceof Comparable)) {
+            return;
+        }
+
+        Comparable<Object> c = (Comparable<Object>) field;
+        if (minValue == null || c.compareTo(minValue) < 0) {
+            minValue = truncateMin(field, fieldSerializer);
+        }
+        if (maxValue == null || c.compareTo(maxValue) > 0) {
+            maxValue = truncateMax(field, fieldSerializer);
+        }
+    }
+
+    @Override
+    public FieldStats convert(FieldStats source) {
+        return new FieldStats(
+                truncateMin(source.minValue(), null),
+                truncateMax(source.maxValue(), null),
+                source.nullCount());
+    }
+
+    /** @return a truncated value less than or equal to the original value. */
+    private Object truncateMin(Object field, @Nullable Serializer<Object> 
fieldSerializer) {
+        if (field == null) {
+            return null;
+        }
+        if (field instanceof BinaryString) {
+            return ((BinaryString) field).substring(0, length);
+        } else {
+            return fieldSerializer == null ? field : 
fieldSerializer.copy(field);
+        }
+    }
+
+    /** @return a value greater than or equal to the original value, or null if none can be built. */
+    private Object truncateMax(Object field, @Nullable Serializer<Object> 
fieldSerializer) {
+        if (field == null) {
+            return null;
+        }
+        if (field instanceof BinaryString) {
+            BinaryString original = ((BinaryString) field);
+            BinaryString truncated = original.substring(0, length);
+
+            // No need to increment if the input length is under the truncate 
length
+            if (original.getSizeInBytes() == truncated.getSizeInBytes()) {
+                return field;
+            }
+
+            StringBuilder truncatedStringBuilder = new 
StringBuilder(truncated.toString());
+
+            // Try incrementing the code points from the end
+            for (int i = length - 1; i >= 0; i--) {
+                // Get the offset in the truncated string buffer where the 
number of unicode
+                // characters = i
+                int offsetByCodePoint = 
truncatedStringBuilder.offsetByCodePoints(0, i);
+                int nextCodePoint = 
truncatedStringBuilder.codePointAt(offsetByCodePoint) + 1;
+                // No overflow
+                if (nextCodePoint != 0 && 
Character.isValidCodePoint(nextCodePoint)) {
+                    truncatedStringBuilder.setLength(offsetByCodePoint);
+                    // Append next code point to the truncated substring
+                    truncatedStringBuilder.appendCodePoint(nextCodePoint);
+                    return 
BinaryString.fromString(truncatedStringBuilder.toString());
+                }
+            }
+            return null; // Cannot find a valid upper bound
+        } else {
+            return fieldSerializer == null ? field : 
fieldSerializer.copy(field);
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/format/TableStatsExtractorTestBase.java
 
b/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
similarity index 92%
rename from 
paimon-common/src/test/java/org/apache/paimon/format/TableStatsExtractorTestBase.java
rename to 
paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
index 69bf6b86b..b3eda7c70 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/format/TableStatsExtractorTestBase.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BinaryType;
 import org.apache.paimon.types.CharType;
@@ -44,8 +45,9 @@ import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.math.BigDecimal;
 import java.time.Instant;
@@ -55,21 +57,28 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
 
 import static org.apache.paimon.types.DataTypeChecks.getPrecision;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link TableStatsExtractor}. */
-public abstract class TableStatsExtractorTestBase {
+public abstract class TableFieldStatsExtractorTestBaseCollector {
 
     @TempDir java.nio.file.Path tempDir;
 
     private final FileIO fileIO = new LocalFileIO();
 
-    @Test
-    public void testExtract() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"none", "counts", "full", "truncate(3)", 
"truncate(12)"})
+    public void testExtract(String mode) throws Exception {
         FileFormat format = createFormat();
         RowType rowType = rowType();
+        int count = rowType().getFieldCount();
+        FieldStatsCollector[] stats =
+                IntStream.range(0, count)
+                        .mapToObj(p -> FieldStatsCollector.from(mode))
+                        .toArray(FieldStatsCollector[]::new);
 
         FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
         Path path = new Path(tempDir.toString() + "/test");
@@ -82,13 +91,13 @@ public abstract class TableStatsExtractorTestBase {
         }
         writer.finish();
 
-        TableStatsCollector collector = new TableStatsCollector(rowType);
+        TableStatsCollector collector = new TableStatsCollector(rowType, 
stats);
         for (GenericRow row : data) {
             collector.collect(row);
         }
         FieldStats[] expected = collector.extract();
 
-        TableStatsExtractor extractor = 
format.createStatsExtractor(rowType).get();
+        TableStatsExtractor extractor = format.createStatsExtractor(rowType, 
stats).get();
         assertThat(extractor).isNotNull();
         FieldStats[] actual = extractor.extract(fileIO, path);
         for (int i = 0; i < expected.length; i++) {
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/statistics/FieldStatsCollectorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/statistics/FieldStatsCollectorTest.java
new file mode 100644
index 000000000..1dec6f25d
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/statistics/FieldStatsCollectorTest.java
@@ -0,0 +1,188 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/**
+ * Test for {@link FieldStatsCollector}.
+ *
+ * <p>Each mode is exercised twice per column: once by feeding rows through the collector and
+ * reading {@code result()}, and once by passing pre-computed stats through {@code convert(...)}.
+ * Both paths are expected to yield the same {@link FieldStats}.
+ */
+public class FieldStatsCollectorTest {
+
+    /** One serializer per column of the test row type, rebuilt before every test. */
+    private Serializer<Object>[] serializers;
+
+    // Column "b" fixtures: s1 < s2 < s3, so s1 is the expected minimum and s3 the maximum.
+    private static final String s1 = StringUtils.repeat("a", 12);
+    // Expected minimum for s1 under truncate(2).
+    private static final String s1_t = "aa";
+    private static final String s2 = StringUtils.repeat("b", 12);
+    private static final String s3 = StringUtils.repeat("d", 13);
+    // Expected maximum for s3 under truncate(2): the last kept character is 'e', not 'd'.
+    private static final String s3_t = "de";
+
+    /** Builds the serializers for the two-column (INT, VARCHAR) row type used by all tests. */
+    @BeforeEach
+    @SuppressWarnings("unchecked") // generic array creation; elements are only set in this method
+    public void before() {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "a", new IntType(), "Someone's desc."),
+                                new DataField(1, "b", new VarCharType())));
+        // Size the array from the row type so it cannot drift from the field list above.
+        serializers = new Serializer[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            serializers[i] = InternalSerializers.create(rowType.getTypeAt(i));
+        }
+    }
+
+    /** Mode strings are parsed case-insensitively; malformed truncate expressions are rejected. */
+    @Test
+    public void testParse() {
+        Assertions.assertTrue(FieldStatsCollector.from("none") instanceof NoneFieldStatsCollector);
+        Assertions.assertTrue(FieldStatsCollector.from("Full") instanceof FullFieldStatsCollector);
+        Assertions.assertTrue(
+                FieldStatsCollector.from("CoUNts") instanceof CountsFieldStatsCollector);
+        TruncateFieldStatsCollector t1 =
+                (TruncateFieldStatsCollector) FieldStatsCollector.from("truncate(10)");
+        Assertions.assertEquals(10, t1.getLength());
+        assertThatThrownBy(() -> FieldStatsCollector.from("aatruncate(10)"))
+                .isInstanceOf(IllegalArgumentException.class);
+        assertThatThrownBy(() -> FieldStatsCollector.from("truncate(10.1)"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    /** The none mode is expected to report neither min/max nor a null count. */
+    @Test
+    public void testNone() {
+        List<GenericRow> rows = getRows();
+        check(
+                rows,
+                0,
+                new FieldStats(null, null, null),
+                new FieldStats(1, 4, 0L),
+                new NoneFieldStatsCollector());
+        check(
+                rows,
+                1,
+                new FieldStats(null, null, null),
+                new FieldStats(s1, s3, 1L),
+                new NoneFieldStatsCollector());
+    }
+
+    /** The counts mode is expected to report only the null count. */
+    @Test
+    public void testCounts() {
+        List<GenericRow> rows = getRows();
+        check(
+                rows,
+                0,
+                new FieldStats(null, null, 0L),
+                new FieldStats(1, 4, 0L),
+                new CountsFieldStatsCollector());
+        check(
+                rows,
+                1,
+                new FieldStats(null, null, 1L),
+                new FieldStats(s1, s3, 1L),
+                new CountsFieldStatsCollector());
+    }
+
+    /** The full mode is expected to report untouched min/max values plus the null count. */
+    @Test
+    public void testFull() {
+        List<GenericRow> rows = getRows();
+        check(
+                rows,
+                0,
+                new FieldStats(1, 4, 0L),
+                new FieldStats(1, 4, 0L),
+                new FullFieldStatsCollector());
+        check(
+                rows,
+                1,
+                new FieldStats(BinaryString.fromString(s1), BinaryString.fromString(s3), 1L),
+                new FieldStats(BinaryString.fromString(s1), BinaryString.fromString(s3), 1L),
+                new FullFieldStatsCollector());
+    }
+
+    /** Truncation leaves the INT column as is and shortens the string column's min/max. */
+    @Test
+    public void testTruncate() {
+        List<GenericRow> rows = getRows();
+        check(
+                rows,
+                0,
+                new FieldStats(1, 4, 0L),
+                new FieldStats(1, 4, 0L),
+                new TruncateFieldStatsCollector(1));
+        check(
+                rows,
+                1,
+                new FieldStats(BinaryString.fromString(s1_t), BinaryString.fromString(s3_t), 1L),
+                new FieldStats(BinaryString.fromString(s1), BinaryString.fromString(s3), 1L),
+                new TruncateFieldStatsCollector(2));
+    }
+
+    /**
+     * Values starting with a surrogate pair: with a length of 1 the whole pair is expected to be
+     * kept for the minimum, and the maximum is expected to end one code unit higher.
+     */
+    @Test
+    public void testTruncateTwoChar() {
+        TruncateFieldStatsCollector t1 = new TruncateFieldStatsCollector(1);
+        FieldStats fieldStats =
+                new FieldStats(
+                        BinaryString.fromString("\uD83E\uDD18a"),
+                        BinaryString.fromString("\uD83E\uDD18b"),
+                        0L);
+        fieldStats = t1.convert(fieldStats);
+        Assertions.assertEquals(BinaryString.fromString("\uD83E\uDD18"), fieldStats.minValue());
+        Assertions.assertEquals(BinaryString.fromString("\uD83E\uDD19"), fieldStats.maxValue());
+    }
+
+    /**
+     * Feeds one column of every row into the collector and verifies both of its outputs.
+     *
+     * @param rows input rows
+     * @param column index of the column under test
+     * @param expected stats expected from both {@code result()} and {@code convert(...)}
+     * @param formatExtracted stats handed to {@code convert(...)} as its input
+     * @param fieldStatsCollector fresh collector for the mode under test
+     */
+    private void check(
+            List<GenericRow> rows,
+            int column,
+            FieldStats expected,
+            FieldStats formatExtracted,
+            FieldStatsCollector fieldStatsCollector) {
+        for (GenericRow row : rows) {
+            fieldStatsCollector.collect(row.getField(column), serializers[column]);
+        }
+        Assertions.assertEquals(expected, fieldStatsCollector.result());
+        Assertions.assertEquals(expected, fieldStatsCollector.convert(formatExtracted));
+    }
+
+    /** Returns four rows: ints 1..4 in column 0, and s1, s2, null, s3 in column 1. */
+    private List<GenericRow> getRows() {
+        List<GenericRow> rows = new ArrayList<>();
+        rows.add(GenericRow.of(1, BinaryString.fromString(s1)));
+        rows.add(GenericRow.of(2, BinaryString.fromString(s2)));
+        rows.add(GenericRow.of(3, null));
+        rows.add(GenericRow.of(4, BinaryString.fromString(s3)));
+        return rows;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 558daa899..883eb2611 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -38,6 +38,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsUtils;
 import org.apache.paimon.utils.TagManager;
 
 import javax.annotation.Nullable;
@@ -104,7 +105,8 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.manifestFormat(),
                 pathFactory(),
                 options.manifestTargetSize().getBytes(),
-                forWrite ? writeManifestCache : null);
+                forWrite ? writeManifestCache : null,
+                StatsUtils.getFieldsStatsMode(options, 
partitionType.getFieldNames()));
     }
 
     @VisibleForTesting
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index b35aae375..d0baf4935 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -694,6 +694,37 @@ public class CoreOptions implements Serializable {
                             "Read incremental changes between start snapshot 
(exclusive) and end snapshot, "
                                     + "for example, '5,10' means changes 
between snapshot 5 and snapshot 10.");
 
+    /** First segment of the per-field stats mode key: {@code field.{field_name}.stats.mode}. */
+    public static final String FIELD_STATS_MODE_PREFIX = "field";
+    /** Trailing segment of the per-field stats mode key: {@code field.{field_name}.stats.mode}. */
+    public static final String FIELD_STATS_MODE_SUFFIX = "stats.mode";
+
+    /**
+     * Mode of metadata stats collection, {@code truncate(16)} by default. The description below
+     * lists the accepted modes and the key pattern for field-level settings.
+     */
+    public static final ConfigOption<String> STATS_MODE =
+            key("metadata.stats.mode")
+                    .stringType()
+                    .defaultValue("truncate(16)")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The mode of metadata stats 
collection. none, counts, truncate(16), full is available.")
+                                    .linebreak()
+                                    .list(
+                                            text(
+                                                    "\"none\": means disable 
the metadata stats collection."))
+                                    .list(text("\"counts\" means only collect 
the null count."))
+                                    .list(
+                                            text(
+                                                    "\"full\": means collect 
the null count, min/max value."))
+                                    .list(
+                                            text(
+                                                    "\"truncate(16)\": means 
collect the null count, min/max value with truncated length of 16."))
+                                    .list(
+                                            text(
+                                                    "Field level stats mode 
can be specified by "
+                                                            + 
FIELD_STATS_MODE_PREFIX
+                                                            + "."
+                                                            + "{field_name}."
+                                                            + 
FIELD_STATS_MODE_SUFFIX))
+                                    .build());
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index b40c038c5..98c05cda4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -28,6 +28,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.NewFilesIncrement;
 import org.apache.paimon.io.RowDataRollingFileWriter;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
@@ -64,6 +65,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
     private final String fileCompression;
 
     private RowDataRollingFileWriter writer;
+    private FieldStatsCollector[] stats;
 
     public AppendOnlyWriter(
             FileIO fileIO,
@@ -76,7 +78,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
             boolean forceCompact,
             DataFilePathFactory pathFactory,
             @Nullable CommitIncrement increment,
-            String fileCompression) {
+            String fileCompression,
+            FieldStatsCollector[] stats) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -98,6 +101,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
             compactBefore.addAll(increment.compactIncrement().compactBefore());
             compactAfter.addAll(increment.compactIncrement().compactAfter());
         }
+        this.stats = stats;
     }
 
     @Override
@@ -173,7 +177,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
                 writeSchema,
                 pathFactory,
                 seqNumCounter,
-                fileCompression);
+                fileCompression,
+                stats);
     }
 
     private void trySyncLatestCompaction(boolean blocking)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 4ee489f00..1f7b16422 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
@@ -30,6 +31,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.stats.BinaryTableStats;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StatsUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +78,8 @@ public class KeyValueDataFileWriter
             @Nullable TableStatsExtractor tableStatsExtractor,
             long schemaId,
             int level,
-            String compression) {
+            String compression,
+            CoreOptions options) {
         super(
                 fileIO,
                 factory,
@@ -84,7 +87,9 @@ public class KeyValueDataFileWriter
                 converter,
                 KeyValue.schema(keyType, valueType),
                 tableStatsExtractor,
-                compression);
+                compression,
+                StatsUtils.getFieldsStatsMode(
+                        options, KeyValue.schema(keyType, 
valueType).getFieldNames()));
 
         this.keyType = keyType;
         this.valueType = valueType;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index f0a322c2d..87dc5d88f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueSerializer;
 import org.apache.paimon.annotation.VisibleForTesting;
@@ -29,6 +30,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsUtils;
 
 import javax.annotation.Nullable;
 
@@ -47,6 +49,7 @@ public class KeyValueFileWriterFactory {
     private final long suggestedFileSize;
     private final Map<Integer, String> levelCompressions;
     private final String fileCompression;
+    private final CoreOptions options;
 
     private KeyValueFileWriterFactory(
             FileIO fileIO,
@@ -58,7 +61,8 @@ public class KeyValueFileWriterFactory {
             DataFilePathFactory pathFactory,
             long suggestedFileSize,
             Map<Integer, String> levelCompressions,
-            String fileCompression) {
+            String fileCompression,
+            CoreOptions options) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.keyType = keyType;
@@ -69,6 +73,7 @@ public class KeyValueFileWriterFactory {
         this.suggestedFileSize = suggestedFileSize;
         this.levelCompressions = levelCompressions;
         this.fileCompression = fileCompression;
+        this.options = options;
     }
 
     public RowType keyType() {
@@ -118,7 +123,8 @@ public class KeyValueFileWriterFactory {
                 tableStatsExtractor,
                 schemaId,
                 level,
-                compression);
+                compression,
+                options);
     }
 
     public void deleteFile(String filename) {
@@ -169,7 +175,8 @@ public class KeyValueFileWriterFactory {
                 BinaryRow partition,
                 int bucket,
                 Map<Integer, String> levelCompressions,
-                String fileCompression) {
+                String fileCompression,
+                CoreOptions options) {
             RowType recordType = KeyValue.schema(keyType, valueType);
             return new KeyValueFileWriterFactory(
                     fileIO,
@@ -177,11 +184,17 @@ public class KeyValueFileWriterFactory {
                     keyType,
                     valueType,
                     fileFormat.createWriterFactory(recordType),
-                    fileFormat.createStatsExtractor(recordType).orElse(null),
+                    fileFormat
+                            .createStatsExtractor(
+                                    recordType,
+                                    StatsUtils.getFieldsStatsMode(
+                                            options, 
recordType.getFieldNames()))
+                            .orElse(null),
                     pathFactory.createDataFilePathFactory(partition, bucket),
                     suggestedFileSize,
                     levelCompressions,
-                    fileCompression);
+                    fileCompression,
+                    options);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 241f41ec8..8cbbe8ed8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.stats.BinaryTableStats;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.types.RowType;
@@ -52,7 +53,8 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
             @Nullable TableStatsExtractor tableStatsExtractor,
             long schemaId,
             LongCounter seqNumCounter,
-            String fileCompression) {
+            String fileCompression,
+            FieldStatsCollector[] stats) {
         super(
                 fileIO,
                 factory,
@@ -60,7 +62,8 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
                 Function.identity(),
                 writeSchema,
                 tableStatsExtractor,
-                fileCompression);
+                fileCompression,
+                stats);
         this.schemaId = schemaId;
         this.seqNumCounter = seqNumCounter;
         this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 7b17ed167..8b99b6389 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -22,6 +22,7 @@ package org.apache.paimon.io;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
 
@@ -36,7 +37,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
             RowType writeSchema,
             DataFilePathFactory pathFactory,
             LongCounter seqNumCounter,
-            String fileCompression) {
+            String fileCompression,
+            FieldStatsCollector[] stats) {
         super(
                 () ->
                         new RowDataFileWriter(
@@ -44,10 +46,11 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
                                 fileFormat.createWriterFactory(writeSchema),
                                 pathFactory.newPath(),
                                 writeSchema,
-                                
fileFormat.createStatsExtractor(writeSchema).orElse(null),
+                                fileFormat.createStatsExtractor(writeSchema, 
stats).orElse(null),
                                 schemaId,
                                 seqNumCounter,
-                                fileCompression),
+                                fileCompression,
+                                stats),
                 targetFileSize);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index 03742b1d3..0fd5e5c1f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -26,12 +26,15 @@ import org.apache.paimon.format.TableStatsCollector;
 import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.NoneFieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.function.Function;
 
 /**
@@ -44,6 +47,7 @@ public abstract class StatsCollectingSingleFileWriter<T, R> 
extends SingleFileWr
 
     @Nullable private final TableStatsExtractor tableStatsExtractor;
     @Nullable private TableStatsCollector tableStatsCollector = null;
+    private final boolean isStatsCollectorDisabled;
 
     public StatsCollectingSingleFileWriter(
             FileIO fileIO,
@@ -52,18 +56,24 @@ public abstract class StatsCollectingSingleFileWriter<T, R> 
extends SingleFileWr
             Function<T, InternalRow> converter,
             RowType writeSchema,
             @Nullable TableStatsExtractor tableStatsExtractor,
-            String compression) {
+            String compression,
+            FieldStatsCollector[] stats) {
         super(fileIO, factory, path, converter, compression);
         this.tableStatsExtractor = tableStatsExtractor;
+        this.isStatsCollectorDisabled =
+                Arrays.stream(stats).allMatch(p -> p instanceof 
NoneFieldStatsCollector);
         if (this.tableStatsExtractor == null) {
-            this.tableStatsCollector = new TableStatsCollector(writeSchema);
+            this.tableStatsCollector = new TableStatsCollector(writeSchema, 
stats);
         }
+        Preconditions.checkArgument(
+                stats.length == writeSchema.getFieldCount(),
+                "The stats collector is not aligned to write schema.");
     }
 
     @Override
     public void write(T record) throws IOException {
         InternalRow rowData = writeImpl(record);
-        if (tableStatsCollector != null) {
+        if (tableStatsCollector != null && !isStatsCollectorDisabled) {
             tableStatsCollector.collect(rowData);
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e5da04c94..562077920 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.SingleFileWriter;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -52,6 +53,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
     private final RowType partitionType;
     private final FormatWriterFactory writerFactory;
     private final long suggestedFileSize;
+    private final FieldStatsCollector[] partitionStats;
 
     private ManifestFile(
             FileIO fileIO,
@@ -62,12 +64,14 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
             FormatWriterFactory writerFactory,
             PathFactory pathFactory,
             long suggestedFileSize,
-            @Nullable SegmentsCache<String> cache) {
+            @Nullable SegmentsCache<String> cache,
+            FieldStatsCollector[] partitionStats) {
         super(fileIO, serializer, readerFactory, writerFactory, pathFactory, 
cache);
         this.schemaManager = schemaManager;
         this.partitionType = partitionType;
         this.writerFactory = writerFactory;
         this.suggestedFileSize = suggestedFileSize;
+        this.partitionStats = partitionStats;
     }
 
     @VisibleForTesting
@@ -87,7 +91,8 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
                                 new ManifestEntryWriter(
                                         writerFactory,
                                         pathFactory.newPath(),
-                                        
CoreOptions.FILE_COMPRESSION.defaultValue()),
+                                        
CoreOptions.FILE_COMPRESSION.defaultValue(),
+                                        partitionStats),
                         suggestedFileSize);
         try {
             writer.write(entries);
@@ -107,10 +112,14 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
         private long numDeletedFiles = 0;
         private long schemaId = Long.MIN_VALUE;
 
-        ManifestEntryWriter(FormatWriterFactory factory, Path path, String 
fileCompression) {
+        ManifestEntryWriter(
+                FormatWriterFactory factory,
+                Path path,
+                String fileCompression,
+                FieldStatsCollector[] partitionStats) {
             super(ManifestFile.this.fileIO, factory, path, serializer::toRow, 
fileCompression);
 
-            this.partitionStatsCollector = new 
TableStatsCollector(partitionType);
+            this.partitionStatsCollector = new 
TableStatsCollector(partitionType, partitionStats);
             this.partitionStatsSerializer = new 
FieldStatsArraySerializer(partitionType);
         }
 
@@ -157,6 +166,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
         private final FileStorePathFactory pathFactory;
         private final long suggestedFileSize;
         @Nullable private final SegmentsCache<String> cache;
+        private final FieldStatsCollector[] partitionStats;
 
         public Factory(
                 FileIO fileIO,
@@ -165,7 +175,8 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                 FileFormat fileFormat,
                 FileStorePathFactory pathFactory,
                 long suggestedFileSize,
-                @Nullable SegmentsCache<String> cache) {
+                @Nullable SegmentsCache<String> cache,
+                FieldStatsCollector[] partitionStats) {
             this.fileIO = fileIO;
             this.schemaManager = schemaManager;
             this.partitionType = partitionType;
@@ -173,6 +184,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
             this.pathFactory = pathFactory;
             this.suggestedFileSize = suggestedFileSize;
             this.cache = cache;
+            this.partitionStats = partitionStats;
         }
 
         public ManifestFile create() {
@@ -186,7 +198,8 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                     fileFormat.createWriterFactory(entryType),
                     pathFactory.manifestFileFactory(),
                     suggestedFileSize,
-                    cache);
+                    cache,
+                    partitionStats);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 15727b38d..8efd1fd6d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -32,6 +32,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
@@ -40,6 +41,7 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.LongCounter;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsUtils;
 
 import javax.annotation.Nullable;
 
@@ -67,6 +69,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
     private final String fileCompression;
     private boolean skipCompaction;
     private BucketMode bucketMode = BucketMode.FIXED;
+    private FieldStatsCollector[] stats;
 
     public AppendOnlyFileStoreWrite(
             FileIO fileIO,
@@ -92,6 +95,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
         this.skipCompaction = options.writeOnly();
         this.assertDisorder = 
options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER);
         this.fileCompression = options.fileCompression();
+        this.stats = StatsUtils.getFieldsStatsMode(options, 
rowType.getFieldNames());
     }
 
     @Override
@@ -128,7 +132,8 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
                 commitForceCompact,
                 factory,
                 restoreIncrement,
-                fileCompression);
+                fileCompression,
+                stats);
     }
 
     public AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -146,7 +151,8 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
                             rowType,
                             pathFactory.createDataFilePathFactory(partition, 
bucket),
                             new 
LongCounter(toCompact.get(0).minSequenceNumber()),
-                            fileCompression);
+                            fileCompression,
+                            stats);
             rewriter.write(
                     new RecordReaderIterator<>(
                             read.createReader(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 01cf7b526..4c5d6926c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -147,7 +147,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         partition,
                         bucket,
                         options.fileCompressionPerLevel(),
-                        options.fileCompression());
+                        options.fileCompression(),
+                        options);
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, 
options.numLevels());
         UniversalCompaction universalCompaction =
@@ -210,7 +211,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         partition,
                         bucket,
                         options.fileCompressionPerLevel(),
-                        options.fileCompression());
+                        options.fileCompression(),
+                        options);
         MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, 
ioManager);
         switch (options.changelogProducer()) {
             case FULL_COMPACTION:
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/StatsUtils.java
new file mode 100644
index 000000000..35141c6b7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsUtils.java
@@ -0,0 +1,60 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.FieldStatsCollector;
+
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.FIELD_STATS_MODE_PREFIX;
+import static org.apache.paimon.CoreOptions.FIELD_STATS_MODE_SUFFIX;
+import static org.apache.paimon.options.ConfigOptions.key;
+
+/** Resolves the statistics collector to use for each field of a table. */
+public class StatsUtils {
+
+    /**
+     * Builds one collector per entry of {@code fields}, in the same order. A field-level
+     * stats-mode option takes precedence; otherwise {@link CoreOptions#STATS_MODE} is used.
+     * The default is resolved with its own {@code from(...)} call per field rather than
+     * reusing a single result for all of them.
+     */
+    public static FieldStatsCollector[] getFieldsStatsMode(
+            CoreOptions options, List<String> fields) {
+        Options cfg = options.toConfiguration();
+        FieldStatsCollector[] modes = new FieldStatsCollector[fields.size()];
+        for (int i = 0; i < modes.length; i++) {
+            String fieldKey =
+                    String.format(
+                            "%s.%s.%s",
+                            FIELD_STATS_MODE_PREFIX, fields.get(i), FIELD_STATS_MODE_SUFFIX);
+            String fieldMode = cfg.get(key(fieldKey).stringType().noDefaultValue());
+            // No field-level override: fall back to the table-wide mode for this field.
+            modes[i] =
+                    fieldMode != null
+                            ? FieldStatsCollector.from(fieldMode)
+                            : FieldStatsCollector.from(cfg.get(CoreOptions.STATS_MODE));
+        }
+        return modes;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 9c2913c7e..92a8cb531 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -40,12 +40,14 @@ import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.StatsUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
@@ -317,7 +319,10 @@ public class AppendOnlyWriterTest {
                         forceCompact,
                         pathFactory,
                         null,
-                        CoreOptions.FILE_COMPRESSION.defaultValue());
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        StatsUtils.getFieldsStatsMode(
+                                new CoreOptions(new HashMap<>()),
+                                AppendOnlyWriterTest.SCHEMA.getFieldNames()));
         return Pair.of(writer, compactManager.allFiles());
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 40e814844..13caec6b7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -35,11 +35,13 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.StatsUtils;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 
 /** test file format suffix. */
@@ -74,7 +76,9 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         false,
                         dataFilePathFactory,
                         null,
-                        CoreOptions.FILE_COMPRESSION.defaultValue());
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        StatsUtils.getFieldsStatsMode(
+                                new CoreOptions(new HashMap<>()), 
SCHEMA.getFieldNames()));
         appendOnlyWriter.write(
                 GenericRow.of(1, BinaryString.fromString("aaa"), 
BinaryString.fromString("1")));
         CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
 
b/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
index 82ce6e2fd..218cd141e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format;
 
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.stats.TestTableStatsExtractor;
 import org.apache.paimon.types.RowType;
 
@@ -55,7 +56,8 @@ public class FileStatsExtractingAvroFormat extends FileFormat 
{
     }
 
     @Override
-    public Optional<TableStatsExtractor> createStatsExtractor(RowType type) {
-        return Optional.of(new TestTableStatsExtractor(this, type));
+    public Optional<TableStatsExtractor> createStatsExtractor(
+            RowType type, FieldStatsCollector[] stats) {
+        return Optional.of(new TestTableStatsExtractor(this, type, stats));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index cbdcbf4ec..c767f1d7b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -22,6 +22,8 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.format.TableStatsCollector;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 
 import java.util.ArrayList;
@@ -29,6 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.IntStream;
 
 /** Random {@link DataFileMeta} generator. */
 public class DataFileTestDataGenerator {
@@ -100,9 +103,17 @@ public class DataFileTestDataGenerator {
 
     private Data createDataFile(List<KeyValue> kvs, int level, BinaryRow 
partition, int bucket) {
         TableStatsCollector keyStatsCollector =
-                new TableStatsCollector(TestKeyValueGenerator.KEY_TYPE);
+                new TableStatsCollector(
+                        TestKeyValueGenerator.KEY_TYPE,
+                        IntStream.range(0, 
TestKeyValueGenerator.KEY_TYPE.getFieldCount())
+                                .mapToObj(i -> new FullFieldStatsCollector())
+                                .toArray(FieldStatsCollector[]::new));
         TableStatsCollector valueStatsCollector =
-                new 
TableStatsCollector(TestKeyValueGenerator.DEFAULT_ROW_TYPE);
+                new TableStatsCollector(
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        IntStream.range(0, 
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFieldCount())
+                                .mapToObj(i -> new FullFieldStatsCollector())
+                                .toArray(FieldStatsCollector[]::new));
         FieldStatsArraySerializer keyStatsSerializer =
                 new FieldStatsArraySerializer(TestKeyValueGenerator.KEY_TYPE);
         FieldStatsArraySerializer valueStatsSerializer =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 2a236c966..5ec574605 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.stats.StatsTestUtils;
@@ -255,7 +256,7 @@ public class KeyValueFileReadWriteTest {
                         new FlushingFileFormat(format),
                         pathFactory,
                         suggestedFileSize)
-                .build(BinaryRow.EMPTY_ROW, 0, null, null);
+                .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new 
Options()));
     }
 
     private KeyValueFileReaderFactory createReaderFactory(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index df183a535..7b273b693 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -25,10 +25,13 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.StatsUtils;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.junit.jupiter.api.io.TempDir;
@@ -37,6 +40,8 @@ import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.FileFormatType;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -74,10 +79,22 @@ public class RollingFileWriterTest {
                                                                 .toString())
                                                 .newPath(),
                                         SCHEMA,
-                                        
fileFormat.createStatsExtractor(SCHEMA).orElse(null),
+                                        fileFormat
+                                                .createStatsExtractor(
+                                                        SCHEMA,
+                                                        IntStream.range(0, 
SCHEMA.getFieldCount())
+                                                                .mapToObj(
+                                                                        i ->
+                                                                               
 new FullFieldStatsCollector())
+                                                                .toArray(
+                                                                        
FieldStatsCollector[]::new))
+                                                .orElse(null),
                                         0L,
                                         new LongCounter(0),
-                                        
CoreOptions.FILE_COMPRESSION.defaultValue()),
+                                        
CoreOptions.FILE_COMPRESSION.defaultValue(),
+                                        StatsUtils.getFieldsStatsMode(
+                                                new CoreOptions(new 
HashMap<>()),
+                                                SCHEMA.getFieldNames())),
                         TARGET_FILE_SIZE);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e66a6eccb..a4ea867fa 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -32,10 +32,12 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -134,7 +136,10 @@ public abstract class ManifestFileMetaTestBase {
                                 "default",
                                 
CoreOptions.FILE_FORMAT.defaultValue().toString()),
                         Long.MAX_VALUE,
-                        null)
+                        null,
+                        StatsUtils.getFieldsStatsMode(
+                                new CoreOptions(new HashMap<>()),
+                                getPartitionType().getFieldNames()))
                 .create();
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 5be026b7c..a664fb2d3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -29,12 +29,14 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsUtils;
 
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -108,7 +110,10 @@ public class ManifestFileTest {
                         avro,
                         pathFactory,
                         suggestedFileSize,
-                        null)
+                        null,
+                        StatsUtils.getFieldsStatsMode(
+                                new CoreOptions(new HashMap<>()),
+                                DEFAULT_PART_TYPE.getFieldNames()))
                 .create();
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 81098d767..c09d4e64c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -23,6 +23,8 @@ import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.format.TableStatsCollector;
 import org.apache.paimon.io.DataFileTestDataGenerator;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.utils.Preconditions;
 
@@ -32,6 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.IntStream;
 
 /** Random {@link ManifestEntry} generator. */
 public class ManifestTestDataGenerator {
@@ -83,7 +86,11 @@ public class ManifestTestDataGenerator {
                 !entries.isEmpty(), "Manifest entries are empty. Invalid test 
data.");
 
         TableStatsCollector collector =
-                new 
TableStatsCollector(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+                new TableStatsCollector(
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                        IntStream.range(0, 
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldCount())
+                                .mapToObj(i -> new FullFieldStatsCollector()) // one per partition field
+                                .toArray(FieldStatsCollector[]::new));
         FieldStatsArraySerializer serializer =
                 new 
FieldStatsArraySerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index e4b074a51..114ff92a3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
@@ -32,6 +33,7 @@ import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -234,7 +236,7 @@ public class LookupLevelsTest {
                         new FlushingFileFormat("avro"),
                         new FileStorePathFactory(path),
                         TARGET_FILE_SIZE.defaultValue().getBytes())
-                .build(BinaryRow.EMPTY_ROW, 0, null, null);
+                .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new 
Options()));
     }
 
     private KeyValueFileReaderFactory createReaderFactory() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index fb4533a27..fd36afa8a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -187,13 +187,15 @@ public abstract class MergeTreeTestBase {
                         BinaryRow.EMPTY_ROW,
                         0,
                         options.fileCompressionPerLevel(),
-                        options.fileCompression());
+                        options.fileCompression(),
+                        options);
         compactWriterFactory =
                 writerFactoryBuilder.build(
                         BinaryRow.EMPTY_ROW,
                         0,
                         options.fileCompressionPerLevel(),
-                        options.fileCompression());
+                        options.fileCompression(),
+                        options);
         writer = createMergeTreeWriter(Collections.emptyList());
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
index 57f137d01..3701ce166 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.format.TableStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
@@ -30,6 +31,8 @@ import org.apache.paimon.types.VarCharType;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.stream.IntStream;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link TableStatsCollector}. */
@@ -39,7 +42,12 @@ public class TableStatsCollectorTest {
     public void testCollect() {
         RowType rowType =
                 RowType.of(new IntType(), new VarCharType(10), new 
ArrayType(new IntType()));
-        TableStatsCollector collector = new TableStatsCollector(rowType);
+        TableStatsCollector collector =
+                new TableStatsCollector(
+                        rowType,
+                        IntStream.range(0, rowType.getFieldCount())
+                                .mapToObj(i -> new FullFieldStatsCollector())
+                                .toArray(FullFieldStatsCollector[]::new));
 
         collector.collect(
                 GenericRow.of(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
 
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
index 2146c1bca..9314ac500 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
@@ -26,9 +26,11 @@ import org.apache.paimon.format.TableStatsCollector;
 import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.Preconditions;
 
 import java.io.IOException;
 import java.util.List;
@@ -41,10 +43,16 @@ public class TestTableStatsExtractor implements 
TableStatsExtractor {
 
     private final FileFormat format;
     private final RowType rowType;
+    private final FieldStatsCollector[] stats;
 
-    public TestTableStatsExtractor(FileFormat format, RowType rowType) {
+    public TestTableStatsExtractor(
+            FileFormat format, RowType rowType, FieldStatsCollector[] stats) {
         this.format = format;
         this.rowType = rowType;
+        this.stats = stats;
+        Preconditions.checkArgument(
+                rowType.getFieldCount() == stats.length,
+                "The stats collector is not aligned to write schema.");
     }
 
     @Override
@@ -53,7 +61,8 @@ public class TestTableStatsExtractor implements 
TableStatsExtractor {
         FormatReaderFactory readerFactory = 
format.createReaderFactory(rowType);
         List<InternalRow> records =
                 FileUtils.readListFromFile(fileIO, path, serializer, 
readerFactory);
-        TableStatsCollector statsCollector = new TableStatsCollector(rowType);
+
+        TableStatsCollector statsCollector = new TableStatsCollector(rowType, 
stats);
         for (InternalRow record : records) {
             statsCollector.collect(record);
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
new file mode 100644
index 000000000..00b4ec0c3
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
+import org.apache.paimon.statistics.TruncateFieldStatsCollector;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+/** Verifies how {@link StatsUtils} resolves per-field stats collectors. */
+public class FieldStatsCollectorUtilsTest {
+    @Test
+    public void testFieldStats() {
+        // Field "a" has no override; "b" and "c" each set a field-level mode.
+        DataField a = new DataField(0, "a", new VarCharType(), "Someone's desc.");
+        DataField b = new DataField(1, "b", new TimestampType());
+        DataField c = new DataField(2, "c", new CharType());
+        RowType type = new RowType(Arrays.asList(a, b, c));
+
+        Options options = new Options();
+        options.set(
+                CoreOptions.FIELD_STATS_MODE_PREFIX + ".b." + CoreOptions.FIELD_STATS_MODE_SUFFIX,
+                "truncate(12)");
+        options.set(
+                CoreOptions.FIELD_STATS_MODE_PREFIX + ".c." + CoreOptions.FIELD_STATS_MODE_SUFFIX,
+                "full");
+        CoreOptions coreOptions = new CoreOptions(options);
+
+        FieldStatsCollector[] stats =
+                StatsUtils.getFieldsStatsMode(coreOptions, type.getFieldNames());
+        Assertions.assertEquals(3, stats.length);
+        Assertions.assertEquals(16, ((TruncateFieldStatsCollector) stats[0]).getLength());
+        Assertions.assertEquals(12, ((TruncateFieldStatsCollector) stats[1]).getLength());
+        Assertions.assertTrue(stats[2] instanceof FullFieldStatsCollector);
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 5677973c6..b5a4d1db9 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -33,6 +33,7 @@ import org.apache.paimon.format.orc.writer.RowDataVectorizer;
 import org.apache.paimon.format.orc.writer.Vectorizer;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -86,8 +87,9 @@ public class OrcFileFormat extends FileFormat {
     }
 
     @Override
-    public Optional<TableStatsExtractor> createStatsExtractor(RowType type) {
-        return Optional.of(new OrcTableStatsExtractor(type));
+    public Optional<TableStatsExtractor> createStatsExtractor(
+            RowType type, FieldStatsCollector[] stats) {
+        return Optional.of(new OrcTableStatsExtractor(type, stats));
     }
 
     @Override
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
index 800024547..411b52e63 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
@@ -26,6 +26,7 @@ import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.format.orc.OrcReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.RowType;
@@ -53,9 +54,14 @@ import java.util.stream.IntStream;
 public class OrcTableStatsExtractor implements TableStatsExtractor {
 
     private final RowType rowType;
+    private final FieldStatsCollector[] stats;
 
-    public OrcTableStatsExtractor(RowType rowType) {
+    public OrcTableStatsExtractor(RowType rowType, FieldStatsCollector[] stats) {
         this.rowType = rowType;
+        this.stats = stats;
+        Preconditions.checkArgument(
+                rowType.getFieldCount() == stats.length,
+                "The stats collector is not aligned to write schema.");
     }
 
     @Override
@@ -74,17 +80,18 @@ public class OrcTableStatsExtractor implements TableStatsExtractor {
                                 DataField field = rowType.getFields().get(i);
                                 int fieldIdx = columnNames.indexOf(field.name());
                                 int colId = columnTypes.get(fieldIdx).getId();
-                                return toFieldStats(field, columnStatistics[colId], rowCount);
+                                return toFieldStats(field, columnStatistics[colId], rowCount, i);
                             })
                     .toArray(FieldStats[]::new);
         }
     }
 
-    private FieldStats toFieldStats(DataField field, ColumnStatistics stats, long rowCount) {
+    private FieldStats toFieldStats(
+            DataField field, ColumnStatistics stats, long rowCount, int idx) {
         long nullCount = rowCount - stats.getNumberOfValues();
         if (nullCount == rowCount) {
             // all nulls
-            return new FieldStats(null, null, nullCount);
+            return this.stats[idx].convert(new FieldStats(null, null, nullCount));
         }
         Preconditions.checkState(
                 (nullCount > 0) == stats.hasNull(),
@@ -94,86 +101,120 @@ public class OrcTableStatsExtractor implements TableStatsExtractor {
                         + stats.hasNull()
                         + "!");
 
+        FieldStats fieldStats;
         switch (field.type().getTypeRoot()) {
             case CHAR:
             case VARCHAR:
                 assertStatsClass(field, stats, StringColumnStatistics.class);
                 StringColumnStatistics stringStats = (StringColumnStatistics) stats;
-                return new FieldStats(
-                        BinaryString.fromString(stringStats.getMinimum()),
-                        BinaryString.fromString(stringStats.getMaximum()),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                BinaryString.fromString(stringStats.getMinimum()),
+                                BinaryString.fromString(stringStats.getMaximum()),
+                                nullCount);
+                break;
             case BOOLEAN:
                 assertStatsClass(field, stats, BooleanColumnStatistics.class);
                 BooleanColumnStatistics boolStats = (BooleanColumnStatistics) stats;
-                return new FieldStats(
-                        boolStats.getFalseCount() == 0, boolStats.getTrueCount() != 0, nullCount);
+                fieldStats =
+                        new FieldStats(
+                                boolStats.getFalseCount() == 0,
+                                boolStats.getTrueCount() != 0,
+                                nullCount);
+                break;
             case DECIMAL:
                 assertStatsClass(field, stats, DecimalColumnStatistics.class);
                 DecimalColumnStatistics decimalStats = (DecimalColumnStatistics) stats;
                 DecimalType decimalType = (DecimalType) (field.type());
                 int precision = decimalType.getPrecision();
                 int scale = decimalType.getScale();
-                return new FieldStats(
-                        Decimal.fromBigDecimal(
-                                decimalStats.getMinimum().bigDecimalValue(), precision, scale),
-                        Decimal.fromBigDecimal(
-                                decimalStats.getMaximum().bigDecimalValue(), precision, scale),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                Decimal.fromBigDecimal(
+                                        decimalStats.getMinimum().bigDecimalValue(),
+                                        precision,
+                                        scale),
+                                Decimal.fromBigDecimal(
+                                        decimalStats.getMaximum().bigDecimalValue(),
+                                        precision,
+                                        scale),
+                                nullCount);
+                break;
             case TINYINT:
                 assertStatsClass(field, stats, IntegerColumnStatistics.class);
                 IntegerColumnStatistics byteStats = (IntegerColumnStatistics) stats;
-                return new FieldStats(
-                        (byte) byteStats.getMinimum(), (byte) byteStats.getMaximum(), nullCount);
+                fieldStats =
+                        new FieldStats(
+                                (byte) byteStats.getMinimum(),
+                                (byte) byteStats.getMaximum(),
+                                nullCount);
+                break;
             case SMALLINT:
                 assertStatsClass(field, stats, IntegerColumnStatistics.class);
                 IntegerColumnStatistics shortStats = (IntegerColumnStatistics) stats;
-                return new FieldStats(
-                        (short) shortStats.getMinimum(),
-                        (short) shortStats.getMaximum(),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                (short) shortStats.getMinimum(),
+                                (short) shortStats.getMaximum(),
+                                nullCount);
+                break;
             case INTEGER:
             case TIME_WITHOUT_TIME_ZONE:
                 assertStatsClass(field, stats, IntegerColumnStatistics.class);
                 IntegerColumnStatistics intStats = (IntegerColumnStatistics) stats;
-                return new FieldStats(
-                        Long.valueOf(intStats.getMinimum()).intValue(),
-                        Long.valueOf(intStats.getMaximum()).intValue(),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                Long.valueOf(intStats.getMinimum()).intValue(),
+                                Long.valueOf(intStats.getMaximum()).intValue(),
+                                nullCount);
+                break;
             case BIGINT:
                 assertStatsClass(field, stats, IntegerColumnStatistics.class);
                 IntegerColumnStatistics longStats = (IntegerColumnStatistics) stats;
-                return new FieldStats(longStats.getMinimum(), longStats.getMaximum(), nullCount);
+                fieldStats =
+                        new FieldStats(longStats.getMinimum(), longStats.getMaximum(), nullCount);
+                break;
             case FLOAT:
                 assertStatsClass(field, stats, DoubleColumnStatistics.class);
                 DoubleColumnStatistics floatStats = (DoubleColumnStatistics) stats;
-                return new FieldStats(
-                        (float) floatStats.getMinimum(),
-                        (float) floatStats.getMaximum(),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                (float) floatStats.getMinimum(),
+                                (float) floatStats.getMaximum(),
+                                nullCount);
+                break;
             case DOUBLE:
                 assertStatsClass(field, stats, DoubleColumnStatistics.class);
                 DoubleColumnStatistics doubleStats = (DoubleColumnStatistics) stats;
-                return new FieldStats(
-                        doubleStats.getMinimum(), doubleStats.getMaximum(), nullCount);
+                fieldStats =
+                        new FieldStats(
+                                doubleStats.getMinimum(), doubleStats.getMaximum(), nullCount);
+                break;
             case DATE:
                 assertStatsClass(field, stats, DateColumnStatistics.class);
                 DateColumnStatistics dateStats = (DateColumnStatistics) stats;
-                return new FieldStats(
-                        DateTimeUtils.toInternal(new Date(dateStats.getMinimum().getTime())),
-                        DateTimeUtils.toInternal(new Date(dateStats.getMaximum().getTime())),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                DateTimeUtils.toInternal(
+                                        new Date(dateStats.getMinimum().getTime())),
+                                DateTimeUtils.toInternal(
+                                        new Date(dateStats.getMaximum().getTime())),
+                                nullCount);
+                break;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 assertStatsClass(field, stats, TimestampColumnStatistics.class);
                 TimestampColumnStatistics timestampStats = (TimestampColumnStatistics) stats;
-                return new FieldStats(
-                        Timestamp.fromSQLTimestamp(timestampStats.getMinimum()),
-                        Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                Timestamp.fromSQLTimestamp(timestampStats.getMinimum()),
+                                Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
+                                nullCount);
+                break;
             default:
-                return new FieldStats(null, null, nullCount);
+                fieldStats = new FieldStats(null, null, nullCount);
         }
+        return this.stats[idx].convert(fieldStats);
     }
 
     private void assertStatsClass(
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index dea46559b..1bb0bc2fd 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -27,6 +27,7 @@ import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Projection;
 
@@ -72,8 +73,9 @@ public class ParquetFileFormat extends FileFormat {
     }
 
     @Override
-    public Optional<TableStatsExtractor> createStatsExtractor(RowType type) {
-        return Optional.of(new ParquetTableStatsExtractor(type));
+    public Optional<TableStatsExtractor> createStatsExtractor(
+            RowType type, FieldStatsCollector[] stats) {
+        return Optional.of(new ParquetTableStatsExtractor(type, stats));
     }
 
     public static Options getParquetConfiguration(Options options) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
index 115d9879c..587b3dfc2 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
@@ -25,11 +25,13 @@ import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.BooleanStatistics;
@@ -58,9 +60,14 @@ public class ParquetTableStatsExtractor implements TableStatsExtractor {
     private final RowType rowType;
     private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
     private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+    private final FieldStatsCollector[] stats;
 
-    public ParquetTableStatsExtractor(RowType rowType) {
+    public ParquetTableStatsExtractor(RowType rowType, FieldStatsCollector[] stats) {
         this.rowType = rowType;
+        this.stats = stats;
+        Preconditions.checkArgument(
+                rowType.getFieldCount() == stats.length,
+                "The stats collector is not aligned to write schema.");
     }
 
     @Override
@@ -71,79 +78,100 @@ public class ParquetTableStatsExtractor implements TableStatsExtractor {
                 .mapToObj(
                         i -> {
                             DataField field = rowType.getFields().get(i);
-                            return toFieldStats(field, stats.get(field.name()));
+                            return toFieldStats(field, stats.get(field.name()), i);
                         })
                 .toArray(FieldStats[]::new);
     }
 
-    private FieldStats toFieldStats(DataField field, Statistics<?> stats) {
+    private FieldStats toFieldStats(DataField field, Statistics<?> stats, int idx) {
         if (stats == null) {
             return new FieldStats(null, null, null);
         }
         long nullCount = stats.getNumNulls();
         if (!stats.hasNonNullValue()) {
-            return new FieldStats(null, null, nullCount);
+            return this.stats[idx].convert(new FieldStats(null, null, nullCount));
         }
 
+        FieldStats fieldStats;
         switch (field.type().getTypeRoot()) {
             case CHAR:
             case VARCHAR:
                 assertStatsClass(field, stats, BinaryStatistics.class);
                 BinaryStatistics stringStats = (BinaryStatistics) stats;
-                return new FieldStats(
-                        BinaryString.fromString(stringStats.minAsString()),
-                        BinaryString.fromString(stringStats.maxAsString()),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                BinaryString.fromString(stringStats.minAsString()),
+                                BinaryString.fromString(stringStats.maxAsString()),
+                                nullCount);
+                break;
             case BOOLEAN:
                 assertStatsClass(field, stats, BooleanStatistics.class);
                 BooleanStatistics boolStats = (BooleanStatistics) stats;
-                return new FieldStats(boolStats.getMin(), boolStats.getMax(), nullCount);
+                fieldStats = new FieldStats(boolStats.getMin(), boolStats.getMax(), nullCount);
+                break;
             case DECIMAL:
                 PrimitiveType primitive = stats.type();
                 DecimalType decimalType = (DecimalType) (field.type());
                 int precision = decimalType.getPrecision();
                 int scale = decimalType.getScale();
-                return convertStatsToDecimalFieldStats(
-                        primitive, field, stats, precision, scale, nullCount);
+                fieldStats =
+                        convertStatsToDecimalFieldStats(
+                                primitive, field, stats, precision, scale, nullCount);
+                break;
             case TINYINT:
                 assertStatsClass(field, stats, IntStatistics.class);
                 IntStatistics byteStats = (IntStatistics) stats;
-                return new FieldStats(
-                        (byte) byteStats.getMin(), (byte) byteStats.getMax(), nullCount);
+                fieldStats =
+                        new FieldStats(
+                                (byte) byteStats.getMin(), (byte) byteStats.getMax(), nullCount);
+                break;
             case SMALLINT:
                 assertStatsClass(field, stats, IntStatistics.class);
                 IntStatistics shortStats = (IntStatistics) stats;
-                return new FieldStats(
-                        (short) shortStats.getMin(), (short) shortStats.getMax(), nullCount);
+                fieldStats =
+                        new FieldStats(
+                                (short) shortStats.getMin(),
+                                (short) shortStats.getMax(),
+                                nullCount);
+                break;
             case INTEGER:
             case DATE:
             case TIME_WITHOUT_TIME_ZONE:
                 assertStatsClass(field, stats, IntStatistics.class);
                 IntStatistics intStats = (IntStatistics) stats;
-                return new FieldStats(
-                        Long.valueOf(intStats.getMin()).intValue(),
-                        Long.valueOf(intStats.getMax()).intValue(),
-                        nullCount);
+                fieldStats =
+                        new FieldStats(
+                                Long.valueOf(intStats.getMin()).intValue(),
+                                Long.valueOf(intStats.getMax()).intValue(),
+                                nullCount);
+                break;
             case BIGINT:
                 assertStatsClass(field, stats, LongStatistics.class);
                 LongStatistics longStats = (LongStatistics) stats;
-                return new FieldStats(longStats.getMin(), longStats.getMax(), nullCount);
+                fieldStats = new FieldStats(longStats.getMin(), longStats.getMax(), nullCount);
+                break;
             case FLOAT:
                 assertStatsClass(field, stats, FloatStatistics.class);
                 FloatStatistics floatStats = (FloatStatistics) stats;
-                return new FieldStats(floatStats.getMin(), floatStats.getMax(), nullCount);
+                fieldStats = new FieldStats(floatStats.getMin(), floatStats.getMax(), nullCount);
+                break;
             case DOUBLE:
                 assertStatsClass(field, stats, DoubleStatistics.class);
                 DoubleStatistics doubleStats = (DoubleStatistics) stats;
-                return new FieldStats(doubleStats.getMin(), doubleStats.getMax(), nullCount);
+                fieldStats = new FieldStats(doubleStats.getMin(), doubleStats.getMax(), nullCount);
+                break;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return toTimestampStats(stats, ((TimestampType) field.type()).getPrecision());
+                fieldStats = toTimestampStats(stats, ((TimestampType) field.type()).getPrecision());
+                break;
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return toTimestampStats(
-                        stats, ((LocalZonedTimestampType) field.type()).getPrecision());
+                fieldStats =
+                        toTimestampStats(
+                                stats, ((LocalZonedTimestampType) field.type()).getPrecision());
+                break;
             default:
-                return new FieldStats(null, null, nullCount);
+                fieldStats = new FieldStats(null, null, nullCount);
         }
+        return this.stats[idx].convert(fieldStats);
     }
 
     private FieldStats toTimestampStats(Statistics<?> stats, int precision) {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTableStatsExtractorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTableStatsExtractorTest.java
index d5a899b65..270d36fac 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTableStatsExtractorTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTableStatsExtractorTest.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.format.orc;
 
 import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.TableStatsExtractorTestBase;
+import org.apache.paimon.format.TableFieldStatsExtractorTestBaseCollector;
 import org.apache.paimon.format.orc.filter.OrcTableStatsExtractor;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.ArrayType;
@@ -44,7 +44,7 @@ import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 
 /** Tests for {@link OrcTableStatsExtractor}. */
-public class OrcTableStatsExtractorTest extends TableStatsExtractorTestBase {
+public class OrcTableStatsExtractorTest extends TableFieldStatsExtractorTestBaseCollector {
 
     @Override
     protected FileFormat createFormat() {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java
index 5f10de670..be5c321d6 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java
@@ -20,7 +20,7 @@ package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.TableStatsExtractorTestBase;
+import org.apache.paimon.format.TableFieldStatsExtractorTestBaseCollector;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
@@ -43,7 +43,7 @@ import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 
 /** Tests for {@link ParquetTableStatsExtractor}. */
-public class ParquetTableStatsExtractorTest extends TableStatsExtractorTestBase {
+public class ParquetTableStatsExtractorTest extends TableFieldStatsExtractorTestBaseCollector {
 
     @Override
     protected FileFormat createFormat() {

Reply via email to