This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c9f775b806 Flink: Move ParquetReader to LogicalTypeAnnotationVisitor 
(#9719)
c9f775b806 is described below

commit c9f775b8063e9af4c14de12659c36a4286e4b000
Author: Fokko Driesprong <[email protected]>
AuthorDate: Fri Apr 26 08:50:48 2024 +0200

    Flink: Move ParquetReader to LogicalTypeAnnotationVisitor (#9719)
    
    * Flink: Move ParquetReader to LogicalTypeAnnotationVisitor
    
    This will enable nanosecond timestamp support,
    since the OriginalType does not represent the
    nanosecond timestamp.
    
    * Add a test
    
    * Update gradle.properties
    
    * Add other versions as well
---
 .../java/org/apache/iceberg/data/DataTest.java     |   2 +-
 .../iceberg/flink/data/FlinkParquetReaders.java    | 183 ++++++++++++++-------
 .../iceberg/flink/data/TestFlinkParquetReader.java | 101 ++++++++++++
 .../iceberg/flink/data/FlinkParquetReaders.java    | 183 ++++++++++++++-------
 .../iceberg/flink/data/TestFlinkParquetReader.java | 101 ++++++++++++
 .../iceberg/flink/data/FlinkParquetReaders.java    | 183 ++++++++++++++-------
 .../iceberg/flink/data/TestFlinkParquetReader.java | 101 ++++++++++++
 7 files changed, 688 insertions(+), 166 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java 
b/data/src/test/java/org/apache/iceberg/data/DataTest.java
index 5ea742e451..638a344cd2 100644
--- a/data/src/test/java/org/apache/iceberg/data/DataTest.java
+++ b/data/src/test/java/org/apache/iceberg/data/DataTest.java
@@ -38,7 +38,7 @@ public abstract class DataTest {
 
   protected abstract void writeAndValidate(Schema schema) throws IOException;
 
-  private static final StructType SUPPORTED_PRIMITIVES =
+  protected static final StructType SUPPORTED_PRIMITIVES =
       StructType.of(
           required(100, "id", LongType.get()),
           optional(101, "data", Types.StringType.get()),
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index ab7b1174c9..ad4310a6d1 100644
--- 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -26,6 +26,7 @@ import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -50,6 +51,7 @@ import org.apache.iceberg.util.ArrayUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -193,6 +195,124 @@ public class FlinkParquetReaders {
           ParquetValueReaders.option(valueType, valueD, valueReader));
     }
 
+    private static class LogicalTypeAnnotationParquetValueReaderVisitor
+        implements 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
+
+      private final PrimitiveType primitive;
+      private final ColumnDescriptor desc;
+      private final org.apache.iceberg.types.Type.PrimitiveType expected;
+
+      LogicalTypeAnnotationParquetValueReaderVisitor(
+          PrimitiveType primitive,
+          ColumnDescriptor desc,
+          org.apache.iceberg.types.Type.PrimitiveType expected) {
+        this.primitive = primitive;
+        this.desc = desc;
+        this.expected = expected;
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) 
{
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          DecimalLogicalTypeAnnotation decimalLogicalType) {
+        switch (primitive.getPrimitiveTypeName()) {
+          case BINARY:
+          case FIXED_LEN_BYTE_ARRAY:
+            return Optional.of(
+                new BinaryDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+          case INT64:
+            return Optional.of(
+                new LongDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+          case INT32:
+            return Optional.of(
+                new IntegerDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(decimalLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+        return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+        if (timeLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+          return Optional.of(new MillisTimeReader(desc));
+        } else if (timeLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MICROS) {
+          return Optional.of(new LossyMicrosToMillisTimeReader(desc));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.TimestampLogicalTypeAnnotation 
timestampLogicalType) {
+        if (timestampLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+          if (timestampLogicalType.isAdjustedToUTC()) {
+            return Optional.of(new MillisToTimestampTzReader(desc));
+          } else {
+            return Optional.of(new MillisToTimestampReader(desc));
+          }
+        } else if (timestampLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MICROS) {
+          if (timestampLogicalType.isAdjustedToUTC()) {
+            return Optional.of(new MicrosToTimestampTzReader(desc));
+          } else {
+            return Optional.of(new MicrosToTimestampReader(desc));
+          }
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+        int width = intLogicalType.getBitWidth();
+        if (width <= 32) {
+          if (expected.typeId() == Types.LongType.get().typeId()) {
+            return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
+          } else {
+            return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+          }
+        } else if (width <= 64) {
+          return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(intLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+        return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+      }
+    }
+
     @Override
     @SuppressWarnings("CyclomaticComplexity")
     public ParquetValueReader<?> primitive(
@@ -202,61 +322,14 @@ public class FlinkParquetReaders {
       }
 
       ColumnDescriptor desc = type.getColumnDescription(currentPath());
-
-      if (primitive.getOriginalType() != null) {
-        switch (primitive.getOriginalType()) {
-          case ENUM:
-          case JSON:
-          case UTF8:
-            return new StringReader(desc);
-          case INT_8:
-          case INT_16:
-          case INT_32:
-            if (expected.typeId() == Types.LongType.get().typeId()) {
-              return new ParquetValueReaders.IntAsLongReader(desc);
-            } else {
-              return new ParquetValueReaders.UnboxedReader<>(desc);
-            }
-          case TIME_MICROS:
-            return new LossyMicrosToMillisTimeReader(desc);
-          case TIME_MILLIS:
-            return new MillisTimeReader(desc);
-          case DATE:
-          case INT_64:
-            return new ParquetValueReaders.UnboxedReader<>(desc);
-          case TIMESTAMP_MICROS:
-            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
-              return new MicrosToTimestampTzReader(desc);
-            } else {
-              return new MicrosToTimestampReader(desc);
-            }
-          case TIMESTAMP_MILLIS:
-            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
-              return new MillisToTimestampTzReader(desc);
-            } else {
-              return new MillisToTimestampReader(desc);
-            }
-          case DECIMAL:
-            DecimalLogicalTypeAnnotation decimal =
-                (DecimalLogicalTypeAnnotation) 
primitive.getLogicalTypeAnnotation();
-            switch (primitive.getPrimitiveTypeName()) {
-              case BINARY:
-              case FIXED_LEN_BYTE_ARRAY:
-                return new BinaryDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              case INT64:
-                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              case INT32:
-                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              default:
-                throw new UnsupportedOperationException(
-                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
-            }
-          case BSON:
-            return new ParquetValueReaders.ByteArrayReader(desc);
-          default:
-            throw new UnsupportedOperationException(
-                "Unsupported logical type: " + primitive.getOriginalType());
-        }
+      LogicalTypeAnnotation logicalTypeAnnotation = 
primitive.getLogicalTypeAnnotation();
+      if (logicalTypeAnnotation != null) {
+        return logicalTypeAnnotation
+            .accept(new 
LogicalTypeAnnotationParquetValueReaderVisitor(primitive, desc, expected))
+            .orElseThrow(
+                () ->
+                    new UnsupportedOperationException(
+                        "Unsupported logical type: " + 
primitive.getLogicalTypeAnnotation()));
       }
 
       switch (primitive.getPrimitiveTypeName()) {
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 1fdc4cf838..4cfb24f629 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.data;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.parquet.schema.Types.primitive;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
@@ -44,15 +45,115 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.junit.jupiter.api.Test;
 
 public class TestFlinkParquetReader extends DataTest {
   private static final int NUM_RECORDS = 100;
 
+  @Test
+  public void testBuildReader() {
+    MessageType fileSchema =
+        new MessageType(
+            "test",
+            // 0: required(100, "id", Types.LongType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(100)
+                .named("id"),
+            // 1: optional(101, "data", Types.StringType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+                .id(101)
+                .named("data"),
+            // 2: required(102, "b", Types.BooleanType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, 
Type.Repetition.REQUIRED)
+                .id(102)
+                .named("b"),
+            // 3: optional(103, "i", Types.IntegerType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT32, 
Type.Repetition.OPTIONAL)
+                .id(103)
+                .named("i"),
+            // 4: required(104, "l", Types.LongType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(104)
+                .named("l"),
+            // 5: optional(105, "f", Types.FloatType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.FLOAT, 
Type.Repetition.OPTIONAL)
+                .id(105)
+                .named("f"),
+            // 6: required(106, "d", Types.DoubleType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, 
Type.Repetition.REQUIRED)
+                .id(106)
+                .named("d"),
+            // 7: optional(107, "date", Types.DateType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT32, 
Type.Repetition.OPTIONAL)
+                .id(107)
+                .as(LogicalTypeAnnotation.dateType())
+                .named("date"),
+            // 8: required(108, "ts_tz", Types.TimestampType.withZone())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(108)
+                .as(
+                    LogicalTypeAnnotation.timestampType(
+                        true, LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("ts_tz"),
+            // 9: required(109, "ts", Types.TimestampType.withoutZone())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(109)
+                .as(
+                    LogicalTypeAnnotation.timestampType(
+                        false, LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("ts"),
+            // 10: required(110, "s", Types.StringType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.REQUIRED)
+                .id(110)
+                .as(LogicalTypeAnnotation.stringType())
+                .named("s"),
+            // 11: required(112, "fixed", Types.FixedType.ofLength(7))
+            primitive(
+                    PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
Type.Repetition.REQUIRED)
+                .id(112)
+                .length(7)
+                .named("f"),
+            // 12: optional(113, "bytes", Types.BinaryType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+                .id(113)
+                .named("bytes"),
+            // 13: required(114, "dec_9_0", Types.DecimalType.of(9, 0))
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(114)
+                .as(LogicalTypeAnnotation.decimalType(0, 9))
+                .named("dec_9_0"),
+            // 14: required(115, "dec_11_2", Types.DecimalType.of(11, 2))
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(115)
+                .as(LogicalTypeAnnotation.decimalType(2, 11))
+                .named("dec_11_2"),
+            // 15: required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // 
maximum precision
+            primitive(
+                    PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
Type.Repetition.REQUIRED)
+                .id(116)
+                .length(16)
+                .as(LogicalTypeAnnotation.decimalType(10, 38))
+                .named("dec_38_10"),
+            // 16: optional(117, "time", Types.TimeType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+                .id(117)
+                .as(LogicalTypeAnnotation.timeType(true, 
LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("time"));
+    ParquetValueReader<RowData> reader =
+        FlinkParquetReaders.buildReader(new 
Schema(SUPPORTED_PRIMITIVES.fields()), fileSchema);
+
+    
assertThat(reader.columns().size()).isEqualTo(SUPPORTED_PRIMITIVES.fields().size());
+  }
+
   @Test
   public void testTwoLevelList() throws IOException {
     Schema schema =
diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index ab7b1174c9..ad4310a6d1 100644
--- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -26,6 +26,7 @@ import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -50,6 +51,7 @@ import org.apache.iceberg.util.ArrayUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -193,6 +195,124 @@ public class FlinkParquetReaders {
           ParquetValueReaders.option(valueType, valueD, valueReader));
     }
 
+    private static class LogicalTypeAnnotationParquetValueReaderVisitor
+        implements 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
+
+      private final PrimitiveType primitive;
+      private final ColumnDescriptor desc;
+      private final org.apache.iceberg.types.Type.PrimitiveType expected;
+
+      LogicalTypeAnnotationParquetValueReaderVisitor(
+          PrimitiveType primitive,
+          ColumnDescriptor desc,
+          org.apache.iceberg.types.Type.PrimitiveType expected) {
+        this.primitive = primitive;
+        this.desc = desc;
+        this.expected = expected;
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) 
{
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          DecimalLogicalTypeAnnotation decimalLogicalType) {
+        switch (primitive.getPrimitiveTypeName()) {
+          case BINARY:
+          case FIXED_LEN_BYTE_ARRAY:
+            return Optional.of(
+                new BinaryDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+          case INT64:
+            return Optional.of(
+                new LongDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+          case INT32:
+            return Optional.of(
+                new IntegerDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(decimalLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+        return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+        if (timeLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+          return Optional.of(new MillisTimeReader(desc));
+        } else if (timeLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MICROS) {
+          return Optional.of(new LossyMicrosToMillisTimeReader(desc));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.TimestampLogicalTypeAnnotation 
timestampLogicalType) {
+        if (timestampLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+          if (timestampLogicalType.isAdjustedToUTC()) {
+            return Optional.of(new MillisToTimestampTzReader(desc));
+          } else {
+            return Optional.of(new MillisToTimestampReader(desc));
+          }
+        } else if (timestampLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MICROS) {
+          if (timestampLogicalType.isAdjustedToUTC()) {
+            return Optional.of(new MicrosToTimestampTzReader(desc));
+          } else {
+            return Optional.of(new MicrosToTimestampReader(desc));
+          }
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+        int width = intLogicalType.getBitWidth();
+        if (width <= 32) {
+          if (expected.typeId() == Types.LongType.get().typeId()) {
+            return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
+          } else {
+            return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+          }
+        } else if (width <= 64) {
+          return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(intLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+        return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+      }
+    }
+
     @Override
     @SuppressWarnings("CyclomaticComplexity")
     public ParquetValueReader<?> primitive(
@@ -202,61 +322,14 @@ public class FlinkParquetReaders {
       }
 
       ColumnDescriptor desc = type.getColumnDescription(currentPath());
-
-      if (primitive.getOriginalType() != null) {
-        switch (primitive.getOriginalType()) {
-          case ENUM:
-          case JSON:
-          case UTF8:
-            return new StringReader(desc);
-          case INT_8:
-          case INT_16:
-          case INT_32:
-            if (expected.typeId() == Types.LongType.get().typeId()) {
-              return new ParquetValueReaders.IntAsLongReader(desc);
-            } else {
-              return new ParquetValueReaders.UnboxedReader<>(desc);
-            }
-          case TIME_MICROS:
-            return new LossyMicrosToMillisTimeReader(desc);
-          case TIME_MILLIS:
-            return new MillisTimeReader(desc);
-          case DATE:
-          case INT_64:
-            return new ParquetValueReaders.UnboxedReader<>(desc);
-          case TIMESTAMP_MICROS:
-            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
-              return new MicrosToTimestampTzReader(desc);
-            } else {
-              return new MicrosToTimestampReader(desc);
-            }
-          case TIMESTAMP_MILLIS:
-            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
-              return new MillisToTimestampTzReader(desc);
-            } else {
-              return new MillisToTimestampReader(desc);
-            }
-          case DECIMAL:
-            DecimalLogicalTypeAnnotation decimal =
-                (DecimalLogicalTypeAnnotation) 
primitive.getLogicalTypeAnnotation();
-            switch (primitive.getPrimitiveTypeName()) {
-              case BINARY:
-              case FIXED_LEN_BYTE_ARRAY:
-                return new BinaryDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              case INT64:
-                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              case INT32:
-                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              default:
-                throw new UnsupportedOperationException(
-                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
-            }
-          case BSON:
-            return new ParquetValueReaders.ByteArrayReader(desc);
-          default:
-            throw new UnsupportedOperationException(
-                "Unsupported logical type: " + primitive.getOriginalType());
-        }
+      LogicalTypeAnnotation logicalTypeAnnotation = 
primitive.getLogicalTypeAnnotation();
+      if (logicalTypeAnnotation != null) {
+        return logicalTypeAnnotation
+            .accept(new 
LogicalTypeAnnotationParquetValueReaderVisitor(primitive, desc, expected))
+            .orElseThrow(
+                () ->
+                    new UnsupportedOperationException(
+                        "Unsupported logical type: " + 
primitive.getLogicalTypeAnnotation()));
       }
 
       switch (primitive.getPrimitiveTypeName()) {
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 1fdc4cf838..4cfb24f629 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.data;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.parquet.schema.Types.primitive;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
@@ -44,15 +45,115 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.junit.jupiter.api.Test;
 
 public class TestFlinkParquetReader extends DataTest {
   private static final int NUM_RECORDS = 100;
 
+  @Test
+  public void testBuildReader() {
+    MessageType fileSchema =
+        new MessageType(
+            "test",
+            // 0: required(100, "id", Types.LongType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(100)
+                .named("id"),
+            // 1: optional(101, "data", Types.StringType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+                .id(101)
+                .named("data"),
+            // 2: required(102, "b", Types.BooleanType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, 
Type.Repetition.REQUIRED)
+                .id(102)
+                .named("b"),
+            // 3: optional(103, "i", Types.IntegerType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT32, 
Type.Repetition.OPTIONAL)
+                .id(103)
+                .named("i"),
+            // 4: required(104, "l", Types.LongType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(104)
+                .named("l"),
+            // 5: optional(105, "f", Types.FloatType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.FLOAT, 
Type.Repetition.OPTIONAL)
+                .id(105)
+                .named("f"),
+            // 6: required(106, "d", Types.DoubleType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, 
Type.Repetition.REQUIRED)
+                .id(106)
+                .named("d"),
+            // 7: optional(107, "date", Types.DateType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT32, 
Type.Repetition.OPTIONAL)
+                .id(107)
+                .as(LogicalTypeAnnotation.dateType())
+                .named("date"),
+            // 8: required(108, "ts_tz", Types.TimestampType.withZone())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(108)
+                .as(
+                    LogicalTypeAnnotation.timestampType(
+                        true, LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("ts_tz"),
+            // 9: required(109, "ts", Types.TimestampType.withoutZone())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(109)
+                .as(
+                    LogicalTypeAnnotation.timestampType(
+                        false, LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("ts"),
+            // 10: required(110, "s", Types.StringType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.REQUIRED)
+                .id(110)
+                .as(LogicalTypeAnnotation.stringType())
+                .named("s"),
+            // 11: required(112, "fixed", Types.FixedType.ofLength(7))
+            primitive(
+                    PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
Type.Repetition.REQUIRED)
+                .id(112)
+                .length(7)
+                .named("f"),
+            // 12: optional(113, "bytes", Types.BinaryType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+                .id(113)
+                .named("bytes"),
+            // 13: required(114, "dec_9_0", Types.DecimalType.of(9, 0))
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(114)
+                .as(LogicalTypeAnnotation.decimalType(0, 9))
+                .named("dec_9_0"),
+            // 14: required(115, "dec_11_2", Types.DecimalType.of(11, 2))
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(115)
+                .as(LogicalTypeAnnotation.decimalType(2, 11))
+                .named("dec_11_2"),
+            // 15: required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // 
maximum precision
+            primitive(
+                    PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
Type.Repetition.REQUIRED)
+                .id(116)
+                .length(16)
+                .as(LogicalTypeAnnotation.decimalType(10, 38))
+                .named("dec_38_10"),
+            // 16: optional(117, "time", Types.TimeType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+                .id(117)
+                .as(LogicalTypeAnnotation.timeType(true, 
LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("time"));
+    ParquetValueReader<RowData> reader =
+        FlinkParquetReaders.buildReader(new 
Schema(SUPPORTED_PRIMITIVES.fields()), fileSchema);
+
+    
assertThat(reader.columns().size()).isEqualTo(SUPPORTED_PRIMITIVES.fields().size());
+  }
+
   @Test
   public void testTwoLevelList() throws IOException {
     Schema schema =
diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index ab7b1174c9..ad4310a6d1 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -26,6 +26,7 @@ import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -50,6 +51,7 @@ import org.apache.iceberg.util.ArrayUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -193,6 +195,124 @@ public class FlinkParquetReaders {
           ParquetValueReaders.option(valueType, valueD, valueReader));
     }
 
+    private static class LogicalTypeAnnotationParquetValueReaderVisitor
+        implements 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
+
+      private final PrimitiveType primitive;
+      private final ColumnDescriptor desc;
+      private final org.apache.iceberg.types.Type.PrimitiveType expected;
+
+      LogicalTypeAnnotationParquetValueReaderVisitor(
+          PrimitiveType primitive,
+          ColumnDescriptor desc,
+          org.apache.iceberg.types.Type.PrimitiveType expected) {
+        this.primitive = primitive;
+        this.desc = desc;
+        this.expected = expected;
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) 
{
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+        return Optional.of(new StringReader(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          DecimalLogicalTypeAnnotation decimalLogicalType) {
+        switch (primitive.getPrimitiveTypeName()) {
+          case BINARY:
+          case FIXED_LEN_BYTE_ARRAY:
+            return Optional.of(
+                new BinaryDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+          case INT64:
+            return Optional.of(
+                new LongDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+          case INT32:
+            return Optional.of(
+                new IntegerDecimalReader(
+                    desc, decimalLogicalType.getPrecision(), 
decimalLogicalType.getScale()));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(decimalLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+        return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+        if (timeLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+          return Optional.of(new MillisTimeReader(desc));
+        } else if (timeLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MICROS) {
+          return Optional.of(new LossyMicrosToMillisTimeReader(desc));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.TimestampLogicalTypeAnnotation 
timestampLogicalType) {
+        if (timestampLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+          if (timestampLogicalType.isAdjustedToUTC()) {
+            return Optional.of(new MillisToTimestampTzReader(desc));
+          } else {
+            return Optional.of(new MillisToTimestampReader(desc));
+          }
+        } else if (timestampLogicalType.getUnit() == 
LogicalTypeAnnotation.TimeUnit.MICROS) {
+          if (timestampLogicalType.isAdjustedToUTC()) {
+            return Optional.of(new MicrosToTimestampTzReader(desc));
+          } else {
+            return Optional.of(new MicrosToTimestampReader(desc));
+          }
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+        int width = intLogicalType.getBitWidth();
+        if (width <= 32) {
+          if (expected.typeId() == Types.LongType.get().typeId()) {
+            return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
+          } else {
+            return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+          }
+        } else if (width <= 64) {
+          return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+        }
+
+        return 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(intLogicalType);
+      }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+        return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+      }
+    }
+
     @Override
     @SuppressWarnings("CyclomaticComplexity")
     public ParquetValueReader<?> primitive(
@@ -202,61 +322,14 @@ public class FlinkParquetReaders {
       }
 
       ColumnDescriptor desc = type.getColumnDescription(currentPath());
-
-      if (primitive.getOriginalType() != null) {
-        switch (primitive.getOriginalType()) {
-          case ENUM:
-          case JSON:
-          case UTF8:
-            return new StringReader(desc);
-          case INT_8:
-          case INT_16:
-          case INT_32:
-            if (expected.typeId() == Types.LongType.get().typeId()) {
-              return new ParquetValueReaders.IntAsLongReader(desc);
-            } else {
-              return new ParquetValueReaders.UnboxedReader<>(desc);
-            }
-          case TIME_MICROS:
-            return new LossyMicrosToMillisTimeReader(desc);
-          case TIME_MILLIS:
-            return new MillisTimeReader(desc);
-          case DATE:
-          case INT_64:
-            return new ParquetValueReaders.UnboxedReader<>(desc);
-          case TIMESTAMP_MICROS:
-            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
-              return new MicrosToTimestampTzReader(desc);
-            } else {
-              return new MicrosToTimestampReader(desc);
-            }
-          case TIMESTAMP_MILLIS:
-            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
-              return new MillisToTimestampTzReader(desc);
-            } else {
-              return new MillisToTimestampReader(desc);
-            }
-          case DECIMAL:
-            DecimalLogicalTypeAnnotation decimal =
-                (DecimalLogicalTypeAnnotation) 
primitive.getLogicalTypeAnnotation();
-            switch (primitive.getPrimitiveTypeName()) {
-              case BINARY:
-              case FIXED_LEN_BYTE_ARRAY:
-                return new BinaryDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              case INT64:
-                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              case INT32:
-                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
-              default:
-                throw new UnsupportedOperationException(
-                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
-            }
-          case BSON:
-            return new ParquetValueReaders.ByteArrayReader(desc);
-          default:
-            throw new UnsupportedOperationException(
-                "Unsupported logical type: " + primitive.getOriginalType());
-        }
+      LogicalTypeAnnotation logicalTypeAnnotation = 
primitive.getLogicalTypeAnnotation();
+      if (logicalTypeAnnotation != null) {
+        return logicalTypeAnnotation
+            .accept(new 
LogicalTypeAnnotationParquetValueReaderVisitor(primitive, desc, expected))
+            .orElseThrow(
+                () ->
+                    new UnsupportedOperationException(
+                        "Unsupported logical type: " + 
primitive.getLogicalTypeAnnotation()));
       }
 
       switch (primitive.getPrimitiveTypeName()) {
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 1fdc4cf838..4cfb24f629 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.data;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.parquet.schema.Types.primitive;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
@@ -44,15 +45,115 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.junit.jupiter.api.Test;
 
 public class TestFlinkParquetReader extends DataTest {
   private static final int NUM_RECORDS = 100;
 
+  @Test
+  public void testBuildReader() {
+    MessageType fileSchema =
+        new MessageType(
+            "test",
+            // 0: required(100, "id", LongType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(100)
+                .named("id"),
+            // 1: optional(101, "data", Types.StringType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+                .id(101)
+                .named("data"),
+            // 2: required(102, "b", Types.BooleanType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, 
Type.Repetition.REQUIRED)
+                .id(102)
+                .named("b"),
+            // 3: optional(103, "i", Types.IntegerType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT32, 
Type.Repetition.OPTIONAL)
+                .id(103)
+                .named("i"),
+            // 4: required(104, "l", Types.LongType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(104)
+                .named("l"),
+            // 5: optional(105, "f", Types.FloatType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.FLOAT, 
Type.Repetition.OPTIONAL)
+                .id(105)
+                .named("f"),
+            // 6: required(106, "d", Types.DoubleType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, 
Type.Repetition.REQUIRED)
+                .id(106)
+                .named("d"),
+            // 7: optional(107, "date", Types.DateType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.INT32, 
Type.Repetition.OPTIONAL)
+                .id(107)
+                .as(LogicalTypeAnnotation.dateType())
+                .named("date"),
+            // 8: required(108, "ts_tz", Types.TimestampType.withZone())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(108)
+                .as(
+                    LogicalTypeAnnotation.timestampType(
+                        true, LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("ts_tz"),
+            // 9: required(109, "ts", Types.TimestampType.withoutZone())
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(109)
+                .as(
+                    LogicalTypeAnnotation.timestampType(
+                        false, LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("ts"),
+            // 10: required(110, "s", Types.StringType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.REQUIRED)
+                .id(110)
+                .as(LogicalTypeAnnotation.stringType())
+                .named("s"),
+            // 11: required(112, "fixed", Types.FixedType.ofLength(7))
+            // NOTE(review): .named("f") below collides with field 5's name; presumably should be "fixed" -- confirm
+            primitive(
+                    PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
Type.Repetition.REQUIRED)
+                .id(112)
+                .length(7)
+                .named("f"),
+            // 12: optional(113, "bytes", Types.BinaryType.get())
+            primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+                .id(113)
+                .named("bytes"),
+            // 13: required(114, "dec_9_0", Types.DecimalType.of(9, 0))
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(114)
+                .as(LogicalTypeAnnotation.decimalType(0, 9))
+                .named("dec_9_0"),
+            // 14: required(115, "dec_11_2", Types.DecimalType.of(11, 2))
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REQUIRED)
+                .id(115)
+                .as(LogicalTypeAnnotation.decimalType(2, 11))
+                .named("dec_11_2"),
+            // 15: required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // 
maximum precision
+            primitive(
+                    PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 
Type.Repetition.REQUIRED)
+                .id(116)
+                .length(16)
+                .as(LogicalTypeAnnotation.decimalType(10, 38))
+                .named("dec_38_10"),
+            // 16: required(117, "time", Types.TimeType.get())
+            // NOTE(review): declared OPTIONAL here though the Iceberg field is required -- confirm intended
+            primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+                .id(117)
+                .as(LogicalTypeAnnotation.timeType(true, 
LogicalTypeAnnotation.TimeUnit.MICROS))
+                .named("time"));
+    ParquetValueReader<RowData> reader =
+        FlinkParquetReaders.buildReader(new 
Schema(SUPPORTED_PRIMITIVES.fields()), fileSchema);
+
+    
assertThat(reader.columns().size()).isEqualTo(SUPPORTED_PRIMITIVES.fields().size());
+  }
+
   @Test
   public void testTwoLevelList() throws IOException {
     Schema schema =

Reply via email to