This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 41cc133  Add abstract BaseParquetReaders for Iceberg generics and Flink (#1162)
41cc133 is described below

commit 41cc1334bfe50c7dccb58baeb664b236732f0e2b
Author: openinx <[email protected]>
AuthorDate: Fri Jul 3 03:42:27 2020 +0800

    Add abstract BaseParquetReaders for Iceberg generics and Flink (#1162)
---
 ...ParquetReaders.java => BaseParquetReaders.java} | 154 +++------
 ...icParquetWriter.java => BaseParquetWriter.java} |  79 ++---
 .../data/parquet/GenericParquetReaders.java        | 377 +--------------------
 .../iceberg/data/parquet/GenericParquetWriter.java | 264 +--------------
 .../iceberg/flink/data/FlinkParquetReaders.java    |  58 +---
 .../iceberg/flink/data/FlinkParquetWriters.java    |  26 +-
 .../flink/data/TestFlinkParquetReaderWriter.java   |   4 +-
 7 files changed, 132 insertions(+), 830 deletions(-)

diff --git 
a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java 
b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
similarity index 68%
copy from 
data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
copy to 
data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 3e4e7c7..36d0144 100644
--- 
a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -29,31 +29,15 @@ import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
-import org.apache.iceberg.parquet.ParquetValueReaders.BinaryAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.BytesReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.IntAsLongReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.IntegerAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.ListReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.LongAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.MapReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.PrimitiveReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.StringReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.UnboxedReader;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.types.Types.TimestampType;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
@@ -61,44 +45,48 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class GenericParquetReaders {
-  protected GenericParquetReaders() {
+public abstract class BaseParquetReaders<T> {
+  protected BaseParquetReaders() {
   }
 
-  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
-                                                       MessageType fileSchema) 
{
-    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  protected ParquetValueReader<T> createReader(Schema expectedSchema,
+                                               MessageType fileSchema) {
+    return createReader(expectedSchema, fileSchema, ImmutableMap.of());
   }
 
   @SuppressWarnings("unchecked")
-  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
-                                                       MessageType fileSchema,
-                                                       Map<Integer, ?> 
idToConstant) {
+  protected ParquetValueReader<T> createReader(Schema expectedSchema,
+                                               MessageType fileSchema,
+                                               Map<Integer, ?> idToConstant) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
-      return (ParquetValueReader<Record>)
+      return (ParquetValueReader<T>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
               new ReadBuilder(fileSchema, idToConstant));
     } else {
-      return (ParquetValueReader<Record>)
+      return (ParquetValueReader<T>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
               new FallbackReadBuilder(fileSchema, idToConstant));
     }
   }
 
-  protected static class FallbackReadBuilder extends ReadBuilder {
-    protected FallbackReadBuilder(MessageType type, Map<Integer, ?> 
idToConstant) {
+  protected abstract ParquetValueReader<T> createStructReader(List<Type> types,
+                                                              
List<ParquetValueReader<?>> fieldReaders,
+                                                              Types.StructType 
structType);
+
+  private class FallbackReadBuilder extends ReadBuilder {
+    private FallbackReadBuilder(MessageType type, Map<Integer, ?> 
idToConstant) {
       super(type, idToConstant);
     }
 
     @Override
-    public ParquetValueReader<?> message(StructType expected, MessageType 
message,
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
                                          List<ParquetValueReader<?>> 
fieldReaders) {
       // the top level matches by ID, but the remaining IDs are missing
       return super.struct(expected, message, fieldReaders);
     }
 
     @Override
-    public ParquetValueReader<?> struct(StructType expected, GroupType struct,
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType 
struct,
                                         List<ParquetValueReader<?>> 
fieldReaders) {
       // the expected struct is ignored because nested fields are never found 
when the
       List<ParquetValueReader<?>> newFields = 
Lists.newArrayListWithExpectedSize(
@@ -116,29 +104,23 @@ public class GenericParquetReaders {
     }
   }
 
-  protected static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+  private class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
     private final MessageType type;
     private final Map<Integer, ?> idToConstant;
 
-    protected ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+    private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
       this.type = type;
       this.idToConstant = idToConstant;
     }
 
     @Override
-    public ParquetValueReader<?> message(StructType expected, MessageType 
message,
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
                                          List<ParquetValueReader<?>> 
fieldReaders) {
       return struct(expected, message.asGroupType(), fieldReaders);
     }
 
-    protected StructReader<?, ?> createStructReader(List<Type> types,
-                                                    
List<ParquetValueReader<?>> readers,
-                                                    StructType struct) {
-      return new RecordReader(types, readers, struct);
-    }
-
     @Override
-    public ParquetValueReader<?> struct(StructType expected, GroupType struct,
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType 
struct,
                                         List<ParquetValueReader<?>> 
fieldReaders) {
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
@@ -190,7 +172,8 @@ public class GenericParquetReaders {
       Type elementType = repeated.getType(0);
       int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
 
-      return new ListReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+      return new ParquetValueReaders.ListReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(elementType, elementD, elementReader));
     }
 
     @Override
@@ -208,7 +191,7 @@ public class GenericParquetReaders {
       Type valueType = repeatedKeyValue.getType(1);
       int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
 
-      return new MapReader<>(repeatedD, repeatedR,
+      return new ParquetValueReaders.MapReader<>(repeatedD, repeatedR,
           ParquetValueReaders.option(keyType, keyD, keyReader),
           ParquetValueReaders.option(valueType, valueD, valueReader));
     }
@@ -223,28 +206,28 @@ public class GenericParquetReaders {
           case ENUM:
           case JSON:
           case UTF8:
-            return new StringReader(desc);
+            return new ParquetValueReaders.StringReader(desc);
           case INT_8:
           case INT_16:
           case INT_32:
-            if (expected.typeId() == TypeID.LONG) {
-              return new IntAsLongReader(desc);
+            if (expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
             } else {
-              return new UnboxedReader<>(desc);
+              return new ParquetValueReaders.UnboxedReader<>(desc);
             }
           case INT_64:
-            return new UnboxedReader<>(desc);
+            return new ParquetValueReaders.UnboxedReader<>(desc);
           case DATE:
             return new DateReader(desc);
           case TIMESTAMP_MICROS:
-            TimestampType tsMicrosType = (TimestampType) expected;
+            Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
             if (tsMicrosType.shouldAdjustToUTC()) {
               return new TimestamptzReader(desc);
             } else {
               return new TimestampReader(desc);
             }
           case TIMESTAMP_MILLIS:
-            TimestampType tsMillisType = (TimestampType) expected;
+            Types.TimestampType tsMillisType = (Types.TimestampType) expected;
             if (tsMillisType.shouldAdjustToUTC()) {
               return new TimestamptzMillisReader(desc);
             } else {
@@ -259,17 +242,17 @@ public class GenericParquetReaders {
             switch (primitive.getPrimitiveTypeName()) {
               case BINARY:
               case FIXED_LEN_BYTE_ARRAY:
-                return new BinaryAsDecimalReader(desc, decimal.getScale());
+                return new ParquetValueReaders.BinaryAsDecimalReader(desc, 
decimal.getScale());
               case INT64:
-                return new LongAsDecimalReader(desc, decimal.getScale());
+                return new ParquetValueReaders.LongAsDecimalReader(desc, 
decimal.getScale());
               case INT32:
-                return new IntegerAsDecimalReader(desc, decimal.getScale());
+                return new ParquetValueReaders.IntegerAsDecimalReader(desc, 
decimal.getScale());
               default:
                 throw new UnsupportedOperationException(
                     "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
             }
           case BSON:
-            return new BytesReader(desc);
+            return new ParquetValueReaders.BytesReader(desc);
           default:
             throw new UnsupportedOperationException(
                 "Unsupported logical type: " + primitive.getOriginalType());
@@ -280,23 +263,23 @@ public class GenericParquetReaders {
         case FIXED_LEN_BYTE_ARRAY:
           return new FixedReader(desc);
         case BINARY:
-          return new BytesReader(desc);
+          return new ParquetValueReaders.BytesReader(desc);
         case INT32:
-          if (expected != null && expected.typeId() == TypeID.LONG) {
-            return new IntAsLongReader(desc);
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
           } else {
-            return new UnboxedReader<>(desc);
+            return new ParquetValueReaders.UnboxedReader<>(desc);
           }
         case FLOAT:
-          if (expected != null && expected.typeId() == TypeID.DOUBLE) {
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
             return new ParquetValueReaders.FloatAsDoubleReader(desc);
           } else {
-            return new UnboxedReader<>(desc);
+            return new ParquetValueReaders.UnboxedReader<>(desc);
           }
         case BOOLEAN:
         case INT64:
         case DOUBLE:
-          return new UnboxedReader<>(desc);
+          return new ParquetValueReaders.UnboxedReader<>(desc);
         default:
           throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
       }
@@ -310,7 +293,7 @@ public class GenericParquetReaders {
   private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
-  private static class DateReader extends PrimitiveReader<LocalDate> {
+  private static class DateReader extends 
ParquetValueReaders.PrimitiveReader<LocalDate> {
     private DateReader(ColumnDescriptor desc) {
       super(desc);
     }
@@ -321,7 +304,7 @@ public class GenericParquetReaders {
     }
   }
 
-  private static class TimestampReader extends PrimitiveReader<LocalDateTime> {
+  private static class TimestampReader extends 
ParquetValueReaders.PrimitiveReader<LocalDateTime> {
     private TimestampReader(ColumnDescriptor desc) {
       super(desc);
     }
@@ -332,7 +315,7 @@ public class GenericParquetReaders {
     }
   }
 
-  private static class TimestampMillisReader extends 
PrimitiveReader<LocalDateTime> {
+  private static class TimestampMillisReader extends 
ParquetValueReaders.PrimitiveReader<LocalDateTime> {
     private TimestampMillisReader(ColumnDescriptor desc) {
       super(desc);
     }
@@ -343,7 +326,7 @@ public class GenericParquetReaders {
     }
   }
 
-  private static class TimestamptzReader extends 
PrimitiveReader<OffsetDateTime> {
+  private static class TimestamptzReader extends 
ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
     private TimestamptzReader(ColumnDescriptor desc) {
       super(desc);
     }
@@ -354,7 +337,7 @@ public class GenericParquetReaders {
     }
   }
 
-  private static class TimestamptzMillisReader extends 
PrimitiveReader<OffsetDateTime> {
+  private static class TimestamptzMillisReader extends 
ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
     private TimestamptzMillisReader(ColumnDescriptor desc) {
       super(desc);
     }
@@ -365,7 +348,7 @@ public class GenericParquetReaders {
     }
   }
 
-  private static class TimeMillisReader extends PrimitiveReader<LocalTime> {
+  private static class TimeMillisReader extends 
ParquetValueReaders.PrimitiveReader<LocalTime> {
     private TimeMillisReader(ColumnDescriptor desc) {
       super(desc);
     }
@@ -376,7 +359,7 @@ public class GenericParquetReaders {
     }
   }
 
-  private static class TimeReader extends PrimitiveReader<LocalTime> {
+  private static class TimeReader extends 
ParquetValueReaders.PrimitiveReader<LocalTime> {
     private TimeReader(ColumnDescriptor desc) {
       super(desc);
     }
@@ -387,7 +370,7 @@ public class GenericParquetReaders {
     }
   }
 
-  private static class FixedReader extends PrimitiveReader<byte[]> {
+  private static class FixedReader extends 
ParquetValueReaders.PrimitiveReader<byte[]> {
     private FixedReader(ColumnDescriptor desc) {
       super(desc);
     }
@@ -402,39 +385,4 @@ public class GenericParquetReaders {
       }
     }
   }
-
-  static class RecordReader extends StructReader<Record, Record> {
-    private final StructType structType;
-
-    RecordReader(List<Type> types,
-                 List<ParquetValueReader<?>> readers,
-                 StructType struct) {
-      super(types, readers);
-      this.structType = struct;
-    }
-
-    @Override
-    protected Record newStructData(Record reuse) {
-      if (reuse != null) {
-        return reuse;
-      } else {
-        return GenericRecord.create(structType);
-      }
-    }
-
-    @Override
-    protected Object getField(Record intermediate, int pos) {
-      return intermediate.get(pos);
-    }
-
-    @Override
-    protected Record buildStruct(Record struct) {
-      return struct;
-    }
-
-    @Override
-    protected void set(Record struct, int pos, Object value) {
-      struct.set(pos, value);
-    }
-  }
 }
diff --git 
a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java 
b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
similarity index 75%
copy from 
data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
copy to 
data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
index 4cdb2de..282a92d 100644
--- 
a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
@@ -28,49 +28,40 @@ import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Optional;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.parquet.ParquetTypeVisitor;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
-import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter;
-import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
-import 
org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class GenericParquetWriter {
-  protected GenericParquetWriter() {
-  }
+public abstract class BaseParquetWriter<T> {
 
   @SuppressWarnings("unchecked")
-  public static ParquetValueWriter<Record> buildWriter(MessageType type) {
-    return (ParquetValueWriter<Record>) ParquetTypeVisitor.visit(type, new 
WriteBuilder(type));
+  protected ParquetValueWriter<T> createWriter(MessageType type) {
+    return (ParquetValueWriter<T>) ParquetTypeVisitor.visit(type, new 
WriteBuilder(type));
   }
 
-  protected static class WriteBuilder extends 
ParquetTypeVisitor<ParquetValueWriter<?>> {
+  protected abstract ParquetValueWriters.StructWriter<T> 
createStructWriter(List<ParquetValueWriter<?>> writers);
+
+  private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> 
{
     private final MessageType type;
 
-    protected WriteBuilder(MessageType type) {
+    private WriteBuilder(MessageType type) {
       this.type = type;
     }
 
     @Override
-    public ParquetValueWriter<?> message(MessageType message,
-                                         List<ParquetValueWriter<?>> 
fieldWriters) {
+    public ParquetValueWriter<?> message(MessageType message, 
List<ParquetValueWriter<?>> fieldWriters) {
       return struct(message.asGroupType(), fieldWriters);
     }
 
-    protected StructWriter<?> createStructWriter(List<ParquetValueWriter<?>> 
writers) {
-      return new RecordWriter(writers);
-    }
-
     @Override
     public ParquetValueWriter<?> struct(GroupType struct,
                                         List<ParquetValueWriter<?>> 
fieldWriters) {
@@ -125,7 +116,8 @@ public class GenericParquetWriter {
       ColumnDescriptor desc = type.getColumnDescription(currentPath());
       LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
       if (logicalType != null) {
-        Optional<PrimitiveWriter<?>> writer = logicalType.accept(new 
LogicalTypeWriterVisitor(desc));
+        Optional<ParquetValueWriters.PrimitiveWriter<?>> writer =
+            logicalType.accept(new LogicalTypeWriterVisitor(desc));
         if (writer.isPresent()) {
           return writer.get();
         }
@@ -152,7 +144,8 @@ public class GenericParquetWriter {
     }
   }
 
-  private static class LogicalTypeWriterVisitor implements 
LogicalTypeAnnotationVisitor<PrimitiveWriter<?>> {
+  private static class LogicalTypeWriterVisitor implements
+      
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueWriters.PrimitiveWriter<?>>
 {
     private final ColumnDescriptor desc;
 
     private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
@@ -160,17 +153,20 @@ public class GenericParquetWriter {
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
       return Optional.of(ParquetValueWriters.strings(desc));
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
       return Optional.of(ParquetValueWriters.strings(desc));
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
       switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
         case INT32:
           return Optional.of(ParquetValueWriters.decimalAsInteger(
@@ -187,17 +183,20 @@ public class GenericParquetWriter {
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
       return Optional.of(new DateWriter(desc));
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
       return Optional.of(new TimeWriter(desc));
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
       
Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
           "Cannot write timestamp in %s, only MICROS is supported", 
timestampType.getUnit());
       if (timestampType.isAdjustedToUTC()) {
@@ -208,7 +207,8 @@ public class GenericParquetWriter {
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
       Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth() 
< 64,
           "Cannot read uint64: not a supported Java type");
       if (intType.getBitWidth() < 64) {
@@ -219,12 +219,14 @@ public class GenericParquetWriter {
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
       return Optional.of(ParquetValueWriters.strings(desc));
     }
 
     @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
+    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+        LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
       return Optional.of(ParquetValueWriters.byteBuffers(desc));
     }
   }
@@ -232,7 +234,7 @@ public class GenericParquetWriter {
   private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
-  private static class DateWriter extends PrimitiveWriter<LocalDate> {
+  private static class DateWriter extends 
ParquetValueWriters.PrimitiveWriter<LocalDate> {
     private DateWriter(ColumnDescriptor desc) {
       super(desc);
     }
@@ -243,7 +245,7 @@ public class GenericParquetWriter {
     }
   }
 
-  private static class TimeWriter extends PrimitiveWriter<LocalTime> {
+  private static class TimeWriter extends 
ParquetValueWriters.PrimitiveWriter<LocalTime> {
     private TimeWriter(ColumnDescriptor desc) {
       super(desc);
     }
@@ -254,7 +256,7 @@ public class GenericParquetWriter {
     }
   }
 
-  private static class TimestampWriter extends PrimitiveWriter<LocalDateTime> {
+  private static class TimestampWriter extends 
ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
     private TimestampWriter(ColumnDescriptor desc) {
       super(desc);
     }
@@ -266,7 +268,7 @@ public class GenericParquetWriter {
     }
   }
 
-  private static class TimestamptzWriter extends 
PrimitiveWriter<OffsetDateTime> {
+  private static class TimestamptzWriter extends 
ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
     private TimestamptzWriter(ColumnDescriptor desc) {
       super(desc);
     }
@@ -277,7 +279,7 @@ public class GenericParquetWriter {
     }
   }
 
-  private static class FixedWriter extends PrimitiveWriter<byte[]> {
+  private static class FixedWriter extends 
ParquetValueWriters.PrimitiveWriter<byte[]> {
     private FixedWriter(ColumnDescriptor desc) {
       super(desc);
     }
@@ -287,15 +289,4 @@ public class GenericParquetWriter {
       column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
     }
   }
-
-  private static class RecordWriter extends StructWriter<Record> {
-    private RecordWriter(List<ParquetValueWriter<?>> writers) {
-      super(writers);
-    }
-
-    @Override
-    protected Object get(Record struct, int index) {
-      return struct.get(index);
-    }
-  }
 }
diff --git 
a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java 
b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 3e4e7c7..e8a914a 100644
--- 
a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ 
b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -19,391 +19,40 @@
 
 package org.apache.iceberg.data.parquet;
 
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
-import org.apache.iceberg.parquet.ParquetValueReaders;
-import org.apache.iceberg.parquet.ParquetValueReaders.BinaryAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.BytesReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.IntAsLongReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.IntegerAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.ListReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.LongAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.MapReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.PrimitiveReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.StringReader;
 import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.UnboxedReader;
-import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type.TypeID;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.types.Types.TimestampType;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.schema.DecimalMetadata;
-import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class GenericParquetReaders {
-  protected GenericParquetReaders() {
-  }
-
-  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
-                                                       MessageType fileSchema) 
{
-    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
-  }
-
-  @SuppressWarnings("unchecked")
-  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
-                                                       MessageType fileSchema,
-                                                       Map<Integer, ?> 
idToConstant) {
-    if (ParquetSchemaUtil.hasIds(fileSchema)) {
-      return (ParquetValueReader<Record>)
-          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema, idToConstant));
-    } else {
-      return (ParquetValueReader<Record>)
-          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema, idToConstant));
-    }
-  }
-
-  protected static class FallbackReadBuilder extends ReadBuilder {
-    protected FallbackReadBuilder(MessageType type, Map<Integer, ?> 
idToConstant) {
-      super(type, idToConstant);
-    }
-
-    @Override
-    public ParquetValueReader<?> message(StructType expected, MessageType 
message,
-                                         List<ParquetValueReader<?>> 
fieldReaders) {
-      // the top level matches by ID, but the remaining IDs are missing
-      return super.struct(expected, message, fieldReaders);
-    }
-
-    @Override
-    public ParquetValueReader<?> struct(StructType expected, GroupType struct,
-                                        List<ParquetValueReader<?>> 
fieldReaders) {
-      // the expected struct is ignored because nested fields are never found 
when the
-      List<ParquetValueReader<?>> newFields = 
Lists.newArrayListWithExpectedSize(
-          fieldReaders.size());
-      List<Type> types = 
Lists.newArrayListWithExpectedSize(fieldReaders.size());
-      List<Type> fields = struct.getFields();
-      for (int i = 0; i < fields.size(); i += 1) {
-        Type fieldType = fields.get(i);
-        int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 
1;
-        newFields.add(ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
-        types.add(fieldType);
-      }
-
-      return createStructReader(types, newFields, expected);
-    }
-  }
-
-  protected static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
-    private final MessageType type;
-    private final Map<Integer, ?> idToConstant;
-
-    protected ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
-      this.type = type;
-      this.idToConstant = idToConstant;
-    }
-
-    @Override
-    public ParquetValueReader<?> message(StructType expected, MessageType 
message,
-                                         List<ParquetValueReader<?>> 
fieldReaders) {
-      return struct(expected, message.asGroupType(), fieldReaders);
-    }
-
-    protected StructReader<?, ?> createStructReader(List<Type> types,
-                                                    
List<ParquetValueReader<?>> readers,
-                                                    StructType struct) {
-      return new RecordReader(types, readers, struct);
-    }
-
-    @Override
-    public ParquetValueReader<?> struct(StructType expected, GroupType struct,
-                                        List<ParquetValueReader<?>> 
fieldReaders) {
-      // match the expected struct's order
-      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
-      Map<Integer, Type> typesById = Maps.newHashMap();
-      List<Type> fields = struct.getFields();
-      for (int i = 0; i < fields.size(); i += 1) {
-        Type fieldType = fields.get(i);
-        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
-        int id = fieldType.getId().intValue();
-        readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
-        typesById.put(id, fieldType);
-      }
-
-      List<Types.NestedField> expectedFields = expected != null ?
-          expected.fields() : ImmutableList.of();
-      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
-          expectedFields.size());
-      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
-      for (Types.NestedField field : expectedFields) {
-        int id = field.fieldId();
-        if (idToConstant.containsKey(id)) {
-          // containsKey is used because the constant may be null
-          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
-          types.add(null);
-        } else {
-          ParquetValueReader<?> reader = readersById.get(id);
-          if (reader != null) {
-            reorderedFields.add(reader);
-            types.add(typesById.get(id));
-          } else {
-            reorderedFields.add(ParquetValueReaders.nulls());
-            types.add(null);
-          }
-        }
-      }
-
-      return createStructReader(types, reorderedFields, expected);
-    }
-
-    @Override
-    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
-                                      ParquetValueReader<?> elementReader) {
-      GroupType repeated = array.getFields().get(0).asGroupType();
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
-      Type elementType = repeated.getType(0);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
-
-      return new ListReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
-    }
-
-    @Override
-    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
-                                     ParquetValueReader<?> keyReader,
-                                     ParquetValueReader<?> valueReader) {
-      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
-      Type keyType = repeatedKeyValue.getType(0);
-      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
-      Type valueType = repeatedKeyValue.getType(1);
-      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
-
-      return new MapReader<>(repeatedD, repeatedR,
-          ParquetValueReaders.option(keyType, keyD, keyReader),
-          ParquetValueReaders.option(valueType, valueD, valueReader));
-    }
-
-    @Override
-    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
-                                           PrimitiveType primitive) {
-      ColumnDescriptor desc = type.getColumnDescription(currentPath());
-
-      if (primitive.getOriginalType() != null) {
-        switch (primitive.getOriginalType()) {
-          case ENUM:
-          case JSON:
-          case UTF8:
-            return new StringReader(desc);
-          case INT_8:
-          case INT_16:
-          case INT_32:
-            if (expected.typeId() == TypeID.LONG) {
-              return new IntAsLongReader(desc);
-            } else {
-              return new UnboxedReader<>(desc);
-            }
-          case INT_64:
-            return new UnboxedReader<>(desc);
-          case DATE:
-            return new DateReader(desc);
-          case TIMESTAMP_MICROS:
-            TimestampType tsMicrosType = (TimestampType) expected;
-            if (tsMicrosType.shouldAdjustToUTC()) {
-              return new TimestamptzReader(desc);
-            } else {
-              return new TimestampReader(desc);
-            }
-          case TIMESTAMP_MILLIS:
-            TimestampType tsMillisType = (TimestampType) expected;
-            if (tsMillisType.shouldAdjustToUTC()) {
-              return new TimestamptzMillisReader(desc);
-            } else {
-              return new TimestampMillisReader(desc);
-            }
-          case TIME_MICROS:
-            return new TimeReader(desc);
-          case TIME_MILLIS:
-            return new TimeMillisReader(desc);
-          case DECIMAL:
-            DecimalMetadata decimal = primitive.getDecimalMetadata();
-            switch (primitive.getPrimitiveTypeName()) {
-              case BINARY:
-              case FIXED_LEN_BYTE_ARRAY:
-                return new BinaryAsDecimalReader(desc, decimal.getScale());
-              case INT64:
-                return new LongAsDecimalReader(desc, decimal.getScale());
-              case INT32:
-                return new IntegerAsDecimalReader(desc, decimal.getScale());
-              default:
-                throw new UnsupportedOperationException(
-                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
-            }
-          case BSON:
-            return new BytesReader(desc);
-          default:
-            throw new UnsupportedOperationException(
-                "Unsupported logical type: " + primitive.getOriginalType());
-        }
-      }
-
-      switch (primitive.getPrimitiveTypeName()) {
-        case FIXED_LEN_BYTE_ARRAY:
-          return new FixedReader(desc);
-        case BINARY:
-          return new BytesReader(desc);
-        case INT32:
-          if (expected != null && expected.typeId() == TypeID.LONG) {
-            return new IntAsLongReader(desc);
-          } else {
-            return new UnboxedReader<>(desc);
-          }
-        case FLOAT:
-          if (expected != null && expected.typeId() == TypeID.DOUBLE) {
-            return new ParquetValueReaders.FloatAsDoubleReader(desc);
-          } else {
-            return new UnboxedReader<>(desc);
-          }
-        case BOOLEAN:
-        case INT64:
-        case DOUBLE:
-          return new UnboxedReader<>(desc);
-        default:
-          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
-      }
-    }
-
-    MessageType type() {
-      return type;
-    }
-  }
-
-  private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private static class DateReader extends PrimitiveReader<LocalDate> {
-    private DateReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDate read(LocalDate reuse) {
-      return EPOCH_DAY.plusDays(column.nextInteger());
-    }
-  }
-
-  private static class TimestampReader extends PrimitiveReader<LocalDateTime> {
-    private TimestampReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDateTime read(LocalDateTime reuse) {
-      return EPOCH.plus(column.nextLong(), 
ChronoUnit.MICROS).toLocalDateTime();
-    }
-  }
-
-  private static class TimestampMillisReader extends 
PrimitiveReader<LocalDateTime> {
-    private TimestampMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDateTime read(LocalDateTime reuse) {
-      return EPOCH.plus(column.nextLong() * 1000, 
ChronoUnit.MICROS).toLocalDateTime();
-    }
-  }
-
-  private static class TimestamptzReader extends 
PrimitiveReader<OffsetDateTime> {
-    private TimestamptzReader(ColumnDescriptor desc) {
-      super(desc);
-    }
+public class GenericParquetReaders extends BaseParquetReaders<Record> {
 
-    @Override
-    public OffsetDateTime read(OffsetDateTime reuse) {
-      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
-    }
-  }
+  private static final GenericParquetReaders INSTANCE = new GenericParquetReaders();
 
-  private static class TimestamptzMillisReader extends 
PrimitiveReader<OffsetDateTime> {
-    private TimestamptzMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public OffsetDateTime read(OffsetDateTime reuse) {
-      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
-    }
+  private GenericParquetReaders() {
   }
 
-  private static class TimeMillisReader extends PrimitiveReader<LocalTime> {
-    private TimeMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalTime read(LocalTime reuse) {
-      return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
-    }
+  public static ParquetValueReader<Record> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return INSTANCE.createReader(expectedSchema, fileSchema);
   }
 
-  private static class TimeReader extends PrimitiveReader<LocalTime> {
-    private TimeReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalTime read(LocalTime reuse) {
-      return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
-    }
+  public static ParquetValueReader<Record> buildReader(Schema expectedSchema, MessageType fileSchema,
+                                                       Map<Integer, ?> idToConstant) {
+    return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
   }
 
-  private static class FixedReader extends PrimitiveReader<byte[]> {
-    private FixedReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public byte[] read(byte[] reuse) {
-      if (reuse != null) {
-        column.nextBinary().toByteBuffer().duplicate().get(reuse);
-        return reuse;
-      } else {
-        return column.nextBinary().getBytes();
-      }
-    }
+  @Override
+  protected ParquetValueReader<Record> createStructReader(List<Type> types, List<ParquetValueReader<?>> fieldReaders,
+                                                          StructType structType) {
+    return new RecordReader(types, fieldReaders, structType);
   }
 
-  static class RecordReader extends StructReader<Record, Record> {
+  private static class RecordReader extends StructReader<Record, Record> {
     private final StructType structType;
 
     RecordReader(List<Type> types,
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
index 4cdb2de..5bb878f 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
@@ -19,273 +19,25 @@
 
 package org.apache.iceberg.data.parquet;
 
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
-import java.util.Optional;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.parquet.ParquetTypeVisitor;
 import org.apache.iceberg.parquet.ParquetValueWriter;
-import org.apache.iceberg.parquet.ParquetValueWriters;
-import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
-import 
org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
 
-public class GenericParquetWriter {
-  protected GenericParquetWriter() {
-  }
-
-  @SuppressWarnings("unchecked")
-  public static ParquetValueWriter<Record> buildWriter(MessageType type) {
-    return (ParquetValueWriter<Record>) ParquetTypeVisitor.visit(type, new 
WriteBuilder(type));
-  }
-
-  protected static class WriteBuilder extends 
ParquetTypeVisitor<ParquetValueWriter<?>> {
-    private final MessageType type;
-
-    protected WriteBuilder(MessageType type) {
-      this.type = type;
-    }
-
-    @Override
-    public ParquetValueWriter<?> message(MessageType message,
-                                         List<ParquetValueWriter<?>> 
fieldWriters) {
-      return struct(message.asGroupType(), fieldWriters);
-    }
-
-    protected StructWriter<?> createStructWriter(List<ParquetValueWriter<?>> 
writers) {
-      return new RecordWriter(writers);
-    }
-
-    @Override
-    public ParquetValueWriter<?> struct(GroupType struct,
-                                        List<ParquetValueWriter<?>> 
fieldWriters) {
-      List<Type> fields = struct.getFields();
-      List<ParquetValueWriter<?>> writers = 
Lists.newArrayListWithExpectedSize(fieldWriters.size());
-      for (int i = 0; i < fields.size(); i += 1) {
-        Type fieldType = struct.getType(i);
-        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName()));
-        writers.add(ParquetValueWriters.option(fieldType, fieldD, 
fieldWriters.get(i)));
-      }
-
-      return createStructWriter(writers);
-    }
-
-    @Override
-    public ParquetValueWriter<?> list(GroupType array, ParquetValueWriter<?> 
elementWriter) {
-      GroupType repeated = array.getFields().get(0).asGroupType();
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
-
-      org.apache.parquet.schema.Type elementType = repeated.getType(0);
-      int elementD = type.getMaxDefinitionLevel(path(elementType.getName()));
-
-      return ParquetValueWriters.collections(repeatedD, repeatedR,
-          ParquetValueWriters.option(elementType, elementD, elementWriter));
-    }
-
-    @Override
-    public ParquetValueWriter<?> map(GroupType map,
-                                     ParquetValueWriter<?> keyWriter,
-                                     ParquetValueWriter<?> valueWriter) {
-      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
-      String[] repeatedPath = currentPath();
-
-      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
-      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
-
-      org.apache.parquet.schema.Type keyType = repeatedKeyValue.getType(0);
-      int keyD = type.getMaxDefinitionLevel(path(keyType.getName()));
-      org.apache.parquet.schema.Type valueType = repeatedKeyValue.getType(1);
-      int valueD = type.getMaxDefinitionLevel(path(valueType.getName()));
-
-      return ParquetValueWriters.maps(repeatedD, repeatedR,
-          ParquetValueWriters.option(keyType, keyD, keyWriter),
-          ParquetValueWriters.option(valueType, valueD, valueWriter));
-    }
-
-    @Override
-    public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
-      ColumnDescriptor desc = type.getColumnDescription(currentPath());
-      LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
-      if (logicalType != null) {
-        Optional<PrimitiveWriter<?>> writer = logicalType.accept(new 
LogicalTypeWriterVisitor(desc));
-        if (writer.isPresent()) {
-          return writer.get();
-        }
-      }
-
-      switch (primitive.getPrimitiveTypeName()) {
-        case FIXED_LEN_BYTE_ARRAY:
-          return new FixedWriter(desc);
-        case BINARY:
-          return ParquetValueWriters.byteBuffers(desc);
-        case BOOLEAN:
-          return ParquetValueWriters.booleans(desc);
-        case INT32:
-          return ParquetValueWriters.ints(desc);
-        case INT64:
-          return ParquetValueWriters.longs(desc);
-        case FLOAT:
-          return ParquetValueWriters.floats(desc);
-        case DOUBLE:
-          return ParquetValueWriters.doubles(desc);
-        default:
-          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
-      }
-    }
-  }
-
-  private static class LogicalTypeWriterVisitor implements 
LogicalTypeAnnotationVisitor<PrimitiveWriter<?>> {
-    private final ColumnDescriptor desc;
-
-    private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
-      this.desc = desc;
-    }
-
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
-      return Optional.of(ParquetValueWriters.strings(desc));
-    }
-
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
-      return Optional.of(ParquetValueWriters.strings(desc));
-    }
-
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
-      switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
-        case INT32:
-          return Optional.of(ParquetValueWriters.decimalAsInteger(
-              desc, decimalType.getPrecision(), decimalType.getScale()));
-        case INT64:
-          return Optional.of(ParquetValueWriters.decimalAsLong(
-              desc, decimalType.getPrecision(), decimalType.getScale()));
-        case BINARY:
-        case FIXED_LEN_BYTE_ARRAY:
-          return Optional.of(ParquetValueWriters.decimalAsFixed(
-              desc, decimalType.getPrecision(), decimalType.getScale()));
-      }
-      return Optional.empty();
-    }
+public class GenericParquetWriter extends BaseParquetWriter<Record> {
+  private static final GenericParquetWriter INSTANCE = new GenericParquetWriter();
 
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
-      return Optional.of(new DateWriter(desc));
-    }
-
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
-      return Optional.of(new TimeWriter(desc));
-    }
-
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
-      
Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
-          "Cannot write timestamp in %s, only MICROS is supported", 
timestampType.getUnit());
-      if (timestampType.isAdjustedToUTC()) {
-        return Optional.of(new TimestamptzWriter(desc));
-      } else {
-        return Optional.of(new TimestampWriter(desc));
-      }
-    }
-
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
-      Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth() 
< 64,
-          "Cannot read uint64: not a supported Java type");
-      if (intType.getBitWidth() < 64) {
-        return Optional.of(ParquetValueWriters.ints(desc));
-      } else {
-        return Optional.of(ParquetValueWriters.longs(desc));
-      }
-    }
-
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
-      return Optional.of(ParquetValueWriters.strings(desc));
-    }
-
-    @Override
-    public Optional<PrimitiveWriter<?>> 
visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
-      return Optional.of(ParquetValueWriters.byteBuffers(desc));
-    }
+  private GenericParquetWriter() {
   }
 
-  private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private static class DateWriter extends PrimitiveWriter<LocalDate> {
-    private DateWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, LocalDate value) {
-      column.writeInteger(repetitionLevel, (int) 
ChronoUnit.DAYS.between(EPOCH_DAY, value));
-    }
-  }
-
-  private static class TimeWriter extends PrimitiveWriter<LocalTime> {
-    private TimeWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, LocalTime value) {
-      column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000);
-    }
-  }
-
-  private static class TimestampWriter extends PrimitiveWriter<LocalDateTime> {
-    private TimestampWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, LocalDateTime value) {
-      column.writeLong(repetitionLevel,
-          ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC)));
-    }
-  }
-
-  private static class TimestamptzWriter extends 
PrimitiveWriter<OffsetDateTime> {
-    private TimestamptzWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, OffsetDateTime value) {
-      column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, 
value));
-    }
+  public static ParquetValueWriter<Record> buildWriter(MessageType type) {
+    return INSTANCE.createWriter(type);
   }
 
-  private static class FixedWriter extends PrimitiveWriter<byte[]> {
-    private FixedWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, byte[] value) {
-      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
-    }
+  @Override
+  protected StructWriter<Record> createStructWriter(List<ParquetValueWriter<?>> writers) {
+    return new RecordWriter(writers);
   }
 
   private static class RecordWriter extends StructWriter<Record> {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index e9d9bc0..ccea5d6 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -20,66 +20,34 @@
 package org.apache.iceberg.flink.data;
 
 import java.util.List;
-import java.util.Map;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.data.parquet.BaseParquetReaders;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
-import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends GenericParquetReaders {
-  private FlinkParquetReaders() {
-  }
-
-  @SuppressWarnings("unchecked")
-  public static ParquetValueReader<Row> buildRowReader(Schema expectedSchema,
-                                                       MessageType fileSchema) {
-    if (ParquetSchemaUtil.hasIds(fileSchema)) {
-      return (ParquetValueReader<Row>)
-          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema, ImmutableMap.of()));
-    } else {
-      return (ParquetValueReader<Row>)
-          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema, ImmutableMap.of()));
-    }
-  }
-
-  private static class FallbackReadBuilder extends GenericParquetReaders.FallbackReadBuilder {
+public class FlinkParquetReaders extends BaseParquetReaders<Row> {
 
-    private FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
-      super(type, idToConstant);
-    }
+  private static final FlinkParquetReaders INSTANCE = new FlinkParquetReaders();
 
-    @Override
-    protected ParquetValueReaders.StructReader<?, ?> createStructReader(List<Type> types,
-                                                                        List<ParquetValueReader<?>> readers,
-                                                                        Types.StructType struct) {
-      return new RowReader(types, readers, struct);
-    }
+  private FlinkParquetReaders() {
   }
 
-  private static class ReadBuilder extends GenericParquetReaders.ReadBuilder {
-
-    private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
-      super(type, idToConstant);
-    }
+  public static ParquetValueReader<Row> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return INSTANCE.createReader(expectedSchema, fileSchema);
+  }
 
-    @Override
-    protected ParquetValueReaders.StructReader<Row, Row> createStructReader(List<Type> types,
-                                                                            List<ParquetValueReader<?>> readers,
-                                                                            Types.StructType struct) {
-      return new RowReader(types, readers, struct);
-    }
+  @Override
+  protected ParquetValueReader<Row> createStructReader(List<Type> types,
+                                                       List<ParquetValueReader<?>> fieldReaders,
+                                                       Types.StructType structType) {
+    return new RowReader(types, fieldReaders, structType);
   }
 
-  static class RowReader extends ParquetValueReaders.StructReader<Row, Row> {
+  private static class RowReader extends ParquetValueReaders.StructReader<Row, Row> {
     private final Types.StructType structType;
 
     RowReader(List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 2e36b4a..54b4fea 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -21,31 +21,25 @@ package org.apache.iceberg.flink.data;
 
 import java.util.List;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.parquet.ParquetTypeVisitor;
+import org.apache.iceberg.data.parquet.BaseParquetWriter;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
 import org.apache.parquet.schema.MessageType;
 
-public class FlinkParquetWriters extends GenericParquetWriter {
+public class FlinkParquetWriters extends BaseParquetWriter<Row> {
+
+  private static final FlinkParquetWriters INSTANCE = new 
FlinkParquetWriters();
+
   private FlinkParquetWriters() {
   }
 
-  @SuppressWarnings("unchecked")
-  public static ParquetValueWriter<Row> buildRowWriter(MessageType type) {
-    return (ParquetValueWriter<Row>) ParquetTypeVisitor.visit(type, new 
WriteBuilder(type));
+  public static ParquetValueWriter<Row> buildWriter(MessageType type) {
+    return INSTANCE.createWriter(type);
   }
 
-  private static class WriteBuilder extends GenericParquetWriter.WriteBuilder {
-
-    private WriteBuilder(MessageType type) {
-      super(type);
-    }
-
-    @Override
-    protected ParquetValueWriters.StructWriter<Row> 
createStructWriter(List<ParquetValueWriter<?>> writers) {
-      return new RowWriter(writers);
-    }
+  @Override
+  protected ParquetValueWriters.StructWriter<Row> 
createStructWriter(List<ParquetValueWriter<?>> writers) {
+    return new RowWriter(writers);
   }
 
   private static class RowWriter extends ParquetValueWriters.StructWriter<Row> 
{
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
 
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
index f8bf6a5..e0cb3f0 100644
--- 
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
+++ 
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
@@ -76,14 +76,14 @@ public class TestFlinkParquetReaderWriter {
 
     try (FileAppender<Row> writer = Parquet.write(Files.localOutput(testFile))
         .schema(schema)
-        .createWriterFunc(FlinkParquetWriters::buildRowWriter)
+        .createWriterFunc(FlinkParquetWriters::buildWriter)
         .build()) {
       writer.addAll(iterable);
     }
 
     try (CloseableIterable<Row> reader = 
Parquet.read(Files.localInput(testFile))
         .project(schema)
-        .createReaderFunc(type -> FlinkParquetReaders.buildRowReader(schema, 
type))
+        .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, 
type))
         .build()) {
       Iterator<Row> expected = iterable.iterator();
       Iterator<Row> rows = reader.iterator();

Reply via email to