This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 29a2c45635 Fix incorrect metrics in ParquetUtil (#8559)
29a2c45635 is described below
commit 29a2c456353a6120b8c882ed2ab544975b168d7b
Author: Yuya Ebihara <[email protected]>
AuthorDate: Tue Sep 19 21:21:49 2023 +0900
Fix incorrect metrics in ParquetUtil (#8559)
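Statistics might be missing when hasNonNullValue is false. For example, parquet-mr omits min/max statistics for values larger than its size limit instead of truncating them. Previously, a row group with empty column statistics was silently skipped. The file-level null counts and lower/upper bounds were then computed from the remaining row groups only and reported as if complete. Empty statistics are now treated the same as absent statistics, and the column is recorded as missing stats.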
---
.../org/apache/iceberg/parquet/ParquetUtil.java | 6 +--
.../org/apache/iceberg/parquet/TestParquet.java | 62 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 3 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index a879fc5f51..2de423146a 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -133,9 +133,7 @@ public class ParquetUtil {
increment(valueCounts, fieldId, column.getValueCount());
Statistics stats = column.getStatistics();
- if (stats == null) {
- missingStats.add(fieldId);
- } else if (!stats.isEmpty()) {
+ if (stats != null && !stats.isEmpty()) {
increment(nullValueCounts, fieldId, stats.getNumNulls());
// when there are metrics gathered by Iceberg for a column, we should use those instead
@@ -153,6 +151,8 @@ public class ParquetUtil {
updateMax(upperBounds, fieldId, field.type(), max, metricsMode);
}
}
+ } else {
+ missingStats.add(fieldId);
}
}
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index b21e234a5d..ae0a822d34 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -18,12 +18,14 @@
*/
package org.apache.iceberg.parquet;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.iceberg.Files.localInput;
import static
org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
import static
org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static
org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile;
import static org.apache.iceberg.parquet.ParquetWritingTestUtils.write;
+import static
org.apache.iceberg.relocated.com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
@@ -38,8 +40,12 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.Files;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -47,8 +53,10 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.util.Pair;
import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -86,6 +94,60 @@ public class TestParquet {
}
}
+ @Test
+ public void testMetricsMissingColumnStatisticsInRowGroups() throws
IOException {
+ Schema schema = new Schema(optional(1, "stringCol",
Types.StringType.get()));
+
+ File file = createTempFile(temp);
+
+ List<GenericData.Record> records = Lists.newArrayListWithCapacity(1);
+ org.apache.avro.Schema avroSchema =
AvroSchemaUtil.convert(schema.asStruct());
+
+ GenericData.Record smallRecord = new GenericData.Record(avroSchema);
+ smallRecord.put("stringCol", "test");
+ records.add(smallRecord);
+
+ GenericData.Record largeRecord = new GenericData.Record(avroSchema);
+ largeRecord.put("stringCol", Strings.repeat("a", 2048));
+ records.add(largeRecord);
+
+ write(
+ file,
+ schema,
+ ImmutableMap.<String, String>builder()
+ .put(PARQUET_ROW_GROUP_SIZE_BYTES, "1")
+ .put(PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, "1")
+ .put(PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, "1")
+ .buildOrThrow(),
+ ParquetAvroWriter::buildWriter,
+ records.toArray(new GenericData.Record[] {}));
+
+ InputFile inputFile = Files.localInput(file);
+ try (ParquetFileReader reader =
ParquetFileReader.open(ParquetIO.file(inputFile))) {
+ assertThat(reader.getRowGroups()).hasSize(2);
+ List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+ assertThat(blocks).hasSize(2);
+
+ Statistics<?> smallStatistics =
getOnlyElement(blocks.get(0).getColumns()).getStatistics();
+ assertThat(smallStatistics.hasNonNullValue()).isTrue();
+
assertThat(smallStatistics.getMinBytes()).isEqualTo("test".getBytes(UTF_8));
+
assertThat(smallStatistics.getMaxBytes()).isEqualTo("test".getBytes(UTF_8));
+
+ // parquet-mr omits stats larger than the max size instead of truncating them
+ Statistics<?> largeStatistics =
getOnlyElement(blocks.get(1).getColumns()).getStatistics();
+ assertThat(largeStatistics.hasNonNullValue()).isFalse();
+ assertThat(largeStatistics.getMinBytes()).isNull();
+ assertThat(largeStatistics.getMaxBytes()).isNull();
+ }
+
+ // Null counts and lower/upper bounds should be empty because
+ // one of the row groups is missing column statistics
+ Metrics metrics = ParquetUtil.fileMetrics(inputFile,
MetricsConfig.getDefault());
+ assertThat(metrics.nullValueCounts()).isEmpty();
+ assertThat(metrics.lowerBounds()).isEmpty();
+ assertThat(metrics.upperBounds()).isEmpty();
+ }
+
@Test
public void testNumberOfBytesWritten() throws IOException {
Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));