This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 97aa7f2bb [core] add support of stats mode (#1452)
97aa7f2bb is described below
commit 97aa7f2bb94baf6b385e7ffb607e8c01f297c7e3
Author: Aitozi <[email protected]>
AuthorDate: Tue Jul 4 10:47:10 2023 +0800
[core] add support of stats mode (#1452)
---
docs/content/how-to/creating-tables.md | 13 ++
.../shortcodes/generated/core_configuration.html | 6 +
.../java/org/apache/paimon/format/FileFormat.java | 4 +-
.../apache/paimon/format/TableStatsCollector.java | 33 +---
.../statistics/AbstractFieldStatsCollector.java | 38 +++++
.../statistics/CountsFieldStatsCollector.java | 40 +++++
.../paimon/statistics/FieldStatsCollector.java | 70 ++++++++
.../paimon/statistics/FullFieldStatsCollector.java | 53 ++++++
.../paimon/statistics/NoneFieldStatsCollector.java | 41 +++++
.../statistics/TruncateFieldStatsCollector.java | 127 ++++++++++++++
...TableFieldStatsExtractorTestBaseCollector.java} | 21 ++-
.../paimon/statistics/FieldStatsCollectorTest.java | 188 +++++++++++++++++++++
.../java/org/apache/paimon/AbstractFileStore.java | 4 +-
.../main/java/org/apache/paimon/CoreOptions.java | 31 ++++
.../org/apache/paimon/append/AppendOnlyWriter.java | 9 +-
.../apache/paimon/io/KeyValueDataFileWriter.java | 9 +-
.../paimon/io/KeyValueFileWriterFactory.java | 23 ++-
.../org/apache/paimon/io/RowDataFileWriter.java | 7 +-
.../apache/paimon/io/RowDataRollingFileWriter.java | 9 +-
.../paimon/io/StatsCollectingSingleFileWriter.java | 16 +-
.../org/apache/paimon/manifest/ManifestFile.java | 25 ++-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 10 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 6 +-
.../java/org/apache/paimon/utils/StatsUtils.java | 60 +++++++
.../apache/paimon/append/AppendOnlyWriterTest.java | 7 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 6 +-
.../format/FileStatsExtractingAvroFormat.java | 6 +-
.../paimon/io/DataFileTestDataGenerator.java | 15 +-
.../paimon/io/KeyValueFileReadWriteTest.java | 3 +-
.../apache/paimon/io/RollingFileWriterTest.java | 21 ++-
.../paimon/manifest/ManifestFileMetaTestBase.java | 7 +-
.../apache/paimon/manifest/ManifestFileTest.java | 7 +-
.../paimon/manifest/ManifestTestDataGenerator.java | 9 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 4 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 6 +-
.../paimon/stats/TableStatsCollectorTest.java | 10 +-
.../paimon/stats/TestTableStatsExtractor.java | 13 +-
.../paimon/utils/FieldStatsCollectorUtilsTest.java | 65 +++++++
.../apache/paimon/format/orc/OrcFileFormat.java | 6 +-
.../format/orc/filter/OrcTableStatsExtractor.java | 125 +++++++++-----
.../paimon/format/parquet/ParquetFileFormat.java | 6 +-
.../format/parquet/ParquetTableStatsExtractor.java | 80 ++++++---
.../format/orc/OrcTableStatsExtractorTest.java | 4 +-
.../parquet/ParquetTableStatsExtractorTest.java | 4 +-
44 files changed, 1093 insertions(+), 154 deletions(-)
diff --git a/docs/content/how-to/creating-tables.md
b/docs/content/how-to/creating-tables.md
index 0f4916c6c..6121991a5 100644
--- a/docs/content/how-to/creating-tables.md
+++ b/docs/content/how-to/creating-tables.md
@@ -235,6 +235,19 @@ The following three types of fields may be defined as
partition fields in the wa
if you declare the primary key containing partition field, you can achieve
the unique effect.
- CDC op_ts: It cannot be defined as a partition field, unable to know
previous record timestamp.
+### Specify the statistics collector mode
+
+Paimon automatically collects statistics for each data file to speed up queries. Four modes are supported:
+
+- `full`: collect the full metrics: `null_count`, `min` and `max`.
+- `truncate(length)`: `length` can be any positive integer. The default mode is `truncate(16)`, which collects the null count and the min/max values truncated to a length of 16. Currently only string values are truncated.
+  This mainly prevents very large column values from bloating the manifest file.
+- `counts`: only collect the null count.
+- `none`: disable the metadata stats collection.
+
+The table-level mode is set with `metadata.stats.mode`. It can also be configured per field by setting [field.{field_name}.stats.mode]({{< ref "maintenance/configurations#coreoptions" >}}).
+
+
## Create Table As
Table can be created and populated by the results of a query, for example, we
have a sql like this: `CREATE TABLE table_b AS SELECT id, name FORM table_a`,
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index c85ad3039..5417737df 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -242,6 +242,12 @@ under the License.
<td><p>Enum</p></td>
<td>Specify the merge engine for table with primary key.<br /><br
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last
row.</li><li>"partial-update": Partial update non-null
fields.</li><li>"aggregation": Aggregate fields with same primary
key.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>metadata.stats.mode</h5></td>
+ <td style="word-wrap: break-word;">"truncate(16)"</td>
+ <td>String</td>
+ <td>The mode of metadata stats collection. none, counts,
truncate(16), full is available.<br /><ul><li>"none": means disable the
metadata stats collection.</li></ul><ul><li>"counts" means only collect the
null count.</li></ul><ul><li>"full": means collect the null count, min/max
value.</li></ul><ul><li>"truncate(16)": means collect the null count, min/max
value with truncated length of 16.</li></ul><ul><li>Field level stats mode can
be specified by field.{field_name}.stats.mo [...]
+ </tr>
<tr>
<td><h5>num-levels</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index 015a80ec0..a6ba21c50 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -75,7 +76,8 @@ public abstract class FileFormat {
return createReaderFactory(rowType, projection, new ArrayList<>());
}
- public Optional<TableStatsExtractor> createStatsExtractor(RowType type) {
+ public Optional<TableStatsExtractor> createStatsExtractor(
+ RowType type, FieldStatsCollector[] stats) {
return Optional.empty();
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
index f85273be4..0db227389 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
@@ -21,24 +21,21 @@ package org.apache.paimon.format;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
/** Collector to extract statistics of each fields from a series of records. */
public class TableStatsCollector {
- private final Object[] minValues;
- private final Object[] maxValues;
- private final long[] nullCounts;
private final RowDataToObjectArrayConverter converter;
+ private final FieldStatsCollector[] stats;
private final Serializer<Object>[] fieldSerializers;
- public TableStatsCollector(RowType rowType) {
+ public TableStatsCollector(RowType rowType, FieldStatsCollector[] stats) {
int numFields = rowType.getFieldCount();
- this.minValues = new Object[numFields];
- this.maxValues = new Object[numFields];
- this.nullCounts = new long[numFields];
this.converter = new RowDataToObjectArrayConverter(rowType);
+ this.stats = stats;
this.fieldSerializers = new Serializer[numFields];
for (int i = 0; i < numFields; i++) {
fieldSerializers[i] =
InternalSerializers.create(rowType.getTypeAt(i));
@@ -54,30 +51,16 @@ public class TableStatsCollector {
public void collect(InternalRow row) {
Object[] objects = converter.convert(row);
for (int i = 0; i < row.getFieldCount(); i++) {
+ FieldStatsCollector collector = stats[i];
Object obj = objects[i];
- if (obj == null) {
- nullCounts[i]++;
- continue;
- }
-
- // TODO use comparator for not comparable types and extract this
logic to a util class
- if (!(obj instanceof Comparable)) {
- continue;
- }
- Comparable<Object> c = (Comparable<Object>) obj;
- if (minValues[i] == null || c.compareTo(minValues[i]) < 0) {
- minValues[i] = fieldSerializers[i].copy(c);
- }
- if (maxValues[i] == null || c.compareTo(maxValues[i]) > 0) {
- maxValues[i] = fieldSerializers[i].copy(c);
- }
+ collector.collect(obj, fieldSerializers[i]);
}
}
public FieldStats[] extract() {
- FieldStats[] stats = new FieldStats[nullCounts.length];
+ FieldStats[] stats = new FieldStats[this.stats.length];
for (int i = 0; i < stats.length; i++) {
- stats[i] = new FieldStats(minValues[i], maxValues[i],
nullCounts[i]);
+ stats[i] = this.stats[i].result();
}
return stats;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/statistics/AbstractFieldStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/statistics/AbstractFieldStatsCollector.java
new file mode 100644
index 000000000..b51efed6f
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/statistics/AbstractFieldStatsCollector.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.format.FieldStats;
+
+/**
+ * Base class for stats collectors that hold a min value, a max value and a null count for a single
+ * field. Each subclass decides in {@code collect} which of these it updates.
+ */
+public abstract class AbstractFieldStatsCollector implements FieldStatsCollector {
+
+    /** Min value recorded by the subclass, or {@code null} if none was recorded. */
+    protected Object minValue;
+
+    /** Max value recorded by the subclass, or {@code null} if none was recorded. */
+    protected Object maxValue;
+
+    /** Null count recorded by the subclass. */
+    protected long nullCount;
+
+    /** Packs the current min, max and null count into a {@link FieldStats}. */
+    @Override
+    public FieldStats result() {
+        final FieldStats collected = new FieldStats(minValue, maxValue, nullCount);
+        return collected;
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/statistics/CountsFieldStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/statistics/CountsFieldStatsCollector.java
new file mode 100644
index 000000000..1bab248b1
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/statistics/CountsFieldStatsCollector.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+
+/** Stats collector that reports only the null count; min and max are always {@code null}. */
+public class CountsFieldStatsCollector extends AbstractFieldStatsCollector {
+
+    /** Counts {@code null} values; non-null values are ignored. */
+    @Override
+    public void collect(Object field, Serializer<Object> serializer) {
+        if (field != null) {
+            return;
+        }
+        nullCount++;
+    }
+
+    /** Keeps the null count extracted from the file and drops its min/max. */
+    @Override
+    public FieldStats convert(FieldStats source) {
+        return new FieldStats(null, null, source.nullCount());
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
new file mode 100644
index 000000000..d77a2d266
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+
+import java.util.regex.Matcher;
+
+import static
org.apache.paimon.statistics.TruncateFieldStatsCollector.TRUNCATE_PATTERN;
+
+/** The mode of the field stats. */
+public interface FieldStatsCollector {
+
+ /**
+ * collect stats from the field.
+ *
+ * @param field The target field object.
+ * @param fieldSerializer The serializer of the field object.
+ */
+ void collect(Object field, Serializer<Object> fieldSerializer);
+
+ /** @return The collected field stats. */
+ FieldStats result();
+
+ /**
+ * Convert the field stats according to the strategy.
+ *
+ * @param source The source field stats, extracted from the file.
+ * @return The converted field stats.
+ */
+ FieldStats convert(FieldStats source);
+
+ static FieldStatsCollector from(String option) {
+ String upper = option.toUpperCase();
+ switch (upper) {
+ case "NONE":
+ return new NoneFieldStatsCollector();
+ case "FULL":
+ return new FullFieldStatsCollector();
+ case "COUNTS":
+ return new CountsFieldStatsCollector();
+ default:
+ Matcher matcher = TRUNCATE_PATTERN.matcher(upper);
+ if (matcher.matches()) {
+ String length = matcher.group(1);
+ return new
TruncateFieldStatsCollector(Integer.parseInt(length));
+ }
+ throw new IllegalArgumentException("Unexpected option: " +
option);
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/statistics/FullFieldStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/statistics/FullFieldStatsCollector.java
new file mode 100644
index 000000000..9db97a70d
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/statistics/FullFieldStatsCollector.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+
+/** The full stats collector which will report null count, min value, max
value if available. */
+public class FullFieldStatsCollector extends AbstractFieldStatsCollector {
+
+ @Override
+ public void collect(Object field, Serializer<Object> fieldSerializer) {
+ if (field == null) {
+ nullCount++;
+ return;
+ }
+
+ // TODO use comparator for not comparable types and extract this logic
to a util class
+ if (!(field instanceof Comparable)) {
+ return;
+ }
+ Comparable<Object> c = (Comparable<Object>) field;
+ if (minValue == null || c.compareTo(minValue) < 0) {
+ minValue = fieldSerializer.copy(c);
+ }
+ if (maxValue == null || c.compareTo(maxValue) > 0) {
+ maxValue = fieldSerializer.copy(c);
+ }
+ }
+
+ @Override
+ public FieldStats convert(FieldStats source) {
+ return source;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/statistics/NoneFieldStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/statistics/NoneFieldStatsCollector.java
new file mode 100644
index 000000000..23fc6ffd0
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/statistics/NoneFieldStatsCollector.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+
+/** Stats collector that reports nothing: min, max and null count are all {@code null}. */
+public class NoneFieldStatsCollector extends AbstractFieldStatsCollector {
+
+    @Override
+    public void collect(Object field, Serializer<Object> fieldSerializer) {
+        // Intentionally empty: nothing is recorded in "none" mode.
+    }
+
+    @Override
+    public FieldStats result() {
+        return emptyStats();
+    }
+
+    @Override
+    public FieldStats convert(FieldStats source) {
+        return emptyStats();
+    }
+
+    /** Returns a fresh stats instance with every component unknown. */
+    private static FieldStats emptyStats() {
+        return new FieldStats(null, null, null);
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/statistics/TruncateFieldStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/statistics/TruncateFieldStatsCollector.java
new file mode 100644
index 000000000..bae77bed6
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/statistics/TruncateFieldStatsCollector.java
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.regex.Pattern;
+
+/**
+ * The truncate stats collector which will report null count, truncated
min/max value. Currently,
+ * truncation only performs on the {@link BinaryString} value.
+ */
+public class TruncateFieldStatsCollector extends AbstractFieldStatsCollector {
+
+ public static final Pattern TRUNCATE_PATTERN =
Pattern.compile("TRUNCATE\\((\\d+)\\)");
+
+ private final int length;
+
+ public TruncateFieldStatsCollector(int length) {
+ Preconditions.checkArgument(length > 0, "Truncate length should larger
than zero.");
+ this.length = length;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ @Override
+ public void collect(Object field, Serializer<Object> fieldSerializer) {
+ if (field == null) {
+ nullCount++;
+ return;
+ }
+
+ // TODO use comparator for not comparable types and extract this logic
to a util class
+ if (!(field instanceof Comparable)) {
+ return;
+ }
+
+ Comparable<Object> c = (Comparable<Object>) field;
+ if (minValue == null || c.compareTo(minValue) < 0) {
+ minValue = truncateMin(field, fieldSerializer);
+ }
+ if (maxValue == null || c.compareTo(maxValue) > 0) {
+ maxValue = truncateMax(field, fieldSerializer);
+ }
+ }
+
+ @Override
+ public FieldStats convert(FieldStats source) {
+ return new FieldStats(
+ truncateMin(source.minValue(), null),
+ truncateMax(source.maxValue(), null),
+ source.nullCount());
+ }
+
+ /** @return a truncated value less or equal than the old value. */
+ private Object truncateMin(Object field, @Nullable Serializer<Object>
fieldSerializer) {
+ if (field == null) {
+ return null;
+ }
+ if (field instanceof BinaryString) {
+ return ((BinaryString) field).substring(0, length);
+ } else {
+ return fieldSerializer == null ? field :
fieldSerializer.copy(field);
+ }
+ }
+
+ /** @return a value greater or equal than the old value. */
+ private Object truncateMax(Object field, @Nullable Serializer<Object>
fieldSerializer) {
+ if (field == null) {
+ return null;
+ }
+ if (field instanceof BinaryString) {
+ BinaryString original = ((BinaryString) field);
+ BinaryString truncated = original.substring(0, length);
+
+ // No need to increment if the input length is under the truncate
length
+ if (original.getSizeInBytes() == truncated.getSizeInBytes()) {
+ return field;
+ }
+
+ StringBuilder truncatedStringBuilder = new
StringBuilder(truncated.toString());
+
+ // Try incrementing the code points from the end
+ for (int i = length - 1; i >= 0; i--) {
+ // Get the offset in the truncated string buffer where the
number of unicode
+ // characters = i
+ int offsetByCodePoint =
truncatedStringBuilder.offsetByCodePoints(0, i);
+ int nextCodePoint =
truncatedStringBuilder.codePointAt(offsetByCodePoint) + 1;
+ // No overflow
+ if (nextCodePoint != 0 &&
Character.isValidCodePoint(nextCodePoint)) {
+ truncatedStringBuilder.setLength(offsetByCodePoint);
+ // Append next code point to the truncated substring
+ truncatedStringBuilder.appendCodePoint(nextCodePoint);
+ return
BinaryString.fromString(truncatedStringBuilder.toString());
+ }
+ }
+ return null; // Cannot find a valid upper bound
+ } else {
+ return fieldSerializer == null ? field :
fieldSerializer.copy(field);
+ }
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/format/TableStatsExtractorTestBase.java
b/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
similarity index 92%
rename from
paimon-common/src/test/java/org/apache/paimon/format/TableStatsExtractorTestBase.java
rename to
paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
index 69bf6b86b..b3eda7c70 100644
---
a/paimon-common/src/test/java/org/apache/paimon/format/TableStatsExtractorTestBase.java
+++
b/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.CharType;
@@ -44,8 +45,9 @@ import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.math.BigDecimal;
import java.time.Instant;
@@ -55,21 +57,28 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
import static org.apache.paimon.types.DataTypeChecks.getPrecision;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link TableStatsExtractor}. */
-public abstract class TableStatsExtractorTestBase {
+public abstract class TableFieldStatsExtractorTestBaseCollector {
@TempDir java.nio.file.Path tempDir;
private final FileIO fileIO = new LocalFileIO();
- @Test
- public void testExtract() throws Exception {
+ @ParameterizedTest
+ @ValueSource(strings = {"none", "counts", "full", "truncate(3)",
"truncate(12)"})
+ public void testExtract(String mode) throws Exception {
FileFormat format = createFormat();
RowType rowType = rowType();
+ int count = rowType().getFieldCount();
+ FieldStatsCollector[] stats =
+ IntStream.range(0, count)
+ .mapToObj(p -> FieldStatsCollector.from(mode))
+ .toArray(FieldStatsCollector[]::new);
FormatWriterFactory writerFactory =
format.createWriterFactory(rowType);
Path path = new Path(tempDir.toString() + "/test");
@@ -82,13 +91,13 @@ public abstract class TableStatsExtractorTestBase {
}
writer.finish();
- TableStatsCollector collector = new TableStatsCollector(rowType);
+ TableStatsCollector collector = new TableStatsCollector(rowType,
stats);
for (GenericRow row : data) {
collector.collect(row);
}
FieldStats[] expected = collector.extract();
- TableStatsExtractor extractor =
format.createStatsExtractor(rowType).get();
+ TableStatsExtractor extractor = format.createStatsExtractor(rowType,
stats).get();
assertThat(extractor).isNotNull();
FieldStats[] actual = extractor.extract(fileIO, path);
for (int i = 0; i < expected.length; i++) {
diff --git
a/paimon-common/src/test/java/org/apache/paimon/statistics/FieldStatsCollectorTest.java
b/paimon-common/src/test/java/org/apache/paimon/statistics/FieldStatsCollectorTest.java
new file mode 100644
index 000000000..1dec6f25d
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/statistics/FieldStatsCollectorTest.java
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.statistics;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Test for {@link FieldStatsCollector}. */
+public class FieldStatsCollectorTest {
+
+    private Serializer<Object>[] serializers;
+    private static final String s1 = StringUtils.repeat("a", 12);
+    // Expected truncated min of column "b" for truncate(2).
+    private static final String s1_t = "aa";
+    private static final String s2 = StringUtils.repeat("b", 12);
+    private static final String s3 = StringUtils.repeat("d", 13);
+    // Expected truncated max of column "b" for truncate(2): "dd" with its last char incremented.
+    private static final String s3_t = "de";
+
+    @BeforeEach
+    public void before() {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "a", new IntType(), "Someone's desc."),
+                                new DataField(1, "b", new VarCharType())));
+        serializers = new Serializer[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            serializers[i] = InternalSerializers.create(rowType.getTypeAt(i));
+        }
+    }
+
+    @Test
+    public void testParse() {
+        Assertions.assertTrue(FieldStatsCollector.from("none") instanceof NoneFieldStatsCollector);
+        Assertions.assertTrue(FieldStatsCollector.from("Full") instanceof FullFieldStatsCollector);
+        Assertions.assertTrue(
+                FieldStatsCollector.from("CoUNts") instanceof CountsFieldStatsCollector);
+        TruncateFieldStatsCollector t1 =
+                (TruncateFieldStatsCollector) FieldStatsCollector.from("truncate(10)");
+        Assertions.assertEquals(10, t1.getLength());
+        assertThatThrownBy(() -> FieldStatsCollector.from("aatruncate(10)"))
+                .isInstanceOf(IllegalArgumentException.class);
+        assertThatThrownBy(() -> FieldStatsCollector.from("truncate(10.1)"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void testNone() {
+        List<GenericRow> rows = getRows();
+        check(
+                rows,
+                0,
+                new FieldStats(null, null, null),
+                new FieldStats(1, 4, 0L),
+                new NoneFieldStatsCollector());
+        check(
+                rows,
+                1,
+                new FieldStats(null, null, null),
+                new FieldStats(s1, s3, 1L),
+                new NoneFieldStatsCollector());
+    }
+
+    @Test
+    public void testCounts() {
+        List<GenericRow> rows = getRows();
+        check(
+                rows,
+                0,
+                new FieldStats(null, null, 0L),
+                new FieldStats(1, 4, 0L),
+                new CountsFieldStatsCollector());
+        check(
+                rows,
+                1,
+                new FieldStats(null, null, 1L),
+                new FieldStats(s1, s3, 1L),
+                new CountsFieldStatsCollector());
+    }
+
+    @Test
+    public void testFull() {
+        List<GenericRow> rows = getRows();
+        check(
+                rows,
+                0,
+                new FieldStats(1, 4, 0L),
+                new FieldStats(1, 4, 0L),
+                new FullFieldStatsCollector());
+        check(
+                rows,
+                1,
+                new FieldStats(BinaryString.fromString(s1), BinaryString.fromString(s3), 1L),
+                new FieldStats(BinaryString.fromString(s1), BinaryString.fromString(s3), 1L),
+                new FullFieldStatsCollector());
+    }
+
+    @Test
+    public void testTruncate() {
+        List<GenericRow> rows = getRows();
+        check(
+                rows,
+                0,
+                new FieldStats(1, 4, 0L),
+                new FieldStats(1, 4, 0L),
+                new TruncateFieldStatsCollector(1));
+        check(
+                rows,
+                1,
+                new FieldStats(BinaryString.fromString(s1_t), BinaryString.fromString(s3_t), 1L),
+                new FieldStats(BinaryString.fromString(s1), BinaryString.fromString(s3), 1L),
+                new TruncateFieldStatsCollector(2));
+    }
+
+    @Test
+    public void testTruncateTwoChar() {
+        TruncateFieldStatsCollector t1 = new TruncateFieldStatsCollector(1);
+        FieldStats fieldStats =
+                new FieldStats(
+                        BinaryString.fromString("\uD83E\uDD18a"),
+                        BinaryString.fromString("\uD83E\uDD18b"),
+                        0L);
+        fieldStats = t1.convert(fieldStats);
+        Assertions.assertEquals(BinaryString.fromString("\uD83E\uDD18"), fieldStats.minValue());
+        Assertions.assertEquals(BinaryString.fromString("\uD83E\uDD19"), fieldStats.maxValue());
+    }
+
+    /**
+     * Feeds column {@code column} of {@code rows} into the collector, then checks both the
+     * collected result and the conversion of the stats a file format would have extracted.
+     */
+    private void check(
+            List<GenericRow> rows,
+            int column,
+            FieldStats expected,
+            FieldStats formatExtracted,
+            FieldStatsCollector fieldStatsCollector) {
+        for (GenericRow row : rows) {
+            fieldStatsCollector.collect(row.getField(column), serializers[column]);
+        }
+        Assertions.assertEquals(expected, fieldStatsCollector.result());
+        Assertions.assertEquals(expected, fieldStatsCollector.convert(formatExtracted));
+    }
+
+    /** Four rows: ints 1..4 in column "a"; s1, s2, null, s3 in column "b". */
+    private List<GenericRow> getRows() {
+        List<GenericRow> rows = new ArrayList<>();
+        rows.add(GenericRow.of(1, BinaryString.fromString(s1)));
+        rows.add(GenericRow.of(2, BinaryString.fromString(s2)));
+        rows.add(GenericRow.of(3, null));
+        rows.add(GenericRow.of(4, BinaryString.fromString(s3)));
+        return rows;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 558daa899..883eb2611 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -38,6 +38,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsUtils;
import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
@@ -104,7 +105,8 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
options.manifestFormat(),
pathFactory(),
options.manifestTargetSize().getBytes(),
- forWrite ? writeManifestCache : null);
+ forWrite ? writeManifestCache : null,
+ StatsUtils.getFieldsStatsMode(options,
partitionType.getFieldNames()));
}
@VisibleForTesting
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index b35aae375..d0baf4935 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -694,6 +694,37 @@ public class CoreOptions implements Serializable {
"Read incremental changes between start snapshot
(exclusive) and end snapshot, "
+ "for example, '5,10' means changes
between snapshot 5 and snapshot 10.");
+ /** First segment of the per-field stats mode key {@code field.<field_name>.stats.mode}. */
+ public static final String FIELD_STATS_MODE_PREFIX = "field";
+ /** Last segment of the per-field stats mode key (see {@link #FIELD_STATS_MODE_PREFIX}). */
+ public static final String FIELD_STATS_MODE_SUFFIX = "stats.mode";
+
+ /**
+ * Default mode for collecting metadata stats (null count, min/max) of every field. A single
+ * field can override it with its own key built from {@link #FIELD_STATS_MODE_PREFIX}, the
+ * field name and {@link #FIELD_STATS_MODE_SUFFIX}.
+ */
+ public static final ConfigOption<String> STATS_MODE =
+ key("metadata.stats.mode")
+ .stringType()
+ .defaultValue("truncate(16)")
+ .withDescription(
+ Description.builder()
+ .text(
+ "The mode of metadata stats
collection. none, counts, truncate(16), full is available.")
+ .linebreak()
+ .list(
+ text(
+ "\"none\": means disable
the metadata stats collection."))
+ .list(text("\"counts\" means only collect
the null count."))
+ .list(
+ text(
+ "\"full\": means collect
the null count, min/max value."))
+ .list(
+ text(
+ "\"truncate(16)\": means
collect the null count, min/max value with truncated length of 16."))
+ .list(
+ text(
+ "Field level stats mode
can be specified by "
+ +
FIELD_STATS_MODE_PREFIX
+ + "."
+ + "{field_name}."
+ +
FIELD_STATS_MODE_SUFFIX))
+ .build());
+
private final Options options;
public CoreOptions(Map<String, String> options) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index b40c038c5..98c05cda4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -28,6 +28,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
@@ -64,6 +65,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
private final String fileCompression;
private RowDataRollingFileWriter writer;
+ private FieldStatsCollector[] stats;
public AppendOnlyWriter(
FileIO fileIO,
@@ -76,7 +78,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
boolean forceCompact,
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment,
- String fileCompression) {
+ String fileCompression,
+ FieldStatsCollector[] stats) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -98,6 +101,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
}
+ this.stats = stats;
}
@Override
@@ -173,7 +177,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
writeSchema,
pathFactory,
seqNumCounter,
- fileCompression);
+ fileCompression,
+ stats);
}
private void trySyncLatestCompaction(boolean blocking)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 4ee489f00..1f7b16422 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.io;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
@@ -30,6 +31,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StatsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +78,8 @@ public class KeyValueDataFileWriter
@Nullable TableStatsExtractor tableStatsExtractor,
long schemaId,
int level,
- String compression) {
+ String compression,
+ CoreOptions options) {
super(
fileIO,
factory,
@@ -84,7 +87,9 @@ public class KeyValueDataFileWriter
converter,
KeyValue.schema(keyType, valueType),
tableStatsExtractor,
- compression);
+ compression,
+ StatsUtils.getFieldsStatsMode(
+ options, KeyValue.schema(keyType,
valueType).getFieldNames()));
this.keyType = keyType;
this.valueType = valueType;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index f0a322c2d..87dc5d88f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -18,6 +18,7 @@
package org.apache.paimon.io;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
@@ -29,6 +30,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsUtils;
import javax.annotation.Nullable;
@@ -47,6 +49,7 @@ public class KeyValueFileWriterFactory {
private final long suggestedFileSize;
private final Map<Integer, String> levelCompressions;
private final String fileCompression;
+ private final CoreOptions options;
private KeyValueFileWriterFactory(
FileIO fileIO,
@@ -58,7 +61,8 @@ public class KeyValueFileWriterFactory {
DataFilePathFactory pathFactory,
long suggestedFileSize,
Map<Integer, String> levelCompressions,
- String fileCompression) {
+ String fileCompression,
+ CoreOptions options) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.keyType = keyType;
@@ -69,6 +73,7 @@ public class KeyValueFileWriterFactory {
this.suggestedFileSize = suggestedFileSize;
this.levelCompressions = levelCompressions;
this.fileCompression = fileCompression;
+ this.options = options;
}
public RowType keyType() {
@@ -118,7 +123,8 @@ public class KeyValueFileWriterFactory {
tableStatsExtractor,
schemaId,
level,
- compression);
+ compression,
+ options);
}
public void deleteFile(String filename) {
@@ -169,7 +175,8 @@ public class KeyValueFileWriterFactory {
BinaryRow partition,
int bucket,
Map<Integer, String> levelCompressions,
- String fileCompression) {
+ String fileCompression,
+ CoreOptions options) {
RowType recordType = KeyValue.schema(keyType, valueType);
return new KeyValueFileWriterFactory(
fileIO,
@@ -177,11 +184,17 @@ public class KeyValueFileWriterFactory {
keyType,
valueType,
fileFormat.createWriterFactory(recordType),
- fileFormat.createStatsExtractor(recordType).orElse(null),
+ fileFormat
+ .createStatsExtractor(
+ recordType,
+ StatsUtils.getFieldsStatsMode(
+ options,
recordType.getFieldNames()))
+ .orElse(null),
pathFactory.createDataFilePathFactory(partition, bucket),
suggestedFileSize,
levelCompressions,
- fileCompression);
+ fileCompression,
+ options);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 241f41ec8..8cbbe8ed8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.RowType;
@@ -52,7 +53,8 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
@Nullable TableStatsExtractor tableStatsExtractor,
long schemaId,
LongCounter seqNumCounter,
- String fileCompression) {
+ String fileCompression,
+ FieldStatsCollector[] stats) {
super(
fileIO,
factory,
@@ -60,7 +62,8 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
Function.identity(),
writeSchema,
tableStatsExtractor,
- fileCompression);
+ fileCompression,
+ stats);
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 7b17ed167..8b99b6389 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -22,6 +22,7 @@ package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
@@ -36,7 +37,8 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
RowType writeSchema,
DataFilePathFactory pathFactory,
LongCounter seqNumCounter,
- String fileCompression) {
+ String fileCompression,
+ FieldStatsCollector[] stats) {
super(
() ->
new RowDataFileWriter(
@@ -44,10 +46,11 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
fileFormat.createWriterFactory(writeSchema),
pathFactory.newPath(),
writeSchema,
-
fileFormat.createStatsExtractor(writeSchema).orElse(null),
+ fileFormat.createStatsExtractor(writeSchema,
stats).orElse(null),
schemaId,
seqNumCounter,
- fileCompression),
+ fileCompression,
+ stats),
targetFileSize);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index 03742b1d3..0fd5e5c1f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -26,12 +26,15 @@ import org.apache.paimon.format.TableStatsCollector;
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.NoneFieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.function.Function;
/**
@@ -44,6 +47,7 @@ public abstract class StatsCollectingSingleFileWriter<T, R>
extends SingleFileWr
@Nullable private final TableStatsExtractor tableStatsExtractor;
@Nullable private TableStatsCollector tableStatsCollector = null;
+ private final boolean isStatsCollectorDisabled;
public StatsCollectingSingleFileWriter(
FileIO fileIO,
@@ -52,18 +56,24 @@ public abstract class StatsCollectingSingleFileWriter<T, R>
extends SingleFileWr
Function<T, InternalRow> converter,
RowType writeSchema,
@Nullable TableStatsExtractor tableStatsExtractor,
- String compression) {
+ String compression,
+ FieldStatsCollector[] stats) {
super(fileIO, factory, path, converter, compression);
this.tableStatsExtractor = tableStatsExtractor;
+ this.isStatsCollectorDisabled =
+ Arrays.stream(stats).allMatch(p -> p instanceof
NoneFieldStatsCollector);
if (this.tableStatsExtractor == null) {
- this.tableStatsCollector = new TableStatsCollector(writeSchema);
+ this.tableStatsCollector = new TableStatsCollector(writeSchema,
stats);
}
+ Preconditions.checkArgument(
+ stats.length == writeSchema.getFieldCount(),
+ "The stats collector is not aligned to write schema.");
}
@Override
public void write(T record) throws IOException {
InternalRow rowData = writeImpl(record);
- if (tableStatsCollector != null) {
+ if (tableStatsCollector != null && !isStatsCollectorDisabled) {
tableStatsCollector.collect(rowData);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e5da04c94..562077920 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -52,6 +53,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
private final RowType partitionType;
private final FormatWriterFactory writerFactory;
private final long suggestedFileSize;
+ private final FieldStatsCollector[] partitionStats;
private ManifestFile(
FileIO fileIO,
@@ -62,12 +64,14 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
FormatWriterFactory writerFactory,
PathFactory pathFactory,
long suggestedFileSize,
- @Nullable SegmentsCache<String> cache) {
+ @Nullable SegmentsCache<String> cache,
+ FieldStatsCollector[] partitionStats) {
super(fileIO, serializer, readerFactory, writerFactory, pathFactory,
cache);
this.schemaManager = schemaManager;
this.partitionType = partitionType;
this.writerFactory = writerFactory;
this.suggestedFileSize = suggestedFileSize;
+ this.partitionStats = partitionStats;
}
@VisibleForTesting
@@ -87,7 +91,8 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
new ManifestEntryWriter(
writerFactory,
pathFactory.newPath(),
-
CoreOptions.FILE_COMPRESSION.defaultValue()),
+
CoreOptions.FILE_COMPRESSION.defaultValue(),
+ partitionStats),
suggestedFileSize);
try {
writer.write(entries);
@@ -107,10 +112,14 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
private long numDeletedFiles = 0;
private long schemaId = Long.MIN_VALUE;
- ManifestEntryWriter(FormatWriterFactory factory, Path path, String
fileCompression) {
+ ManifestEntryWriter(
+ FormatWriterFactory factory,
+ Path path,
+ String fileCompression,
+ FieldStatsCollector[] partitionStats) {
super(ManifestFile.this.fileIO, factory, path, serializer::toRow,
fileCompression);
- this.partitionStatsCollector = new
TableStatsCollector(partitionType);
+ this.partitionStatsCollector = new
TableStatsCollector(partitionType, partitionStats);
this.partitionStatsSerializer = new
FieldStatsArraySerializer(partitionType);
}
@@ -157,6 +166,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;
@Nullable private final SegmentsCache<String> cache;
+ private final FieldStatsCollector[] partitionStats;
public Factory(
FileIO fileIO,
@@ -165,7 +175,8 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
FileFormat fileFormat,
FileStorePathFactory pathFactory,
long suggestedFileSize,
- @Nullable SegmentsCache<String> cache) {
+ @Nullable SegmentsCache<String> cache,
+ FieldStatsCollector[] partitionStats) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.partitionType = partitionType;
@@ -173,6 +184,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
this.cache = cache;
+ this.partitionStats = partitionStats;
}
public ManifestFile create() {
@@ -186,7 +198,8 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
fileFormat.createWriterFactory(entryType),
pathFactory.manifestFileFactory(),
suggestedFileSize,
- cache);
+ cache,
+ partitionStats);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 15727b38d..8efd1fd6d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -32,6 +32,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
@@ -40,6 +41,7 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.LongCounter;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsUtils;
import javax.annotation.Nullable;
@@ -67,6 +69,7 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
private final String fileCompression;
private boolean skipCompaction;
private BucketMode bucketMode = BucketMode.FIXED;
+ private FieldStatsCollector[] stats;
public AppendOnlyFileStoreWrite(
FileIO fileIO,
@@ -92,6 +95,7 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
this.skipCompaction = options.writeOnly();
this.assertDisorder =
options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER);
this.fileCompression = options.fileCompression();
+ this.stats = StatsUtils.getFieldsStatsMode(options,
rowType.getFieldNames());
}
@Override
@@ -128,7 +132,8 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
commitForceCompact,
factory,
restoreIncrement,
- fileCompression);
+ fileCompression,
+ stats);
}
public AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -146,7 +151,8 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
rowType,
pathFactory.createDataFilePathFactory(partition,
bucket),
new
LongCounter(toCompact.get(0).minSequenceNumber()),
- fileCompression);
+ fileCompression,
+ stats);
rewriter.write(
new RecordReaderIterator<>(
read.createReader(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 01cf7b526..4c5d6926c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -147,7 +147,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
partition,
bucket,
options.fileCompressionPerLevel(),
- options.fileCompression());
+ options.fileCompression(),
+ options);
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles,
options.numLevels());
UniversalCompaction universalCompaction =
@@ -210,7 +211,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
partition,
bucket,
options.fileCompressionPerLevel(),
- options.fileCompression());
+ options.fileCompression(),
+ options);
MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType,
ioManager);
switch (options.changelogProducer()) {
case FULL_COMPACTION:
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/StatsUtils.java
new file mode 100644
index 000000000..35141c6b7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsUtils.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.FieldStatsCollector;
+
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.FIELD_STATS_MODE_PREFIX;
+import static org.apache.paimon.CoreOptions.FIELD_STATS_MODE_SUFFIX;
+import static org.apache.paimon.options.ConfigOptions.key;
+
+/** Utilities for resolving the metadata stats mode of table fields. */
+public class StatsUtils {
+
+    /**
+     * Creates one {@link FieldStatsCollector} per field, using the field's {@code
+     * field.<field_name>.stats.mode} option when it is set and {@link CoreOptions#STATS_MODE}
+     * otherwise.
+     *
+     * <p>Each array element is a separate instance: collectors may accumulate state (e.g.
+     * min/max/null count) while collecting, so sharing one instance across fields would mix
+     * their stats.
+     *
+     * <p>NOTE(review): some callers keep one returned array for several files or writers;
+     * confirm that collector state is reset between files, or call this once per file.
+     *
+     * @param options table options holding the default and the per-field stats modes
+     * @param fields field names; the returned array is aligned with this list
+     * @return a new array holding one freshly created collector per field
+     */
+    public static FieldStatsCollector[] getFieldsStatsMode(
+            CoreOptions options, List<String> fields) {
+        Options cfg = options.toConfiguration();
+        String defaultMode = cfg.get(CoreOptions.STATS_MODE);
+        // Parse the table-level mode once up front so any error from parsing it still surfaces
+        // when every field overrides it or there are no fields at all.
+        FieldStatsCollector.from(defaultMode);
+        FieldStatsCollector[] modes = new FieldStatsCollector[fields.size()];
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldMode =
+                    cfg.get(
+                            key(String.format(
+                                            "%s.%s.%s",
+                                            FIELD_STATS_MODE_PREFIX,
+                                            fields.get(i),
+                                            FIELD_STATS_MODE_SUFFIX))
+                                    .stringType()
+                                    .noDefaultValue());
+            // Always build a new collector: reusing one default instance for every field
+            // would make those fields share (and mix) their collected stats.
+            modes[i] = FieldStatsCollector.from(fieldMode != null ? fieldMode : defaultMode);
+        }
+        return modes;
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 9c2913c7e..92a8cb531 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -40,12 +40,14 @@ import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.StatsUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
@@ -317,7 +319,10 @@ public class AppendOnlyWriterTest {
forceCompact,
pathFactory,
null,
- CoreOptions.FILE_COMPRESSION.defaultValue());
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ StatsUtils.getFieldsStatsMode(
+ new CoreOptions(new HashMap<>()),
+ AppendOnlyWriterTest.SCHEMA.getFieldNames()));
return Pair.of(writer, compactManager.allFiles());
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 40e814844..13caec6b7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -35,11 +35,13 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.StatsUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.util.HashMap;
import java.util.LinkedList;
/** test file format suffix. */
@@ -74,7 +76,9 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
false,
dataFilePathFactory,
null,
- CoreOptions.FILE_COMPRESSION.defaultValue());
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ StatsUtils.getFieldsStatsMode(
+ new CoreOptions(new HashMap<>()),
SCHEMA.getFieldNames()));
appendOnlyWriter.write(
GenericRow.of(1, BinaryString.fromString("aaa"),
BinaryString.fromString("1")));
CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
index 82ce6e2fd..218cd141e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.stats.TestTableStatsExtractor;
import org.apache.paimon.types.RowType;
@@ -55,7 +56,8 @@ public class FileStatsExtractingAvroFormat extends FileFormat
{
}
@Override
- public Optional<TableStatsExtractor> createStatsExtractor(RowType type) {
- return Optional.of(new TestTableStatsExtractor(this, type));
+ public Optional<TableStatsExtractor> createStatsExtractor(
+ RowType type, FieldStatsCollector[] stats) {
+ return Optional.of(new TestTableStatsExtractor(this, type, stats));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index cbdcbf4ec..c767f1d7b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -22,6 +22,8 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.format.TableStatsCollector;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import java.util.ArrayList;
@@ -29,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.IntStream;
/** Random {@link DataFileMeta} generator. */
public class DataFileTestDataGenerator {
@@ -100,9 +103,17 @@ public class DataFileTestDataGenerator {
private Data createDataFile(List<KeyValue> kvs, int level, BinaryRow
partition, int bucket) {
TableStatsCollector keyStatsCollector =
- new TableStatsCollector(TestKeyValueGenerator.KEY_TYPE);
+ new TableStatsCollector(
+ TestKeyValueGenerator.KEY_TYPE,
+ IntStream.range(0,
TestKeyValueGenerator.KEY_TYPE.getFieldCount())
+ .mapToObj(i -> new FullFieldStatsCollector())
+ .toArray(FieldStatsCollector[]::new));
TableStatsCollector valueStatsCollector =
- new
TableStatsCollector(TestKeyValueGenerator.DEFAULT_ROW_TYPE);
+ new TableStatsCollector(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ IntStream.range(0,
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFieldCount())
+ .mapToObj(i -> new FullFieldStatsCollector())
+ .toArray(FieldStatsCollector[]::new));
FieldStatsArraySerializer keyStatsSerializer =
new FieldStatsArraySerializer(TestKeyValueGenerator.KEY_TYPE);
FieldStatsArraySerializer valueStatsSerializer =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 2a236c966..5ec574605 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.stats.StatsTestUtils;
@@ -255,7 +256,7 @@ public class KeyValueFileReadWriteTest {
new FlushingFileFormat(format),
pathFactory,
suggestedFileSize)
- .build(BinaryRow.EMPTY_ROW, 0, null, null);
+ .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new
Options()));
}
private KeyValueFileReaderFactory createReaderFactory(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index df183a535..7b273b693 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -25,10 +25,13 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.StatsUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.junit.jupiter.api.io.TempDir;
@@ -37,6 +40,8 @@ import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.FileFormatType;
import static org.assertj.core.api.Assertions.assertThat;
@@ -74,10 +79,22 @@ public class RollingFileWriterTest {
.toString())
.newPath(),
SCHEMA,
-
fileFormat.createStatsExtractor(SCHEMA).orElse(null),
+ fileFormat
+ .createStatsExtractor(
+ SCHEMA,
+ IntStream.range(0,
SCHEMA.getFieldCount())
+ .mapToObj(
+ i ->
+
new FullFieldStatsCollector())
+ .toArray(
+
FieldStatsCollector[]::new))
+ .orElse(null),
0L,
new LongCounter(0),
-
CoreOptions.FILE_COMPRESSION.defaultValue()),
+
CoreOptions.FILE_COMPRESSION.defaultValue(),
+ StatsUtils.getFieldsStatsMode(
+ new CoreOptions(new
HashMap<>()),
+ SCHEMA.getFieldNames())),
TARGET_FILE_SIZE);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e66a6eccb..a4ea867fa 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -32,10 +32,12 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
@@ -134,7 +136,10 @@ public abstract class ManifestFileMetaTestBase {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString()),
Long.MAX_VALUE,
- null)
+ null,
+ StatsUtils.getFieldsStatsMode(
+ new CoreOptions(new HashMap<>()),
+ getPartitionType().getFieldNames()))
.create();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 5be026b7c..a664fb2d3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -29,12 +29,14 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsUtils;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -108,7 +110,10 @@ public class ManifestFileTest {
avro,
pathFactory,
suggestedFileSize,
- null)
+ null,
+ StatsUtils.getFieldsStatsMode(
+ new CoreOptions(new HashMap<>()),
+ DEFAULT_PART_TYPE.getFieldNames()))
.create();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 81098d767..c09d4e64c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -23,6 +23,8 @@ import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.format.TableStatsCollector;
import org.apache.paimon.io.DataFileTestDataGenerator;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.utils.Preconditions;
@@ -32,6 +34,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.IntStream;
/** Random {@link ManifestEntry} generator. */
public class ManifestTestDataGenerator {
@@ -83,7 +86,11 @@ public class ManifestTestDataGenerator {
!entries.isEmpty(), "Manifest entries are empty. Invalid test
data.");
TableStatsCollector collector =
- new
TableStatsCollector(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+ new TableStatsCollector(
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ IntStream.range(0,
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFieldCount())
+ .mapToObj(i -> new FullFieldStatsCollector())
+ .toArray(FieldStatsCollector[]::new));
FieldStatsArraySerializer serializer =
new
FieldStatsArraySerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index e4b074a51..114ff92a3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
@@ -32,6 +33,7 @@ import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -234,7 +236,7 @@ public class LookupLevelsTest {
new FlushingFileFormat("avro"),
new FileStorePathFactory(path),
TARGET_FILE_SIZE.defaultValue().getBytes())
- .build(BinaryRow.EMPTY_ROW, 0, null, null);
+ .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new
Options()));
}
private KeyValueFileReaderFactory createReaderFactory() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index fb4533a27..fd36afa8a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -187,13 +187,15 @@ public abstract class MergeTreeTestBase {
BinaryRow.EMPTY_ROW,
0,
options.fileCompressionPerLevel(),
- options.fileCompression());
+ options.fileCompression(),
+ options);
compactWriterFactory =
writerFactoryBuilder.build(
BinaryRow.EMPTY_ROW,
0,
options.fileCompressionPerLevel(),
- options.fileCompression());
+ options.fileCompression(),
+ options);
writer = createMergeTreeWriter(Collections.emptyList());
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
b/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
index 57f137d01..3701ce166 100644
---
a/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.TableStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
@@ -30,6 +31,8 @@ import org.apache.paimon.types.VarCharType;
import org.junit.jupiter.api.Test;
+import java.util.stream.IntStream;
+
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link TableStatsCollector}. */
@@ -39,7 +42,12 @@ public class TableStatsCollectorTest {
public void testCollect() {
RowType rowType =
RowType.of(new IntType(), new VarCharType(10), new
ArrayType(new IntType()));
- TableStatsCollector collector = new TableStatsCollector(rowType);
+ TableStatsCollector collector =
+ new TableStatsCollector(
+ rowType,
+ IntStream.range(0, rowType.getFieldCount())
+ .mapToObj(i -> new FullFieldStatsCollector())
+ .toArray(FullFieldStatsCollector[]::new));
collector.collect(
GenericRow.of(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
index 2146c1bca..9314ac500 100644
---
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
+++
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
@@ -26,9 +26,11 @@ import org.apache.paimon.format.TableStatsCollector;
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
import java.util.List;
@@ -41,10 +43,16 @@ public class TestTableStatsExtractor implements
TableStatsExtractor {
private final FileFormat format;
private final RowType rowType;
+ private final FieldStatsCollector[] stats;
- public TestTableStatsExtractor(FileFormat format, RowType rowType) {
+ public TestTableStatsExtractor(
+ FileFormat format, RowType rowType, FieldStatsCollector[] stats) {
this.format = format;
this.rowType = rowType;
+ this.stats = stats;
+ Preconditions.checkArgument(
+ rowType.getFieldCount() == stats.length,
+ "The stats collector is not aligned to write schema.");
}
@Override
@@ -53,7 +61,8 @@ public class TestTableStatsExtractor implements
TableStatsExtractor {
FormatReaderFactory readerFactory =
format.createReaderFactory(rowType);
List<InternalRow> records =
FileUtils.readListFromFile(fileIO, path, serializer,
readerFactory);
- TableStatsCollector statsCollector = new TableStatsCollector(rowType);
+
+ TableStatsCollector statsCollector = new TableStatsCollector(rowType,
stats);
for (InternalRow record : records) {
statsCollector.collect(record);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
new file mode 100644
index 000000000..00b4ec0c3
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
+import org.apache.paimon.statistics.TruncateFieldStatsCollector;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+/** Test for {@link StatsUtils}. */
+public class FieldStatsCollectorUtilsTest {
+ @Test
+ public void testFieldStats() {
+ RowType type =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "a", new VarCharType(),
"Someone's desc."),
+ new DataField(1, "b", new TimestampType()),
+ new DataField(2, "c", new CharType())));
+
+ Options options = new Options();
+ options.set(
+ CoreOptions.FIELD_STATS_MODE_PREFIX + ".b." +
CoreOptions.FIELD_STATS_MODE_SUFFIX,
+ "truncate(12)");
+ options.set(
+ CoreOptions.FIELD_STATS_MODE_PREFIX + ".c." +
CoreOptions.FIELD_STATS_MODE_SUFFIX,
+ "full");
+
+ FieldStatsCollector[] stats =
+ StatsUtils.getFieldsStatsMode(new CoreOptions(options),
type.getFieldNames());
+ Assertions.assertEquals(3, stats.length);
+ Assertions.assertEquals(16, ((TruncateFieldStatsCollector)
stats[0]).getLength());
+ Assertions.assertEquals(12, ((TruncateFieldStatsCollector)
stats[1]).getLength());
+ Assertions.assertTrue(stats[2] instanceof FullFieldStatsCollector);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 5677973c6..b5a4d1db9 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -33,6 +33,7 @@ import org.apache.paimon.format.orc.writer.RowDataVectorizer;
import org.apache.paimon.format.orc.writer.Vectorizer;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -86,8 +87,9 @@ public class OrcFileFormat extends FileFormat {
}
@Override
- public Optional<TableStatsExtractor> createStatsExtractor(RowType type) {
- return Optional.of(new OrcTableStatsExtractor(type));
+ public Optional<TableStatsExtractor> createStatsExtractor(
+ RowType type, FieldStatsCollector[] stats) {
+ return Optional.of(new OrcTableStatsExtractor(type, stats));
}
@Override
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
index 800024547..411b52e63 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
@@ -26,6 +26,7 @@ import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.format.orc.OrcReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.RowType;
@@ -53,9 +54,14 @@ import java.util.stream.IntStream;
public class OrcTableStatsExtractor implements TableStatsExtractor {
private final RowType rowType;
+ private final FieldStatsCollector[] stats;
- public OrcTableStatsExtractor(RowType rowType) {
+ public OrcTableStatsExtractor(RowType rowType, FieldStatsCollector[]
stats) {
this.rowType = rowType;
+ this.stats = stats;
+ Preconditions.checkArgument(
+ rowType.getFieldCount() == stats.length,
+ "The stats collector is not aligned to write schema.");
}
@Override
@@ -74,17 +80,18 @@ public class OrcTableStatsExtractor implements
TableStatsExtractor {
DataField field = rowType.getFields().get(i);
int fieldIdx =
columnNames.indexOf(field.name());
int colId = columnTypes.get(fieldIdx).getId();
- return toFieldStats(field,
columnStatistics[colId], rowCount);
+ return toFieldStats(field,
columnStatistics[colId], rowCount, i);
})
.toArray(FieldStats[]::new);
}
}
- private FieldStats toFieldStats(DataField field, ColumnStatistics stats,
long rowCount) {
+ private FieldStats toFieldStats(
+ DataField field, ColumnStatistics stats, long rowCount, int idx) {
long nullCount = rowCount - stats.getNumberOfValues();
if (nullCount == rowCount) {
// all nulls
- return new FieldStats(null, null, nullCount);
+ return this.stats[idx].convert(new FieldStats(null, null,
nullCount));
}
Preconditions.checkState(
(nullCount > 0) == stats.hasNull(),
@@ -94,86 +101,120 @@ public class OrcTableStatsExtractor implements
TableStatsExtractor {
+ stats.hasNull()
+ "!");
+ FieldStats fieldStats;
switch (field.type().getTypeRoot()) {
case CHAR:
case VARCHAR:
assertStatsClass(field, stats, StringColumnStatistics.class);
StringColumnStatistics stringStats = (StringColumnStatistics)
stats;
- return new FieldStats(
- BinaryString.fromString(stringStats.getMinimum()),
- BinaryString.fromString(stringStats.getMaximum()),
- nullCount);
+ fieldStats =
+ new FieldStats(
+
BinaryString.fromString(stringStats.getMinimum()),
+
BinaryString.fromString(stringStats.getMaximum()),
+ nullCount);
+ break;
case BOOLEAN:
assertStatsClass(field, stats, BooleanColumnStatistics.class);
BooleanColumnStatistics boolStats = (BooleanColumnStatistics)
stats;
- return new FieldStats(
- boolStats.getFalseCount() == 0,
boolStats.getTrueCount() != 0, nullCount);
+ fieldStats =
+ new FieldStats(
+ boolStats.getFalseCount() == 0,
+ boolStats.getTrueCount() != 0,
+ nullCount);
+ break;
case DECIMAL:
assertStatsClass(field, stats, DecimalColumnStatistics.class);
DecimalColumnStatistics decimalStats =
(DecimalColumnStatistics) stats;
DecimalType decimalType = (DecimalType) (field.type());
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
- return new FieldStats(
- Decimal.fromBigDecimal(
- decimalStats.getMinimum().bigDecimalValue(),
precision, scale),
- Decimal.fromBigDecimal(
- decimalStats.getMaximum().bigDecimalValue(),
precision, scale),
- nullCount);
+ fieldStats =
+ new FieldStats(
+ Decimal.fromBigDecimal(
+
decimalStats.getMinimum().bigDecimalValue(),
+ precision,
+ scale),
+ Decimal.fromBigDecimal(
+
decimalStats.getMaximum().bigDecimalValue(),
+ precision,
+ scale),
+ nullCount);
+ break;
case TINYINT:
assertStatsClass(field, stats, IntegerColumnStatistics.class);
IntegerColumnStatistics byteStats = (IntegerColumnStatistics)
stats;
- return new FieldStats(
- (byte) byteStats.getMinimum(), (byte)
byteStats.getMaximum(), nullCount);
+ fieldStats =
+ new FieldStats(
+ (byte) byteStats.getMinimum(),
+ (byte) byteStats.getMaximum(),
+ nullCount);
+ break;
case SMALLINT:
assertStatsClass(field, stats, IntegerColumnStatistics.class);
IntegerColumnStatistics shortStats = (IntegerColumnStatistics)
stats;
- return new FieldStats(
- (short) shortStats.getMinimum(),
- (short) shortStats.getMaximum(),
- nullCount);
+ fieldStats =
+ new FieldStats(
+ (short) shortStats.getMinimum(),
+ (short) shortStats.getMaximum(),
+ nullCount);
+ break;
case INTEGER:
case TIME_WITHOUT_TIME_ZONE:
assertStatsClass(field, stats, IntegerColumnStatistics.class);
IntegerColumnStatistics intStats = (IntegerColumnStatistics)
stats;
- return new FieldStats(
- Long.valueOf(intStats.getMinimum()).intValue(),
- Long.valueOf(intStats.getMaximum()).intValue(),
- nullCount);
+ fieldStats =
+ new FieldStats(
+ Long.valueOf(intStats.getMinimum()).intValue(),
+ Long.valueOf(intStats.getMaximum()).intValue(),
+ nullCount);
+ break;
case BIGINT:
assertStatsClass(field, stats, IntegerColumnStatistics.class);
IntegerColumnStatistics longStats = (IntegerColumnStatistics)
stats;
- return new FieldStats(longStats.getMinimum(),
longStats.getMaximum(), nullCount);
+ fieldStats =
+ new FieldStats(longStats.getMinimum(),
longStats.getMaximum(), nullCount);
+ break;
case FLOAT:
assertStatsClass(field, stats, DoubleColumnStatistics.class);
DoubleColumnStatistics floatStats = (DoubleColumnStatistics)
stats;
- return new FieldStats(
- (float) floatStats.getMinimum(),
- (float) floatStats.getMaximum(),
- nullCount);
+ fieldStats =
+ new FieldStats(
+ (float) floatStats.getMinimum(),
+ (float) floatStats.getMaximum(),
+ nullCount);
+ break;
case DOUBLE:
assertStatsClass(field, stats, DoubleColumnStatistics.class);
DoubleColumnStatistics doubleStats = (DoubleColumnStatistics)
stats;
- return new FieldStats(
- doubleStats.getMinimum(), doubleStats.getMaximum(),
nullCount);
+ fieldStats =
+ new FieldStats(
+ doubleStats.getMinimum(),
doubleStats.getMaximum(), nullCount);
+ break;
case DATE:
assertStatsClass(field, stats, DateColumnStatistics.class);
DateColumnStatistics dateStats = (DateColumnStatistics) stats;
- return new FieldStats(
- DateTimeUtils.toInternal(new
Date(dateStats.getMinimum().getTime())),
- DateTimeUtils.toInternal(new
Date(dateStats.getMaximum().getTime())),
- nullCount);
+ fieldStats =
+ new FieldStats(
+ DateTimeUtils.toInternal(
+ new
Date(dateStats.getMinimum().getTime())),
+ DateTimeUtils.toInternal(
+ new
Date(dateStats.getMaximum().getTime())),
+ nullCount);
+ break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
assertStatsClass(field, stats,
TimestampColumnStatistics.class);
TimestampColumnStatistics timestampStats =
(TimestampColumnStatistics) stats;
- return new FieldStats(
-
Timestamp.fromSQLTimestamp(timestampStats.getMinimum()),
-
Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
- nullCount);
+ fieldStats =
+ new FieldStats(
+
Timestamp.fromSQLTimestamp(timestampStats.getMinimum()),
+
Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
+ nullCount);
+ break;
default:
- return new FieldStats(null, null, nullCount);
+ fieldStats = new FieldStats(null, null, nullCount);
}
+ return this.stats[idx].convert(fieldStats);
}
private void assertStatsClass(
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index dea46559b..1bb0bc2fd 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -27,6 +27,7 @@ import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Projection;
@@ -72,8 +73,9 @@ public class ParquetFileFormat extends FileFormat {
}
@Override
- public Optional<TableStatsExtractor> createStatsExtractor(RowType type) {
- return Optional.of(new ParquetTableStatsExtractor(type));
+ public Optional<TableStatsExtractor> createStatsExtractor(
+ RowType type, FieldStatsCollector[] stats) {
+ return Optional.of(new ParquetTableStatsExtractor(type, stats));
}
public static Options getParquetConfiguration(Options options) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
index 115d9879c..587b3dfc2 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
@@ -25,11 +25,13 @@ import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.BooleanStatistics;
@@ -58,9 +60,14 @@ public class ParquetTableStatsExtractor implements
TableStatsExtractor {
private final RowType rowType;
private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+ private final FieldStatsCollector[] stats;
- public ParquetTableStatsExtractor(RowType rowType) {
+ public ParquetTableStatsExtractor(RowType rowType, FieldStatsCollector[]
stats) {
this.rowType = rowType;
+ this.stats = stats;
+ Preconditions.checkArgument(
+ rowType.getFieldCount() == stats.length,
+ "The stats collector is not aligned to write schema.");
}
@Override
@@ -71,79 +78,100 @@ public class ParquetTableStatsExtractor implements
TableStatsExtractor {
.mapToObj(
i -> {
DataField field = rowType.getFields().get(i);
- return toFieldStats(field,
stats.get(field.name()));
+ return toFieldStats(field,
stats.get(field.name()), i);
})
.toArray(FieldStats[]::new);
}
- private FieldStats toFieldStats(DataField field, Statistics<?> stats) {
+ private FieldStats toFieldStats(DataField field, Statistics<?> stats, int
idx) {
if (stats == null) {
return new FieldStats(null, null, null);
}
long nullCount = stats.getNumNulls();
if (!stats.hasNonNullValue()) {
- return new FieldStats(null, null, nullCount);
+ return this.stats[idx].convert(new FieldStats(null, null,
nullCount));
}
+ FieldStats fieldStats;
switch (field.type().getTypeRoot()) {
case CHAR:
case VARCHAR:
assertStatsClass(field, stats, BinaryStatistics.class);
BinaryStatistics stringStats = (BinaryStatistics) stats;
- return new FieldStats(
- BinaryString.fromString(stringStats.minAsString()),
- BinaryString.fromString(stringStats.maxAsString()),
- nullCount);
+ fieldStats =
+ new FieldStats(
+
BinaryString.fromString(stringStats.minAsString()),
+
BinaryString.fromString(stringStats.maxAsString()),
+ nullCount);
+ break;
case BOOLEAN:
assertStatsClass(field, stats, BooleanStatistics.class);
BooleanStatistics boolStats = (BooleanStatistics) stats;
- return new FieldStats(boolStats.getMin(), boolStats.getMax(),
nullCount);
+ fieldStats = new FieldStats(boolStats.getMin(),
boolStats.getMax(), nullCount);
+ break;
case DECIMAL:
PrimitiveType primitive = stats.type();
DecimalType decimalType = (DecimalType) (field.type());
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
- return convertStatsToDecimalFieldStats(
- primitive, field, stats, precision, scale, nullCount);
+ fieldStats =
+ convertStatsToDecimalFieldStats(
+ primitive, field, stats, precision, scale,
nullCount);
+ break;
case TINYINT:
assertStatsClass(field, stats, IntStatistics.class);
IntStatistics byteStats = (IntStatistics) stats;
- return new FieldStats(
- (byte) byteStats.getMin(), (byte) byteStats.getMax(),
nullCount);
+ fieldStats =
+ new FieldStats(
+ (byte) byteStats.getMin(), (byte)
byteStats.getMax(), nullCount);
+ break;
case SMALLINT:
assertStatsClass(field, stats, IntStatistics.class);
IntStatistics shortStats = (IntStatistics) stats;
- return new FieldStats(
- (short) shortStats.getMin(), (short)
shortStats.getMax(), nullCount);
+ fieldStats =
+ new FieldStats(
+ (short) shortStats.getMin(),
+ (short) shortStats.getMax(),
+ nullCount);
+ break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
assertStatsClass(field, stats, IntStatistics.class);
IntStatistics intStats = (IntStatistics) stats;
- return new FieldStats(
- Long.valueOf(intStats.getMin()).intValue(),
- Long.valueOf(intStats.getMax()).intValue(),
- nullCount);
+ fieldStats =
+ new FieldStats(
+ Long.valueOf(intStats.getMin()).intValue(),
+ Long.valueOf(intStats.getMax()).intValue(),
+ nullCount);
+ break;
case BIGINT:
assertStatsClass(field, stats, LongStatistics.class);
LongStatistics longStats = (LongStatistics) stats;
- return new FieldStats(longStats.getMin(), longStats.getMax(),
nullCount);
+ fieldStats = new FieldStats(longStats.getMin(),
longStats.getMax(), nullCount);
+ break;
case FLOAT:
assertStatsClass(field, stats, FloatStatistics.class);
FloatStatistics floatStats = (FloatStatistics) stats;
- return new FieldStats(floatStats.getMin(),
floatStats.getMax(), nullCount);
+ fieldStats = new FieldStats(floatStats.getMin(),
floatStats.getMax(), nullCount);
+ break;
case DOUBLE:
assertStatsClass(field, stats, DoubleStatistics.class);
DoubleStatistics doubleStats = (DoubleStatistics) stats;
- return new FieldStats(doubleStats.getMin(),
doubleStats.getMax(), nullCount);
+ fieldStats = new FieldStats(doubleStats.getMin(),
doubleStats.getMax(), nullCount);
+ break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return toTimestampStats(stats, ((TimestampType)
field.type()).getPrecision());
+ fieldStats = toTimestampStats(stats, ((TimestampType)
field.type()).getPrecision());
+ break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return toTimestampStats(
- stats, ((LocalZonedTimestampType)
field.type()).getPrecision());
+ fieldStats =
+ toTimestampStats(
+ stats, ((LocalZonedTimestampType)
field.type()).getPrecision());
+ break;
default:
- return new FieldStats(null, null, nullCount);
+ fieldStats = new FieldStats(null, null, nullCount);
}
+ return this.stats[idx].convert(fieldStats);
}
private FieldStats toTimestampStats(Statistics<?> stats, int precision) {
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTableStatsExtractorTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTableStatsExtractorTest.java
index d5a899b65..270d36fac 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTableStatsExtractorTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTableStatsExtractorTest.java
@@ -19,7 +19,7 @@
package org.apache.paimon.format.orc;
import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.TableStatsExtractorTestBase;
+import org.apache.paimon.format.TableFieldStatsExtractorTestBaseCollector;
import org.apache.paimon.format.orc.filter.OrcTableStatsExtractor;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.ArrayType;
@@ -44,7 +44,7 @@ import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
/** Tests for {@link OrcTableStatsExtractor}. */
-public class OrcTableStatsExtractorTest extends TableStatsExtractorTestBase {
+public class OrcTableStatsExtractorTest extends
TableFieldStatsExtractorTestBaseCollector {
@Override
protected FileFormat createFormat() {
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java
index 5f10de670..be5c321d6 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractorTest.java
@@ -20,7 +20,7 @@ package org.apache.paimon.format.parquet;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.TableStatsExtractorTestBase;
+import org.apache.paimon.format.TableFieldStatsExtractorTestBaseCollector;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
@@ -43,7 +43,7 @@ import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
/** Tests for {@link ParquetTableStatsExtractor}. */
-public class ParquetTableStatsExtractorTest extends
TableStatsExtractorTestBase {
+public class ParquetTableStatsExtractorTest extends
TableFieldStatsExtractorTestBaseCollector {
@Override
protected FileFormat createFormat() {