This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c9f775b806 Flink: Move ParquetReader to LogicalTypeAnnotationVisitor (#9719)
c9f775b806 is described below
commit c9f775b8063e9af4c14de12659c36a4286e4b000
Author: Fokko Driesprong <[email protected]>
AuthorDate: Fri Apr 26 08:50:48 2024 +0200
Flink: Move ParquetReader to LogicalTypeAnnotationVisitor (#9719)
* Flink: Move ParquetReader to LogicalTypeAnnotationVisitor
This will enable nanosecond timestamp support, since the deprecated
OriginalType cannot represent nanosecond timestamps (a minimal
illustrative sketch follows this message).
* Add a test
* Update gradle.properties
* Add other versions as well
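For illustration only (not part of the commit below): a minimal sketch of why
the visitor is needed, assuming parquet-mr >= 1.11. OriginalType only defines
TIMESTAMP_MILLIS and TIMESTAMP_MICROS, so nanosecond precision is only visible
through a LogicalTypeAnnotationVisitor. The TimestampUnitProbe class is a
hypothetical name used for this sketch.

    import java.util.Optional;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;

    class TimestampUnitProbe
        implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<String> {
      @Override
      public Optional<String> visit(TimestampLogicalTypeAnnotation timestamp) {
        // TimeUnit.NANOS is only reachable here; OriginalType has no nanosecond value
        return Optional.of(
            timestamp.getUnit() + (timestamp.isAdjustedToUTC() ? ", UTC-adjusted" : ""));
      }

      // Returns a description of the timestamp unit, or a fallback for other types
      static String describe(PrimitiveType primitive) {
        LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
        return annotation == null
            ? "no logical type"
            : annotation.accept(new TimestampUnitProbe()).orElse("not a timestamp");
      }
    }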
---
.../java/org/apache/iceberg/data/DataTest.java | 2 +-
.../iceberg/flink/data/FlinkParquetReaders.java | 183 ++++++++++++++-------
.../iceberg/flink/data/TestFlinkParquetReader.java | 101 ++++++++++++
.../iceberg/flink/data/FlinkParquetReaders.java | 183 ++++++++++++++-------
.../iceberg/flink/data/TestFlinkParquetReader.java | 101 ++++++++++++
.../iceberg/flink/data/FlinkParquetReaders.java | 183 ++++++++++++++-------
.../iceberg/flink/data/TestFlinkParquetReader.java | 101 ++++++++++++
7 files changed, 688 insertions(+), 166 deletions(-)
diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java b/data/src/test/java/org/apache/iceberg/data/DataTest.java
index 5ea742e451..638a344cd2 100644
--- a/data/src/test/java/org/apache/iceberg/data/DataTest.java
+++ b/data/src/test/java/org/apache/iceberg/data/DataTest.java
@@ -38,7 +38,7 @@ public abstract class DataTest {
protected abstract void writeAndValidate(Schema schema) throws IOException;
- private static final StructType SUPPORTED_PRIMITIVES =
+ protected static final StructType SUPPORTED_PRIMITIVES =
StructType.of(
required(100, "id", LongType.get()),
optional(101, "data", Types.StringType.get()),
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index ab7b1174c9..ad4310a6d1 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -26,6 +26,7 @@ import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
@@ -50,6 +51,7 @@ import org.apache.iceberg.util.ArrayUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -193,6 +195,124 @@ public class FlinkParquetReaders {
ParquetValueReaders.option(valueType, valueD, valueReader));
}
+ private static class LogicalTypeAnnotationParquetValueReaderVisitor
+ implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
+
+ private final PrimitiveType primitive;
+ private final ColumnDescriptor desc;
+ private final org.apache.iceberg.types.Type.PrimitiveType expected;
+
+ LogicalTypeAnnotationParquetValueReaderVisitor(
+ PrimitiveType primitive,
+ ColumnDescriptor desc,
+ org.apache.iceberg.types.Type.PrimitiveType expected) {
+ this.primitive = primitive;
+ this.desc = desc;
+ this.expected = expected;
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ DecimalLogicalTypeAnnotation decimalLogicalType) {
+ switch (primitive.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return Optional.of(
+ new BinaryDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ case INT64:
+ return Optional.of(
+ new LongDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ case INT32:
+ return Optional.of(
+ new IntegerDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(decimalLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+ if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ return Optional.of(new MillisTimeReader(desc));
+ } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
+ return Optional.of(new LossyMicrosToMillisTimeReader(desc));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+ if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ if (timestampLogicalType.isAdjustedToUTC()) {
+ return Optional.of(new MillisToTimestampTzReader(desc));
+ } else {
+ return Optional.of(new MillisToTimestampReader(desc));
+ }
+ } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
+ if (timestampLogicalType.isAdjustedToUTC()) {
+ return Optional.of(new MicrosToTimestampTzReader(desc));
+ } else {
+ return Optional.of(new MicrosToTimestampReader(desc));
+ }
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+ int width = intLogicalType.getBitWidth();
+ if (width <= 32) {
+ if (expected.typeId() == Types.LongType.get().typeId()) {
+ return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
+ } else {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+ } else if (width <= 64) {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(intLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+ return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+ }
+ }
+
@Override
@SuppressWarnings("CyclomaticComplexity")
public ParquetValueReader<?> primitive(
@@ -202,61 +322,14 @@ public class FlinkParquetReaders {
}
ColumnDescriptor desc = type.getColumnDescription(currentPath());
-
- if (primitive.getOriginalType() != null) {
- switch (primitive.getOriginalType()) {
- case ENUM:
- case JSON:
- case UTF8:
- return new StringReader(desc);
- case INT_8:
- case INT_16:
- case INT_32:
- if (expected.typeId() == Types.LongType.get().typeId()) {
- return new ParquetValueReaders.IntAsLongReader(desc);
- } else {
- return new ParquetValueReaders.UnboxedReader<>(desc);
- }
- case TIME_MICROS:
- return new LossyMicrosToMillisTimeReader(desc);
- case TIME_MILLIS:
- return new MillisTimeReader(desc);
- case DATE:
- case INT_64:
- return new ParquetValueReaders.UnboxedReader<>(desc);
- case TIMESTAMP_MICROS:
- if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
- return new MicrosToTimestampTzReader(desc);
- } else {
- return new MicrosToTimestampReader(desc);
- }
- case TIMESTAMP_MILLIS:
- if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
- return new MillisToTimestampTzReader(desc);
- } else {
- return new MillisToTimestampReader(desc);
- }
- case DECIMAL:
- DecimalLogicalTypeAnnotation decimal =
- (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
- switch (primitive.getPrimitiveTypeName()) {
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- return new BinaryDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- case INT64:
- return new LongDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- case INT32:
- return new IntegerDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- default:
- throw new UnsupportedOperationException(
- "Unsupported base type for decimal: " +
primitive.getPrimitiveTypeName());
- }
- case BSON:
- return new ParquetValueReaders.ByteArrayReader(desc);
- default:
- throw new UnsupportedOperationException(
- "Unsupported logical type: " + primitive.getOriginalType());
- }
+ LogicalTypeAnnotation logicalTypeAnnotation = primitive.getLogicalTypeAnnotation();
+ if (logicalTypeAnnotation != null) {
+ return logicalTypeAnnotation
+ .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(primitive, desc, expected))
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported logical type: " +
primitive.getLogicalTypeAnnotation()));
}
switch (primitive.getPrimitiveTypeName()) {
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 1fdc4cf838..4cfb24f629 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.data;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.parquet.schema.Types.primitive;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
@@ -44,15 +45,115 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.Test;
public class TestFlinkParquetReader extends DataTest {
private static final int NUM_RECORDS = 100;
+ @Test
+ public void testBuildReader() {
+ MessageType fileSchema =
+ new MessageType(
+ "test",
+ // 0: required(100, "id", LongType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(100)
+ .named("id"),
+ // 1: optional(101, "data", Types.StringType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+ .id(101)
+ .named("data"),
+ // 2: required(102, "b", Types.BooleanType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, Type.Repetition.REQUIRED)
+ .id(102)
+ .named("b"),
+ // 3: optional(103, "i", Types.IntegerType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL)
+ .id(103)
+ .named("i"),
+ // 4: required(104, "l", Types.LongType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(104)
+ .named("l"),
+ // 5: optional(105, "f", Types.FloatType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.FLOAT, Type.Repetition.OPTIONAL)
+ .id(105)
+ .named("f"),
+ // 6: required(106, "d", Types.DoubleType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED)
+ .id(106)
+ .named("d"),
+ // 7: optional(107, "date", Types.DateType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL)
+ .id(107)
+ .as(LogicalTypeAnnotation.dateType())
+ .named("date"),
+ // 8: required(108, "ts_tz", Types.TimestampType.withZone())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(108)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("ts_tz"),
+ // 9: required(109, "ts", Types.TimestampType.withoutZone())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(109)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ false, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("ts"),
+ // 10: required(110, "s", Types.StringType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
+ .id(110)
+ .as(LogicalTypeAnnotation.stringType())
+ .named("s"),
+ // 11: required(112, "fixed", Types.FixedType.ofLength(7))
+ primitive(
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.REQUIRED)
+ .id(112)
+ .length(7)
+ .named("f"),
+ // 12: optional(113, "bytes", Types.BinaryType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+ .id(113)
+ .named("bytes"),
+ // 13: required(114, "dec_9_0", Types.DecimalType.of(9, 0))
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(114)
+ .as(LogicalTypeAnnotation.decimalType(0, 9))
+ .named("dec_9_0"),
+ // 14: required(115, "dec_11_2", Types.DecimalType.of(11, 2))
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(115)
+ .as(LogicalTypeAnnotation.decimalType(2, 11))
+ .named("dec_11_2"),
+ // 15: required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // maximum precision
+ primitive(
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.REQUIRED)
+ .id(116)
+ .length(16)
+ .as(LogicalTypeAnnotation.decimalType(10, 38))
+ .named("dec_38_10"),
+ // 16: required(117, "time", Types.TimeType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
+ .id(117)
+ .as(LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("time"));
+ ParquetValueReader<RowData> reader =
+ FlinkParquetReaders.buildReader(new Schema(SUPPORTED_PRIMITIVES.fields()), fileSchema);
+
+ assertThat(reader.columns().size()).isEqualTo(SUPPORTED_PRIMITIVES.fields().size());
+ }
+
@Test
public void testTwoLevelList() throws IOException {
Schema schema =
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index ab7b1174c9..ad4310a6d1 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -26,6 +26,7 @@ import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
@@ -50,6 +51,7 @@ import org.apache.iceberg.util.ArrayUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -193,6 +195,124 @@ public class FlinkParquetReaders {
ParquetValueReaders.option(valueType, valueD, valueReader));
}
+ private static class LogicalTypeAnnotationParquetValueReaderVisitor
+ implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
+
+ private final PrimitiveType primitive;
+ private final ColumnDescriptor desc;
+ private final org.apache.iceberg.types.Type.PrimitiveType expected;
+
+ LogicalTypeAnnotationParquetValueReaderVisitor(
+ PrimitiveType primitive,
+ ColumnDescriptor desc,
+ org.apache.iceberg.types.Type.PrimitiveType expected) {
+ this.primitive = primitive;
+ this.desc = desc;
+ this.expected = expected;
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ DecimalLogicalTypeAnnotation decimalLogicalType) {
+ switch (primitive.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return Optional.of(
+ new BinaryDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ case INT64:
+ return Optional.of(
+ new LongDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ case INT32:
+ return Optional.of(
+ new IntegerDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(decimalLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+ if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ return Optional.of(new MillisTimeReader(desc));
+ } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
+ return Optional.of(new LossyMicrosToMillisTimeReader(desc));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+ if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ if (timestampLogicalType.isAdjustedToUTC()) {
+ return Optional.of(new MillisToTimestampTzReader(desc));
+ } else {
+ return Optional.of(new MillisToTimestampReader(desc));
+ }
+ } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
+ if (timestampLogicalType.isAdjustedToUTC()) {
+ return Optional.of(new MicrosToTimestampTzReader(desc));
+ } else {
+ return Optional.of(new MicrosToTimestampReader(desc));
+ }
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+ int width = intLogicalType.getBitWidth();
+ if (width <= 32) {
+ if (expected.typeId() == Types.LongType.get().typeId()) {
+ return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
+ } else {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+ } else if (width <= 64) {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(intLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+ return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+ }
+ }
+
@Override
@SuppressWarnings("CyclomaticComplexity")
public ParquetValueReader<?> primitive(
@@ -202,61 +322,14 @@ public class FlinkParquetReaders {
}
ColumnDescriptor desc = type.getColumnDescription(currentPath());
-
- if (primitive.getOriginalType() != null) {
- switch (primitive.getOriginalType()) {
- case ENUM:
- case JSON:
- case UTF8:
- return new StringReader(desc);
- case INT_8:
- case INT_16:
- case INT_32:
- if (expected.typeId() == Types.LongType.get().typeId()) {
- return new ParquetValueReaders.IntAsLongReader(desc);
- } else {
- return new ParquetValueReaders.UnboxedReader<>(desc);
- }
- case TIME_MICROS:
- return new LossyMicrosToMillisTimeReader(desc);
- case TIME_MILLIS:
- return new MillisTimeReader(desc);
- case DATE:
- case INT_64:
- return new ParquetValueReaders.UnboxedReader<>(desc);
- case TIMESTAMP_MICROS:
- if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
- return new MicrosToTimestampTzReader(desc);
- } else {
- return new MicrosToTimestampReader(desc);
- }
- case TIMESTAMP_MILLIS:
- if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
- return new MillisToTimestampTzReader(desc);
- } else {
- return new MillisToTimestampReader(desc);
- }
- case DECIMAL:
- DecimalLogicalTypeAnnotation decimal =
- (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
- switch (primitive.getPrimitiveTypeName()) {
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- return new BinaryDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- case INT64:
- return new LongDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- case INT32:
- return new IntegerDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- default:
- throw new UnsupportedOperationException(
- "Unsupported base type for decimal: " +
primitive.getPrimitiveTypeName());
- }
- case BSON:
- return new ParquetValueReaders.ByteArrayReader(desc);
- default:
- throw new UnsupportedOperationException(
- "Unsupported logical type: " + primitive.getOriginalType());
- }
+ LogicalTypeAnnotation logicalTypeAnnotation = primitive.getLogicalTypeAnnotation();
+ if (logicalTypeAnnotation != null) {
+ return logicalTypeAnnotation
+ .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(primitive, desc, expected))
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported logical type: " +
primitive.getLogicalTypeAnnotation()));
}
switch (primitive.getPrimitiveTypeName()) {
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 1fdc4cf838..4cfb24f629 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.data;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.parquet.schema.Types.primitive;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
@@ -44,15 +45,115 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.Test;
public class TestFlinkParquetReader extends DataTest {
private static final int NUM_RECORDS = 100;
+ @Test
+ public void testBuildReader() {
+ MessageType fileSchema =
+ new MessageType(
+ "test",
+ // 0: required(100, "id", LongType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(100)
+ .named("id"),
+ // 1: optional(101, "data", Types.StringType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+ .id(101)
+ .named("data"),
+ // 2: required(102, "b", Types.BooleanType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, Type.Repetition.REQUIRED)
+ .id(102)
+ .named("b"),
+ // 3: optional(103, "i", Types.IntegerType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL)
+ .id(103)
+ .named("i"),
+ // 4: required(104, "l", Types.LongType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(104)
+ .named("l"),
+ // 5: optional(105, "f", Types.FloatType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.FLOAT, Type.Repetition.OPTIONAL)
+ .id(105)
+ .named("f"),
+ // 6: required(106, "d", Types.DoubleType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED)
+ .id(106)
+ .named("d"),
+ // 7: optional(107, "date", Types.DateType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL)
+ .id(107)
+ .as(LogicalTypeAnnotation.dateType())
+ .named("date"),
+ // 8: required(108, "ts_tz", Types.TimestampType.withZone())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(108)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("ts_tz"),
+ // 9: required(109, "ts", Types.TimestampType.withoutZone())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(109)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ false, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("ts"),
+ // 10: required(110, "s", Types.StringType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
+ .id(110)
+ .as(LogicalTypeAnnotation.stringType())
+ .named("s"),
+ // 11: required(112, "fixed", Types.FixedType.ofLength(7))
+ primitive(
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.REQUIRED)
+ .id(112)
+ .length(7)
+ .named("f"),
+ // 12: optional(113, "bytes", Types.BinaryType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+ .id(113)
+ .named("bytes"),
+ // 13: required(114, "dec_9_0", Types.DecimalType.of(9, 0))
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(114)
+ .as(LogicalTypeAnnotation.decimalType(0, 9))
+ .named("dec_9_0"),
+ // 14: required(115, "dec_11_2", Types.DecimalType.of(11, 2))
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(115)
+ .as(LogicalTypeAnnotation.decimalType(2, 11))
+ .named("dec_11_2"),
+ // 15: required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // maximum precision
+ primitive(
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.REQUIRED)
+ .id(116)
+ .length(16)
+ .as(LogicalTypeAnnotation.decimalType(10, 38))
+ .named("dec_38_10"),
+ // 16: required(117, "time", Types.TimeType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
+ .id(117)
+ .as(LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("time"));
+ ParquetValueReader<RowData> reader =
+ FlinkParquetReaders.buildReader(new Schema(SUPPORTED_PRIMITIVES.fields()), fileSchema);
+
+ assertThat(reader.columns().size()).isEqualTo(SUPPORTED_PRIMITIVES.fields().size());
+ }
+
@Test
public void testTwoLevelList() throws IOException {
Schema schema =
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index ab7b1174c9..ad4310a6d1 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -26,6 +26,7 @@ import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
@@ -50,6 +51,7 @@ import org.apache.iceberg.util.ArrayUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -193,6 +195,124 @@ public class FlinkParquetReaders {
ParquetValueReaders.option(valueType, valueD, valueReader));
}
+ private static class LogicalTypeAnnotationParquetValueReaderVisitor
+ implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
+
+ private final PrimitiveType primitive;
+ private final ColumnDescriptor desc;
+ private final org.apache.iceberg.types.Type.PrimitiveType expected;
+
+ LogicalTypeAnnotationParquetValueReaderVisitor(
+ PrimitiveType primitive,
+ ColumnDescriptor desc,
+ org.apache.iceberg.types.Type.PrimitiveType expected) {
+ this.primitive = primitive;
+ this.desc = desc;
+ this.expected = expected;
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ DecimalLogicalTypeAnnotation decimalLogicalType) {
+ switch (primitive.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return Optional.of(
+ new BinaryDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ case INT64:
+ return Optional.of(
+ new LongDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ case INT32:
+ return Optional.of(
+ new IntegerDecimalReader(
+ desc, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(decimalLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+ if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ return Optional.of(new MillisTimeReader(desc));
+ } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
+ return Optional.of(new LossyMicrosToMillisTimeReader(desc));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+ if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ if (timestampLogicalType.isAdjustedToUTC()) {
+ return Optional.of(new MillisToTimestampTzReader(desc));
+ } else {
+ return Optional.of(new MillisToTimestampReader(desc));
+ }
+ } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
+ if (timestampLogicalType.isAdjustedToUTC()) {
+ return Optional.of(new MicrosToTimestampTzReader(desc));
+ } else {
+ return Optional.of(new MicrosToTimestampReader(desc));
+ }
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+ int width = intLogicalType.getBitWidth();
+ if (width <= 32) {
+ if (expected.typeId() == Types.LongType.get().typeId()) {
+ return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
+ } else {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+ } else if (width <= 64) {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+
+ return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(intLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+ return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+ }
+ }
+
@Override
@SuppressWarnings("CyclomaticComplexity")
public ParquetValueReader<?> primitive(
@@ -202,61 +322,14 @@ public class FlinkParquetReaders {
}
ColumnDescriptor desc = type.getColumnDescription(currentPath());
-
- if (primitive.getOriginalType() != null) {
- switch (primitive.getOriginalType()) {
- case ENUM:
- case JSON:
- case UTF8:
- return new StringReader(desc);
- case INT_8:
- case INT_16:
- case INT_32:
- if (expected.typeId() == Types.LongType.get().typeId()) {
- return new ParquetValueReaders.IntAsLongReader(desc);
- } else {
- return new ParquetValueReaders.UnboxedReader<>(desc);
- }
- case TIME_MICROS:
- return new LossyMicrosToMillisTimeReader(desc);
- case TIME_MILLIS:
- return new MillisTimeReader(desc);
- case DATE:
- case INT_64:
- return new ParquetValueReaders.UnboxedReader<>(desc);
- case TIMESTAMP_MICROS:
- if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
- return new MicrosToTimestampTzReader(desc);
- } else {
- return new MicrosToTimestampReader(desc);
- }
- case TIMESTAMP_MILLIS:
- if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
- return new MillisToTimestampTzReader(desc);
- } else {
- return new MillisToTimestampReader(desc);
- }
- case DECIMAL:
- DecimalLogicalTypeAnnotation decimal =
- (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
- switch (primitive.getPrimitiveTypeName()) {
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- return new BinaryDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- case INT64:
- return new LongDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- case INT32:
- return new IntegerDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
- default:
- throw new UnsupportedOperationException(
- "Unsupported base type for decimal: " +
primitive.getPrimitiveTypeName());
- }
- case BSON:
- return new ParquetValueReaders.ByteArrayReader(desc);
- default:
- throw new UnsupportedOperationException(
- "Unsupported logical type: " + primitive.getOriginalType());
- }
+ LogicalTypeAnnotation logicalTypeAnnotation = primitive.getLogicalTypeAnnotation();
+ if (logicalTypeAnnotation != null) {
+ return logicalTypeAnnotation
+ .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(primitive, desc, expected))
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported logical type: " +
primitive.getLogicalTypeAnnotation()));
}
switch (primitive.getPrimitiveTypeName()) {
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 1fdc4cf838..4cfb24f629 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.data;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.parquet.schema.Types.primitive;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
@@ -44,15 +45,115 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.Test;
public class TestFlinkParquetReader extends DataTest {
private static final int NUM_RECORDS = 100;
+ @Test
+ public void testBuildReader() {
+ MessageType fileSchema =
+ new MessageType(
+ "test",
+ // 0: required(100, "id", LongType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(100)
+ .named("id"),
+ // 1: optional(101, "data", Types.StringType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+ .id(101)
+ .named("data"),
+ // 2: required(102, "b", Types.BooleanType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, Type.Repetition.REQUIRED)
+ .id(102)
+ .named("b"),
+ // 3: optional(103, "i", Types.IntegerType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL)
+ .id(103)
+ .named("i"),
+ // 4: required(104, "l", Types.LongType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(104)
+ .named("l"),
+ // 5: optional(105, "f", Types.FloatType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.FLOAT, Type.Repetition.OPTIONAL)
+ .id(105)
+ .named("f"),
+ // 6: required(106, "d", Types.DoubleType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED)
+ .id(106)
+ .named("d"),
+ // 7: optional(107, "date", Types.DateType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL)
+ .id(107)
+ .as(LogicalTypeAnnotation.dateType())
+ .named("date"),
+ // 8: required(108, "ts_tz", Types.TimestampType.withZone())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(108)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("ts_tz"),
+ // 9: required(109, "ts", Types.TimestampType.withoutZone())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(109)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ false, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("ts"),
+ // 10: required(110, "s", Types.StringType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
+ .id(110)
+ .as(LogicalTypeAnnotation.stringType())
+ .named("s"),
+ // 11: required(112, "fixed", Types.FixedType.ofLength(7))
+ primitive(
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.REQUIRED)
+ .id(112)
+ .length(7)
+ .named("f"),
+ // 12: optional(113, "bytes", Types.BinaryType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+ .id(113)
+ .named("bytes"),
+ // 13: required(114, "dec_9_0", Types.DecimalType.of(9, 0))
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(114)
+ .as(LogicalTypeAnnotation.decimalType(0, 9))
+ .named("dec_9_0"),
+ // 14: required(115, "dec_11_2", Types.DecimalType.of(11, 2))
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .id(115)
+ .as(LogicalTypeAnnotation.decimalType(2, 11))
+ .named("dec_11_2"),
+ // 15: required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // maximum precision
+ primitive(
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.REQUIRED)
+ .id(116)
+ .length(16)
+ .as(LogicalTypeAnnotation.decimalType(10, 38))
+ .named("dec_38_10"),
+ // 16: required(117, "time", Types.TimeType.get())
+ primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
+ .id(117)
+ .as(LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("time"));
+ ParquetValueReader<RowData> reader =
+ FlinkParquetReaders.buildReader(new Schema(SUPPORTED_PRIMITIVES.fields()), fileSchema);
+
+ assertThat(reader.columns().size()).isEqualTo(SUPPORTED_PRIMITIVES.fields().size());
+ }
+
@Test
public void testTwoLevelList() throws IOException {
Schema schema =