This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a21a6a11f [core] Introduce FieldStatsCollector.Factory to fix reusing bug
a21a6a11f is described below
commit a21a6a11feff5b79f0641b1c6e7929e071f48ec5
Author: JingsongLi <[email protected]>
AuthorDate: Tue Jul 4 13:40:52 2023 +0800
[core] Introduce FieldStatsCollector.Factory to fix reusing bug
---
.../java/org/apache/paimon/format/FileFormat.java | 2 +-
.../apache/paimon/format/TableStatsCollector.java | 8 ++---
.../paimon/statistics/FieldStatsCollector.java | 23 +++++++++++---
.../TableFieldStatsExtractorTestBaseCollector.java | 4 +--
.../java/org/apache/paimon/AbstractFileStore.java | 4 +--
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../org/apache/paimon/append/AppendOnlyWriter.java | 4 +--
.../apache/paimon/io/KeyValueDataFileWriter.java | 4 +--
.../paimon/io/KeyValueFileWriterFactory.java | 4 +--
.../org/apache/paimon/io/RowDataFileWriter.java | 4 +--
.../apache/paimon/io/RowDataRollingFileWriter.java | 8 +++--
.../paimon/io/StatsCollectingSingleFileWriter.java | 9 ++----
.../org/apache/paimon/manifest/ManifestFile.java | 10 +++---
.../paimon/operation/AppendOnlyFileStoreWrite.java | 7 +++--
...ctorUtils.java => StatsCollectorFactories.java} | 18 ++++++++---
.../apache/paimon/append/AppendOnlyWriterTest.java | 4 +--
.../apache/paimon/format/FileFormatSuffixTest.java | 4 +--
.../format/FileStatsExtractingAvroFormat.java | 2 +-
.../paimon/io/DataFileTestDataGenerator.java | 14 ++++++---
.../paimon/io/KeyValueFileReadWriteTest.java | 36 ++++++++++------------
.../apache/paimon/io/RollingFileWriterTest.java | 16 +++-------
.../paimon/manifest/ManifestFileMetaTestBase.java | 4 +--
.../apache/paimon/manifest/ManifestFileTest.java | 4 +--
.../paimon/manifest/ManifestTestDataGenerator.java | 7 +++--
.../paimon/stats/TableStatsCollectorTest.java | 8 +++--
.../paimon/stats/TestTableStatsExtractor.java | 4 +--
...sTest.java => StatsCollectorFactoriesTest.java} | 9 +++---
.../apache/paimon/format/orc/OrcFileFormat.java | 2 +-
.../format/orc/filter/OrcTableStatsExtractor.java | 15 +++++----
.../paimon/format/parquet/ParquetFileFormat.java | 2 +-
.../format/parquet/ParquetTableStatsExtractor.java | 21 ++++++-------
31 files changed, 143 insertions(+), 120 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index 9527cf140..2197c513f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -77,7 +77,7 @@ public abstract class FileFormat {
}
public Optional<TableStatsExtractor> createStatsExtractor(
- RowType type, FieldStatsCollector[] statsCollectors) {
+ RowType type, FieldStatsCollector.Factory[] statsCollectors) {
return Optional.empty();
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
index 4e84a0309..c8f7322a1 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
@@ -34,15 +34,15 @@ public class TableStatsCollector {
private final FieldStatsCollector[] statsCollectors;
private final Serializer<Object>[] fieldSerializers;
- public TableStatsCollector(RowType rowType, FieldStatsCollector[]
statsCollectors) {
+ public TableStatsCollector(RowType rowType, FieldStatsCollector.Factory[]
collectorFactory) {
int numFields = rowType.getFieldCount();
checkArgument(
- numFields == statsCollectors.length,
+ numFields == collectorFactory.length,
"numFields %s should equal to stats length %s.",
numFields,
- statsCollectors.length);
+ collectorFactory.length);
+ this.statsCollectors = FieldStatsCollector.create(collectorFactory);
this.converter = new RowDataToObjectArrayConverter(rowType);
- this.statsCollectors = statsCollectors;
this.fieldSerializers = new Serializer[numFields];
for (int i = 0; i < numFields; i++) {
fieldSerializers[i] =
InternalSerializers.create(rowType.getTypeAt(i));
diff --git
a/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
b/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
index d77a2d266..99ce52285 100644
---
a/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
@@ -49,20 +49,33 @@ public interface FieldStatsCollector {
*/
FieldStats convert(FieldStats source);
- static FieldStatsCollector from(String option) {
+ /** Factory to create {@link FieldStatsCollector}. */
+ interface Factory {
+ FieldStatsCollector create();
+ }
+
+ static FieldStatsCollector[] create(FieldStatsCollector.Factory[]
factories) {
+ FieldStatsCollector[] collectors = new
FieldStatsCollector[factories.length];
+ for (int i = 0; i < factories.length; i++) {
+ collectors[i] = factories[i].create();
+ }
+ return collectors;
+ }
+
+ static Factory from(String option) {
String upper = option.toUpperCase();
switch (upper) {
case "NONE":
- return new NoneFieldStatsCollector();
+ return NoneFieldStatsCollector::new;
case "FULL":
- return new FullFieldStatsCollector();
+ return FullFieldStatsCollector::new;
case "COUNTS":
- return new CountsFieldStatsCollector();
+ return CountsFieldStatsCollector::new;
default:
Matcher matcher = TRUNCATE_PATTERN.matcher(upper);
if (matcher.matches()) {
String length = matcher.group(1);
- return new
TruncateFieldStatsCollector(Integer.parseInt(length));
+ return () -> new
TruncateFieldStatsCollector(Integer.parseInt(length));
}
throw new IllegalArgumentException("Unexpected option: " +
option);
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
b/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
index b3eda7c70..55c55625d 100644
---
a/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
+++
b/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
@@ -75,10 +75,10 @@ public abstract class
TableFieldStatsExtractorTestBaseCollector {
FileFormat format = createFormat();
RowType rowType = rowType();
int count = rowType().getFieldCount();
- FieldStatsCollector[] stats =
+ FieldStatsCollector.Factory[] stats =
IntStream.range(0, count)
.mapToObj(p -> FieldStatsCollector.from(mode))
- .toArray(FieldStatsCollector[]::new);
+ .toArray(FieldStatsCollector.Factory[]::new);
FormatWriterFactory writerFactory =
format.createWriterFactory(rowType);
Path path = new Path(tempDir.toString() + "/test");
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index c6f924f0f..bce0d9b1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -35,10 +35,10 @@ import org.apache.paimon.operation.TagFileKeeper;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsCollectorFactories;
import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
@@ -106,7 +106,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
pathFactory(),
options.manifestTargetSize().getBytes(),
forWrite ? writeManifestCache : null,
- FieldStatsCollectorUtils.getFieldsStatsMode(
+ StatsCollectorFactories.createStatsFactories(
options, partitionType.getFieldNames()));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index eb127d872..010d6e169 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -698,7 +698,7 @@ public class CoreOptions implements Serializable {
public static final String STATS_MODE_SUFFIX = "stats-mode";
- public static final ConfigOption<String> STATS_MODE =
+ public static final ConfigOption<String> METADATA_STATS_MODE =
key("metadata." + STATS_MODE_SUFFIX)
.stringType()
.defaultValue("truncate(16)")
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9cdec0f0b..067520949 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -63,7 +63,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
- private final FieldStatsCollector[] statsCollectors;
+ private final FieldStatsCollector.Factory[] statsCollectors;
private RowDataRollingFileWriter writer;
@@ -79,7 +79,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment,
String fileCompression,
- FieldStatsCollector[] statsCollectors) {
+ FieldStatsCollector.Factory[] statsCollectors) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 27cf60c4f..86ffc28fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -31,7 +31,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
+import org.apache.paimon.utils.StatsCollectorFactories;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +88,7 @@ public class KeyValueDataFileWriter
KeyValue.schema(keyType, valueType),
tableStatsExtractor,
compression,
- FieldStatsCollectorUtils.getFieldsStatsMode(
+ StatsCollectorFactories.createStatsFactories(
options, KeyValue.schema(keyType,
valueType).getFieldNames()));
this.keyType = keyType;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 0c8f78cd5..f3912daf5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -29,8 +29,8 @@ import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsCollectorFactories;
import javax.annotation.Nullable;
@@ -187,7 +187,7 @@ public class KeyValueFileWriterFactory {
fileFormat
.createStatsExtractor(
recordType,
-
FieldStatsCollectorUtils.getFieldsStatsMode(
+
StatsCollectorFactories.createStatsFactories(
options,
recordType.getFieldNames()))
.orElse(null),
pathFactory.createDataFilePathFactory(partition, bucket),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 8cbbe8ed8..b3dd2d343 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -54,7 +54,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
long schemaId,
LongCounter seqNumCounter,
String fileCompression,
- FieldStatsCollector[] stats) {
+ FieldStatsCollector.Factory[] statsCollectors) {
super(
fileIO,
factory,
@@ -63,7 +63,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
writeSchema,
tableStatsExtractor,
fileCompression,
- stats);
+ statsCollectors);
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 8b99b6389..e15f62248 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -38,7 +38,7 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
DataFilePathFactory pathFactory,
LongCounter seqNumCounter,
String fileCompression,
- FieldStatsCollector[] stats) {
+ FieldStatsCollector.Factory[] statsCollectors) {
super(
() ->
new RowDataFileWriter(
@@ -46,11 +46,13 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
fileFormat.createWriterFactory(writeSchema),
pathFactory.newPath(),
writeSchema,
- fileFormat.createStatsExtractor(writeSchema,
stats).orElse(null),
+ fileFormat
+ .createStatsExtractor(writeSchema,
statsCollectors)
+ .orElse(null),
schemaId,
seqNumCounter,
fileCompression,
- stats),
+ statsCollectors),
targetFileSize);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index ce5d57289..559b0f181 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -27,14 +27,12 @@ import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.statistics.FieldStatsCollector;
-import org.apache.paimon.statistics.NoneFieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Arrays;
import java.util.function.Function;
/**
@@ -47,7 +45,6 @@ public abstract class StatsCollectingSingleFileWriter<T, R>
extends SingleFileWr
@Nullable private final TableStatsExtractor tableStatsExtractor;
@Nullable private TableStatsCollector tableStatsCollector = null;
- private final boolean isStatsCollectorDisabled;
public StatsCollectingSingleFileWriter(
FileIO fileIO,
@@ -57,11 +54,9 @@ public abstract class StatsCollectingSingleFileWriter<T, R>
extends SingleFileWr
RowType writeSchema,
@Nullable TableStatsExtractor tableStatsExtractor,
String compression,
- FieldStatsCollector[] statsCollectors) {
+ FieldStatsCollector.Factory[] statsCollectors) {
super(fileIO, factory, path, converter, compression);
this.tableStatsExtractor = tableStatsExtractor;
- this.isStatsCollectorDisabled =
- Arrays.stream(statsCollectors).allMatch(p -> p instanceof
NoneFieldStatsCollector);
if (this.tableStatsExtractor == null) {
this.tableStatsCollector = new TableStatsCollector(writeSchema,
statsCollectors);
}
@@ -73,7 +68,7 @@ public abstract class StatsCollectingSingleFileWriter<T, R>
extends SingleFileWr
@Override
public void write(T record) throws IOException {
InternalRow rowData = writeImpl(record);
- if (tableStatsCollector != null && !isStatsCollectorDisabled) {
+ if (tableStatsCollector != null) {
tableStatsCollector.collect(rowData);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 562077920..e41184d1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -53,7 +53,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
private final RowType partitionType;
private final FormatWriterFactory writerFactory;
private final long suggestedFileSize;
- private final FieldStatsCollector[] partitionStats;
+ private final FieldStatsCollector.Factory[] partitionStats;
private ManifestFile(
FileIO fileIO,
@@ -65,7 +65,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
PathFactory pathFactory,
long suggestedFileSize,
@Nullable SegmentsCache<String> cache,
- FieldStatsCollector[] partitionStats) {
+ FieldStatsCollector.Factory[] partitionStats) {
super(fileIO, serializer, readerFactory, writerFactory, pathFactory,
cache);
this.schemaManager = schemaManager;
this.partitionType = partitionType;
@@ -116,7 +116,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
FormatWriterFactory factory,
Path path,
String fileCompression,
- FieldStatsCollector[] partitionStats) {
+ FieldStatsCollector.Factory[] partitionStats) {
super(ManifestFile.this.fileIO, factory, path, serializer::toRow,
fileCompression);
this.partitionStatsCollector = new
TableStatsCollector(partitionType, partitionStats);
@@ -166,7 +166,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;
@Nullable private final SegmentsCache<String> cache;
- private final FieldStatsCollector[] partitionStats;
+ private final FieldStatsCollector.Factory[] partitionStats;
public Factory(
FileIO fileIO,
@@ -176,7 +176,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
FileStorePathFactory pathFactory,
long suggestedFileSize,
@Nullable SegmentsCache<String> cache,
- FieldStatsCollector[] partitionStats) {
+ FieldStatsCollector.Factory[] partitionStats) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.partitionType = partitionType;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index eb24e4b5e..8019df5c5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -37,11 +37,11 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.LongCounter;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsCollectorFactories;
import javax.annotation.Nullable;
@@ -67,9 +67,10 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
private final boolean commitForceCompact;
private final boolean assertDisorder;
private final String fileCompression;
+ private final FieldStatsCollector.Factory[] statsCollectors;
+
private boolean skipCompaction;
private BucketMode bucketMode = BucketMode.FIXED;
- private FieldStatsCollector[] statsCollectors;
public AppendOnlyFileStoreWrite(
FileIO fileIO,
@@ -96,7 +97,7 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
this.assertDisorder =
options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER);
this.fileCompression = options.fileCompression();
this.statsCollectors =
- FieldStatsCollectorUtils.getFieldsStatsMode(options,
rowType.getFieldNames());
+ StatsCollectorFactories.createStatsFactories(options,
rowType.getFieldNames());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FieldStatsCollectorUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
similarity index 73%
rename from
paimon-core/src/main/java/org/apache/paimon/utils/FieldStatsCollectorUtils.java
rename to
paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
index 5828462d9..08b3a780c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FieldStatsCollectorUtils.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
@@ -23,20 +23,22 @@ package org.apache.paimon.utils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
+import java.util.Arrays;
import java.util.List;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.STATS_MODE_SUFFIX;
import static org.apache.paimon.options.ConfigOptions.key;
-/** The stats utils. */
-public class FieldStatsCollectorUtils {
+/** The stats utils to create {@link FieldStatsCollector.Factory}s. */
+public class StatsCollectorFactories {
- public static FieldStatsCollector[] getFieldsStatsMode(
+ public static FieldStatsCollector.Factory[] createStatsFactories(
CoreOptions options, List<String> fields) {
Options cfg = options.toConfiguration();
- FieldStatsCollector[] modes = new FieldStatsCollector[fields.size()];
+ FieldStatsCollector.Factory[] modes = new
FieldStatsCollector.Factory[fields.size()];
for (int i = 0; i < fields.size(); i++) {
String fieldMode =
cfg.get(
@@ -48,9 +50,15 @@ public class FieldStatsCollectorUtils {
if (fieldMode != null) {
modes[i] = FieldStatsCollector.from(fieldMode);
} else {
- modes[i] =
FieldStatsCollector.from(cfg.get(CoreOptions.STATS_MODE));
+ modes[i] =
FieldStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE));
}
}
return modes;
}
+
+ public static FieldStatsCollector.Factory[] createFullStatsFactories(int
numFields) {
+ FieldStatsCollector.Factory[] factories = new
FieldStatsCollector.Factory[numFields];
+ Arrays.fill(factories, (FieldStatsCollector.Factory)
FullFieldStatsCollector::new);
+ return factories;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 2720252d0..171b725fa 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -38,9 +38,9 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.StatsCollectorFactories;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -320,7 +320,7 @@ public class AppendOnlyWriterTest {
pathFactory,
null,
CoreOptions.FILE_COMPRESSION.defaultValue(),
- FieldStatsCollectorUtils.getFieldsStatsMode(
+ StatsCollectorFactories.createStatsFactories(
new CoreOptions(new HashMap<>()),
AppendOnlyWriterTest.SCHEMA.getFieldNames()));
return Pair.of(writer, compactManager.allFiles());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 604998bc8..09b36d2e6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -35,7 +35,7 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.CommitIncrement;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
+import org.apache.paimon.utils.StatsCollectorFactories;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -77,7 +77,7 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
dataFilePathFactory,
null,
CoreOptions.FILE_COMPRESSION.defaultValue(),
- FieldStatsCollectorUtils.getFieldsStatsMode(
+ StatsCollectorFactories.createStatsFactories(
new CoreOptions(new HashMap<>()),
SCHEMA.getFieldNames()));
appendOnlyWriter.write(
GenericRow.of(1, BinaryString.fromString("aaa"),
BinaryString.fromString("1")));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
index 218cd141e..0cdcaebe8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
@@ -57,7 +57,7 @@ public class FileStatsExtractingAvroFormat extends FileFormat
{
@Override
public Optional<TableStatsExtractor> createStatsExtractor(
- RowType type, FieldStatsCollector[] stats) {
+ RowType type, FieldStatsCollector.Factory[] stats) {
return Optional.of(new TestTableStatsExtractor(this, type, stats));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index c767f1d7b..6301fbc3d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -106,14 +106,20 @@ public class DataFileTestDataGenerator {
new TableStatsCollector(
TestKeyValueGenerator.KEY_TYPE,
IntStream.range(0,
TestKeyValueGenerator.KEY_TYPE.getFieldCount())
- .mapToObj(i -> new FullFieldStatsCollector())
- .toArray(FieldStatsCollector[]::new));
+ .mapToObj(
+ i ->
+ (FieldStatsCollector.Factory)
+
FullFieldStatsCollector::new)
+ .toArray(FieldStatsCollector.Factory[]::new));
TableStatsCollector valueStatsCollector =
new TableStatsCollector(
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
IntStream.range(0,
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFieldCount())
- .mapToObj(i -> new FullFieldStatsCollector())
- .toArray(FieldStatsCollector[]::new));
+ .mapToObj(
+ i ->
+ (FieldStatsCollector.Factory)
+
FullFieldStatsCollector::new)
+ .toArray(FieldStatsCollector.Factory[]::new));
FieldStatsArraySerializer keyStatsSerializer =
new FieldStatsArraySerializer(TestKeyValueGenerator.KEY_TYPE);
FieldStatsArraySerializer valueStatsSerializer =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 5ec574605..d612a1d1b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -55,6 +55,8 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
+import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
+import static org.apache.paimon.TestKeyValueGenerator.KEY_TYPE;
import static org.apache.paimon.TestKeyValueGenerator.createTestSchemaManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -97,12 +99,7 @@ public class KeyValueFileReadWriteTest {
writer.close();
List<DataFileMeta> actualMetas = writer.result();
- checkRollingFiles(
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.DEFAULT_ROW_TYPE,
- data.meta,
- actualMetas,
- writer.targetFileSize());
+ checkRollingFiles(data.meta, actualMetas, writer.targetFileSize());
KeyValueFileReaderFactory readerFactory =
createReaderFactory(tempDir.toString(), format, null, null);
@@ -245,18 +242,20 @@ public class KeyValueFileReadWriteTest {
format);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) +
1024;
FileIO fileIO = FileIOFinder.find(path);
+ Options options = new Options();
+ options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
return KeyValueFileWriterFactory.builder(
fileIO,
0,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ KEY_TYPE,
+ DEFAULT_ROW_TYPE,
// normal format will buffer changes in memory and we
can't determine
// if the written file size is really larger than
suggested, so we use a
// special format which flushes for every added element
new FlushingFileFormat(format),
pathFactory,
suggestedFileSize)
- .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new
Options()));
+ .build(BinaryRow.EMPTY_ROW, 0, null, null, new
CoreOptions(options));
}
private KeyValueFileReaderFactory createReaderFactory(
@@ -269,8 +268,8 @@ public class KeyValueFileReadWriteTest {
fileIO,
createTestSchemaManager(path),
0,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ KEY_TYPE,
+ DEFAULT_ROW_TYPE,
ignore -> new FlushingFileFormat(format),
pathFactory,
new
TestKeyValueGenerator.TestKeyValueFieldsExtractor());
@@ -321,13 +320,10 @@ public class KeyValueFileReadWriteTest {
}
private void checkRollingFiles(
- RowType keyType,
- RowType valueType,
- DataFileMeta expected,
- List<DataFileMeta> actual,
- long suggestedFileSize) {
- FieldStatsArraySerializer keyStatsConverter = new
FieldStatsArraySerializer(keyType);
- FieldStatsArraySerializer valueStatsConverter = new
FieldStatsArraySerializer(valueType);
+ DataFileMeta expected, List<DataFileMeta> actual, long
suggestedFileSize) {
+ FieldStatsArraySerializer keyStatsConverter = new
FieldStatsArraySerializer(KEY_TYPE);
+ FieldStatsArraySerializer valueStatsConverter =
+ new FieldStatsArraySerializer(DEFAULT_ROW_TYPE);
// all but last file should be no smaller than suggestedFileSize
for (int i = 0; i + 1 < actual.size(); i++) {
@@ -345,14 +341,14 @@ public class KeyValueFileReadWriteTest {
assertThat(actual.get(actual.size() -
1).maxKey()).isEqualTo(expected.maxKey());
// check stats
- for (int i = 0; i < keyType.getFieldCount(); i++) {
+ for (int i = 0; i < KEY_TYPE.getFieldCount(); i++) {
int idx = i;
StatsTestUtils.checkRollingFileStats(
keyStatsConverter.fromBinary(expected.keyStats())[i],
actual,
m -> keyStatsConverter.fromBinary(m.keyStats())[idx]);
}
- for (int i = 0; i < valueType.getFieldCount(); i++) {
+ for (int i = 0; i < DEFAULT_ROW_TYPE.getFieldCount(); i++) {
int idx = i;
StatsTestUtils.checkRollingFileStats(
valueStatsConverter.fromBinary(expected.valueStats())[i],
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index e012c7ab1..78d1cbf99 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -25,13 +25,11 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
-import org.apache.paimon.statistics.FieldStatsCollector;
-import org.apache.paimon.statistics.FullFieldStatsCollector;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.StatsCollectorFactories;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.junit.jupiter.api.io.TempDir;
@@ -41,7 +39,6 @@ import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.FileFormatType;
import static org.assertj.core.api.Assertions.assertThat;
@@ -82,17 +79,14 @@ public class RollingFileWriterTest {
fileFormat
.createStatsExtractor(
SCHEMA,
- IntStream.range(0,
SCHEMA.getFieldCount())
- .mapToObj(
- i ->
-
new FullFieldStatsCollector())
- .toArray(
-
FieldStatsCollector[]::new))
+ StatsCollectorFactories
+
.createFullStatsFactories(
+
SCHEMA.getFieldCount()))
.orElse(null),
0L,
new LongCounter(0),
CoreOptions.FILE_COMPRESSION.defaultValue(),
-
FieldStatsCollectorUtils.getFieldsStatsMode(
+
StatsCollectorFactories.createStatsFactories(
new CoreOptions(new
HashMap<>()),
SCHEMA.getFieldNames())),
TARGET_FILE_SIZE);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index b953f88cd..5ff3f8734 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -31,8 +31,8 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsCollectorFactories;
import java.util.ArrayList;
import java.util.Arrays;
@@ -137,7 +137,7 @@ public abstract class ManifestFileMetaTestBase {
CoreOptions.FILE_FORMAT.defaultValue().toString()),
Long.MAX_VALUE,
null,
- FieldStatsCollectorUtils.getFieldsStatsMode(
+ StatsCollectorFactories.createStatsFactories(
new CoreOptions(new HashMap<>()),
getPartitionType().getFieldNames()))
.create();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 5989234b8..105914ebc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -28,8 +28,8 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.utils.FailingFileIO;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsCollectorFactories;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;
@@ -111,7 +111,7 @@ public class ManifestFileTest {
pathFactory,
suggestedFileSize,
null,
- FieldStatsCollectorUtils.getFieldsStatsMode(
+ StatsCollectorFactories.createStatsFactories(
new CoreOptions(new HashMap<>()),
DEFAULT_PART_TYPE.getFieldNames()))
.create();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 81b8ad1f7..e2ca01d92 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -89,8 +89,11 @@ public class ManifestTestDataGenerator {
new TableStatsCollector(
TestKeyValueGenerator.DEFAULT_PART_TYPE,
IntStream.range(0,
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldCount())
- .mapToObj(i -> new FullFieldStatsCollector())
- .toArray(FieldStatsCollector[]::new));
+ .mapToObj(
+ i ->
+ (FieldStatsCollector.Factory)
+
FullFieldStatsCollector::new)
+ .toArray(FieldStatsCollector.Factory[]::new));
FieldStatsArraySerializer serializer =
new
FieldStatsArraySerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
b/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
index 3701ce166..ca4d6660c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.TableStatsCollector;
+import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.statistics.FullFieldStatsCollector;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.IntType;
@@ -46,8 +47,11 @@ public class TableStatsCollectorTest {
new TableStatsCollector(
rowType,
IntStream.range(0, rowType.getFieldCount())
- .mapToObj(i -> new FullFieldStatsCollector())
- .toArray(FullFieldStatsCollector[]::new));
+ .mapToObj(
+ i ->
+ (FieldStatsCollector.Factory)
+
FullFieldStatsCollector::new)
+ .toArray(FieldStatsCollector.Factory[]::new));
collector.collect(
GenericRow.of(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
index 9314ac500..530fc9b5b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
+++
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
@@ -43,10 +43,10 @@ public class TestTableStatsExtractor implements
TableStatsExtractor {
private final FileFormat format;
private final RowType rowType;
- private final FieldStatsCollector[] stats;
+ private final FieldStatsCollector.Factory[] stats;
public TestTableStatsExtractor(
- FileFormat format, RowType rowType, FieldStatsCollector[] stats) {
+ FileFormat format, RowType rowType, FieldStatsCollector.Factory[]
stats) {
this.format = format;
this.rowType = rowType;
this.stats = stats;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/StatsCollectorFactoriesTest.java
similarity index 89%
rename from
paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/utils/StatsCollectorFactoriesTest.java
index e69b38bae..378a8faa5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/StatsCollectorFactoriesTest.java
@@ -36,8 +36,8 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
-/** Test for {@link FieldStatsCollectorUtils}. */
-public class FieldStatsCollectorUtilsTest {
+/** Test for {@link StatsCollectorFactories}. */
+public class StatsCollectorFactoriesTest {
@Test
public void testFieldStats() {
RowType type =
@@ -52,9 +52,10 @@ public class FieldStatsCollectorUtilsTest {
CoreOptions.FIELDS_PREFIX + ".b." +
CoreOptions.STATS_MODE_SUFFIX, "truncate(12)");
options.set(CoreOptions.FIELDS_PREFIX + ".c." +
CoreOptions.STATS_MODE_SUFFIX, "full");
- FieldStatsCollector[] stats =
- FieldStatsCollectorUtils.getFieldsStatsMode(
+ FieldStatsCollector.Factory[] statsFactories =
+ StatsCollectorFactories.createStatsFactories(
new CoreOptions(options), type.getFieldNames());
+ FieldStatsCollector[] stats =
FieldStatsCollector.create(statsFactories);
Assertions.assertEquals(3, stats.length);
Assertions.assertEquals(16, ((TruncateFieldStatsCollector)
stats[0]).getLength());
Assertions.assertEquals(12, ((TruncateFieldStatsCollector)
stats[1]).getLength());
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 8b3dc59a1..24edf5a95 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -88,7 +88,7 @@ public class OrcFileFormat extends FileFormat {
@Override
public Optional<TableStatsExtractor> createStatsExtractor(
- RowType type, FieldStatsCollector[] statsCollectors) {
+ RowType type, FieldStatsCollector.Factory[] statsCollectors) {
return Optional.of(new OrcTableStatsExtractor(type, statsCollectors));
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
index d394a11de..588e04f92 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
@@ -54,9 +54,9 @@ import java.util.stream.IntStream;
public class OrcTableStatsExtractor implements TableStatsExtractor {
private final RowType rowType;
- private final FieldStatsCollector[] statsCollectors;
+ private final FieldStatsCollector.Factory[] statsCollectors;
- public OrcTableStatsExtractor(RowType rowType, FieldStatsCollector[]
statsCollectors) {
+ public OrcTableStatsExtractor(RowType rowType,
FieldStatsCollector.Factory[] statsCollectors) {
this.rowType = rowType;
this.statsCollectors = statsCollectors;
Preconditions.checkArgument(
@@ -74,24 +74,27 @@ public class OrcTableStatsExtractor implements
TableStatsExtractor {
List<String> columnNames = schema.getFieldNames();
List<TypeDescription> columnTypes = schema.getChildren();
+ FieldStatsCollector[] collectors =
FieldStatsCollector.create(statsCollectors);
+
return IntStream.range(0, rowType.getFieldCount())
.mapToObj(
i -> {
DataField field = rowType.getFields().get(i);
int fieldIdx =
columnNames.indexOf(field.name());
int colId = columnTypes.get(fieldIdx).getId();
- return toFieldStats(field,
columnStatistics[colId], rowCount, i);
+ return toFieldStats(
+ field, columnStatistics[colId],
rowCount, collectors[i]);
})
.toArray(FieldStats[]::new);
}
}
private FieldStats toFieldStats(
- DataField field, ColumnStatistics stats, long rowCount, int idx) {
+ DataField field, ColumnStatistics stats, long rowCount,
FieldStatsCollector collector) {
long nullCount = rowCount - stats.getNumberOfValues();
if (nullCount == rowCount) {
// all nulls
- return this.statsCollectors[idx].convert(new FieldStats(null,
null, nullCount));
+ return collector.convert(new FieldStats(null, null, nullCount));
}
Preconditions.checkState(
(nullCount > 0) == stats.hasNull(),
@@ -214,7 +217,7 @@ public class OrcTableStatsExtractor implements
TableStatsExtractor {
default:
fieldStats = new FieldStats(null, null, nullCount);
}
- return this.statsCollectors[idx].convert(fieldStats);
+ return collector.convert(fieldStats);
}
private void assertStatsClass(
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index 0eb7a9be3..e39b9fe99 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -74,7 +74,7 @@ public class ParquetFileFormat extends FileFormat {
@Override
public Optional<TableStatsExtractor> createStatsExtractor(
- RowType type, FieldStatsCollector[] statsCollectors) {
+ RowType type, FieldStatsCollector.Factory[] statsCollectors) {
return Optional.of(new ParquetTableStatsExtractor(type,
statsCollectors));
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
index 6bb50d851..e8836b8fe 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
@@ -45,9 +45,6 @@ import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.time.Instant;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
import java.util.Map;
import java.util.stream.IntStream;
@@ -56,12 +53,11 @@ import static
org.apache.paimon.format.parquet.ParquetUtil.assertStatsClass;
/** {@link TableStatsExtractor} for parquet files. */
public class ParquetTableStatsExtractor implements TableStatsExtractor {
- private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-
private final RowType rowType;
- private final FieldStatsCollector[] statsCollectors;
+ private final FieldStatsCollector.Factory[] statsCollectors;
- public ParquetTableStatsExtractor(RowType rowType, FieldStatsCollector[]
statsCollectors) {
+ public ParquetTableStatsExtractor(
+ RowType rowType, FieldStatsCollector.Factory[] statsCollectors) {
this.rowType = rowType;
this.statsCollectors = statsCollectors;
Preconditions.checkArgument(
@@ -72,23 +68,24 @@ public class ParquetTableStatsExtractor implements
TableStatsExtractor {
@Override
public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
Map<String, Statistics<?>> stats =
ParquetUtil.extractColumnStats(fileIO, path);
-
+ FieldStatsCollector[] collectors =
FieldStatsCollector.create(statsCollectors);
return IntStream.range(0, rowType.getFieldCount())
.mapToObj(
i -> {
DataField field = rowType.getFields().get(i);
- return toFieldStats(field,
stats.get(field.name()), i);
+ return toFieldStats(field,
stats.get(field.name()), collectors[i]);
})
.toArray(FieldStats[]::new);
}
- private FieldStats toFieldStats(DataField field, Statistics<?> stats, int
idx) {
+ private FieldStats toFieldStats(
+ DataField field, Statistics<?> stats, FieldStatsCollector
collector) {
if (stats == null) {
return new FieldStats(null, null, null);
}
long nullCount = stats.getNumNulls();
if (!stats.hasNonNullValue()) {
- return this.statsCollectors[idx].convert(new FieldStats(null,
null, nullCount));
+ return collector.convert(new FieldStats(null, null, nullCount));
}
FieldStats fieldStats;
@@ -170,7 +167,7 @@ public class ParquetTableStatsExtractor implements
TableStatsExtractor {
default:
fieldStats = new FieldStats(null, null, nullCount);
}
- return this.statsCollectors[idx].convert(fieldStats);
+ return collector.convert(fieldStats);
}
private FieldStats toTimestampStats(Statistics<?> stats, int precision) {