This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 335baef7640f4329a8778f87a638dd8c67b81cbb Author: JingsongLi <[email protected]> AuthorDate: Tue Jul 4 11:50:33 2023 +0800 [bug] Fix NPE for AppendOnlyWriter of statsCollectors --- .../main/java/org/apache/paimon/format/FileFormat.java | 2 +- .../java/org/apache/paimon/append/AppendOnlyWriter.java | 8 ++++---- .../paimon/io/StatsCollectingSingleFileWriter.java | 8 ++++---- .../paimon/operation/AppendOnlyFileStoreWrite.java | 8 ++++---- .../org/apache/paimon/format/orc/OrcFileFormat.java | 4 ++-- .../format/orc/filter/OrcTableStatsExtractor.java | 12 ++++++------ .../apache/paimon/format/parquet/ParquetFileFormat.java | 4 ++-- .../format/parquet/ParquetTableStatsExtractor.java | 17 ++++++++--------- 8 files changed, 31 insertions(+), 32 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index a6ba21c50..9527cf140 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -77,7 +77,7 @@ public abstract class FileFormat { } public Optional<TableStatsExtractor> createStatsExtractor( - RowType type, FieldStatsCollector[] stats) { + RowType type, FieldStatsCollector[] statsCollectors) { return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 98c05cda4..9cdec0f0b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -63,9 +63,9 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { private final List<DataFileMeta> compactAfter; private final LongCounter seqNumCounter; private final String fileCompression; + private final FieldStatsCollector[] statsCollectors; private RowDataRollingFileWriter writer; - private FieldStatsCollector[] stats; public AppendOnlyWriter( FileIO fileIO, @@ -79,7 +79,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { DataFilePathFactory pathFactory, @Nullable CommitIncrement increment, String fileCompression, - FieldStatsCollector[] stats) { + FieldStatsCollector[] statsCollectors) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -93,6 +93,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { this.compactAfter = new ArrayList<>(); this.seqNumCounter = new LongCounter(maxSequenceNumber + 1); this.fileCompression = fileCompression; + this.statsCollectors = statsCollectors; this.writer = createRollingRowWriter(); @@ -101,7 +102,6 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { compactBefore.addAll(increment.compactIncrement().compactBefore()); compactAfter.addAll(increment.compactIncrement().compactAfter()); } - this.stats = stats; } @Override @@ -178,7 +178,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { pathFactory, seqNumCounter, fileCompression, - stats); + statsCollectors); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java index 0fd5e5c1f..ce5d57289 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java @@ -57,16 +57,16 @@ public abstract class StatsCollectingSingleFileWriter<T, R> extends SingleFileWr RowType writeSchema, @Nullable TableStatsExtractor tableStatsExtractor, String compression, - FieldStatsCollector[] stats) { + FieldStatsCollector[] statsCollectors) { super(fileIO, factory, path, converter, compression); this.tableStatsExtractor = tableStatsExtractor; this.isStatsCollectorDisabled = - Arrays.stream(stats).allMatch(p -> p instanceof NoneFieldStatsCollector); + Arrays.stream(statsCollectors).allMatch(p -> p instanceof NoneFieldStatsCollector); if (this.tableStatsExtractor == null) { - this.tableStatsCollector = new TableStatsCollector(writeSchema, stats); + this.tableStatsCollector = new TableStatsCollector(writeSchema, statsCollectors); } Preconditions.checkArgument( - stats.length == writeSchema.getFieldCount(), + statsCollectors.length == writeSchema.getFieldCount(), "The stats collector is not aligned to write schema."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 8efd1fd6d..f22572676 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -69,7 +69,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow private final String fileCompression; private boolean skipCompaction; private BucketMode bucketMode = BucketMode.FIXED; - private FieldStatsCollector[] stats; + private FieldStatsCollector[] statsCollectors; public AppendOnlyFileStoreWrite( FileIO fileIO, @@ -95,7 +95,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow this.skipCompaction = options.writeOnly(); this.assertDisorder = options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER); this.fileCompression = options.fileCompression(); - this.stats = StatsUtils.getFieldsStatsMode(options, rowType.getFieldNames()); + this.statsCollectors = StatsUtils.getFieldsStatsMode(options, rowType.getFieldNames()); } @Override @@ -133,7 +133,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow factory, restoreIncrement, fileCompression, - stats); + statsCollectors); } public AppendOnlyCompactManager.CompactRewriter compactRewriter( @@ -152,7 +152,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow pathFactory.createDataFilePathFactory(partition, bucket), new LongCounter(toCompact.get(0).minSequenceNumber()), fileCompression, - stats); + statsCollectors); rewriter.write( new RecordReaderIterator<>( read.createReader( diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index b5a4d1db9..8b3dc59a1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -88,8 +88,8 @@ public class OrcFileFormat extends FileFormat { @Override public Optional<TableStatsExtractor> createStatsExtractor( - RowType type, FieldStatsCollector[] stats) { - return Optional.of(new OrcTableStatsExtractor(type, stats)); + RowType type, FieldStatsCollector[] statsCollectors) { + return Optional.of(new OrcTableStatsExtractor(type, statsCollectors)); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java index 411b52e63..d394a11de 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java @@ -54,13 +54,13 @@ import java.util.stream.IntStream; public class OrcTableStatsExtractor implements TableStatsExtractor { private final RowType rowType; - private final FieldStatsCollector[] stats; + private final FieldStatsCollector[] statsCollectors; - public OrcTableStatsExtractor(RowType rowType, FieldStatsCollector[] stats) { + public OrcTableStatsExtractor(RowType rowType, FieldStatsCollector[] statsCollectors) { this.rowType = rowType; - this.stats = stats; + this.statsCollectors = statsCollectors; Preconditions.checkArgument( - rowType.getFieldCount() == stats.length, + rowType.getFieldCount() == statsCollectors.length, "The stats collector is not aligned to write schema."); } @@ -91,7 +91,7 @@ public class OrcTableStatsExtractor implements TableStatsExtractor { long nullCount = rowCount - stats.getNumberOfValues(); if (nullCount == rowCount) { // all nulls - return this.stats[idx].convert(new FieldStats(null, null, nullCount)); + return this.statsCollectors[idx].convert(new FieldStats(null, null, nullCount)); } Preconditions.checkState( (nullCount > 0) == stats.hasNull(), @@ -214,7 +214,7 @@ public class OrcTableStatsExtractor implements TableStatsExtractor { default: fieldStats = new FieldStats(null, null, nullCount); } - return this.stats[idx].convert(fieldStats); + return this.statsCollectors[idx].convert(fieldStats); } private void assertStatsClass( diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java index 1bb0bc2fd..0eb7a9be3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java @@ -74,8 +74,8 @@ public class ParquetFileFormat extends FileFormat { @Override public Optional<TableStatsExtractor> createStatsExtractor( - RowType type, FieldStatsCollector[] stats) { - return Optional.of(new ParquetTableStatsExtractor(type, stats)); + RowType type, FieldStatsCollector[] statsCollectors) { + return Optional.of(new ParquetTableStatsExtractor(type, statsCollectors)); } public static Options getParquetConfiguration(Options options) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java index 587b3dfc2..6bb50d851 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.time.Instant; -import java.time.LocalDate; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Map; @@ -57,16 +56,16 @@ import static org.apache.paimon.format.parquet.ParquetUtil.assertStatsClass; /** {@link TableStatsExtractor} for parquet files. */ public class ParquetTableStatsExtractor implements TableStatsExtractor { - private final RowType rowType; private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - private final FieldStatsCollector[] stats; - public ParquetTableStatsExtractor(RowType rowType, FieldStatsCollector[] stats) { + private final RowType rowType; + private final FieldStatsCollector[] statsCollectors; + + public ParquetTableStatsExtractor(RowType rowType, FieldStatsCollector[] statsCollectors) { this.rowType = rowType; - this.stats = stats; + this.statsCollectors = statsCollectors; Preconditions.checkArgument( - rowType.getFieldCount() == stats.length, + rowType.getFieldCount() == statsCollectors.length, "The stats collector is not aligned to write schema."); } @@ -89,7 +88,7 @@ public class ParquetTableStatsExtractor implements TableStatsExtractor { } long nullCount = stats.getNumNulls(); if (!stats.hasNonNullValue()) { - return this.stats[idx].convert(new FieldStats(null, null, nullCount)); + return this.statsCollectors[idx].convert(new FieldStats(null, null, nullCount)); } FieldStats fieldStats; @@ -171,7 +170,7 @@ public class ParquetTableStatsExtractor implements TableStatsExtractor { default: fieldStats = new FieldStats(null, null, nullCount); } - return this.stats[idx].convert(fieldStats); + return this.statsCollectors[idx].convert(fieldStats); } private FieldStats toTimestampStats(Statistics<?> stats, int precision) {
