This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 41cc133 Add abstract BaseParquetReaders for Iceberg generics and Flink (#1162)
41cc133 is described below
commit 41cc1334bfe50c7dccb58baeb664b236732f0e2b
Author: openinx <[email protected]>
AuthorDate: Fri Jul 3 03:42:27 2020 +0800
Add abstract BaseParquetReaders for Iceberg generics and Flink (#1162)
---
...ParquetReaders.java => BaseParquetReaders.java} | 154 +++------
...icParquetWriter.java => BaseParquetWriter.java} | 79 ++---
.../data/parquet/GenericParquetReaders.java | 377 +--------------------
.../iceberg/data/parquet/GenericParquetWriter.java | 264 +--------------
.../iceberg/flink/data/FlinkParquetReaders.java | 58 +---
.../iceberg/flink/data/FlinkParquetWriters.java | 26 +-
.../flink/data/TestFlinkParquetReaderWriter.java | 4 +-
7 files changed, 132 insertions(+), 830 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
similarity index 68%
copy from data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
copy to data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 3e4e7c7..36d0144 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -29,31 +29,15 @@ import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
-import org.apache.iceberg.parquet.ParquetValueReaders.BinaryAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.BytesReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.IntAsLongReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.IntegerAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.ListReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.LongAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.MapReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.PrimitiveReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.StringReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.UnboxedReader;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.types.Types.TimestampType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
@@ -61,44 +45,48 @@ import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class GenericParquetReaders {
- protected GenericParquetReaders() {
+public abstract class BaseParquetReaders<T> {
+ protected BaseParquetReaders() {
}
- public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
- MessageType fileSchema)
{
- return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ protected ParquetValueReader<T> createReader(Schema expectedSchema,
+ MessageType fileSchema) {
+ return createReader(expectedSchema, fileSchema, ImmutableMap.of());
}
@SuppressWarnings("unchecked")
- public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
- MessageType fileSchema,
- Map<Integer, ?>
idToConstant) {
+ protected ParquetValueReader<T> createReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?> idToConstant) {
if (ParquetSchemaUtil.hasIds(fileSchema)) {
- return (ParquetValueReader<Record>)
+ return (ParquetValueReader<T>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new ReadBuilder(fileSchema, idToConstant));
} else {
- return (ParquetValueReader<Record>)
+ return (ParquetValueReader<T>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new FallbackReadBuilder(fileSchema, idToConstant));
}
}
- protected static class FallbackReadBuilder extends ReadBuilder {
- protected FallbackReadBuilder(MessageType type, Map<Integer, ?>
idToConstant) {
+ protected abstract ParquetValueReader<T> createStructReader(List<Type> types,
+
List<ParquetValueReader<?>> fieldReaders,
+ Types.StructType
structType);
+
+ private class FallbackReadBuilder extends ReadBuilder {
+ private FallbackReadBuilder(MessageType type, Map<Integer, ?>
idToConstant) {
super(type, idToConstant);
}
@Override
- public ParquetValueReader<?> message(StructType expected, MessageType
message,
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
List<ParquetValueReader<?>>
fieldReaders) {
// the top level matches by ID, but the remaining IDs are missing
return super.struct(expected, message, fieldReaders);
}
@Override
- public ParquetValueReader<?> struct(StructType expected, GroupType struct,
+ public ParquetValueReader<?> struct(Types.StructType expected, GroupType
struct,
List<ParquetValueReader<?>>
fieldReaders) {
// the expected struct is ignored because nested fields are never found
when the
List<ParquetValueReader<?>> newFields =
Lists.newArrayListWithExpectedSize(
@@ -116,29 +104,23 @@ public class GenericParquetReaders {
}
}
- protected static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
private final Map<Integer, ?> idToConstant;
- protected ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
this.type = type;
this.idToConstant = idToConstant;
}
@Override
- public ParquetValueReader<?> message(StructType expected, MessageType
message,
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
List<ParquetValueReader<?>>
fieldReaders) {
return struct(expected, message.asGroupType(), fieldReaders);
}
- protected StructReader<?, ?> createStructReader(List<Type> types,
-
List<ParquetValueReader<?>> readers,
- StructType struct) {
- return new RecordReader(types, readers, struct);
- }
-
@Override
- public ParquetValueReader<?> struct(StructType expected, GroupType struct,
+ public ParquetValueReader<?> struct(Types.StructType expected, GroupType
struct,
List<ParquetValueReader<?>>
fieldReaders) {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
@@ -190,7 +172,8 @@ public class GenericParquetReaders {
Type elementType = repeated.getType(0);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) -
1;
- return new ListReader<>(repeatedD, repeatedR,
ParquetValueReaders.option(elementType, elementD, elementReader));
+ return new ParquetValueReaders.ListReader<>(repeatedD, repeatedR,
+ ParquetValueReaders.option(elementType, elementD, elementReader));
}
@Override
@@ -208,7 +191,7 @@ public class GenericParquetReaders {
Type valueType = repeatedKeyValue.getType(1);
int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
- return new MapReader<>(repeatedD, repeatedR,
+ return new ParquetValueReaders.MapReader<>(repeatedD, repeatedR,
ParquetValueReaders.option(keyType, keyD, keyReader),
ParquetValueReaders.option(valueType, valueD, valueReader));
}
@@ -223,28 +206,28 @@ public class GenericParquetReaders {
case ENUM:
case JSON:
case UTF8:
- return new StringReader(desc);
+ return new ParquetValueReaders.StringReader(desc);
case INT_8:
case INT_16:
case INT_32:
- if (expected.typeId() == TypeID.LONG) {
- return new IntAsLongReader(desc);
+ if (expected.typeId() ==
org.apache.iceberg.types.Type.TypeID.LONG) {
+ return new ParquetValueReaders.IntAsLongReader(desc);
} else {
- return new UnboxedReader<>(desc);
+ return new ParquetValueReaders.UnboxedReader<>(desc);
}
case INT_64:
- return new UnboxedReader<>(desc);
+ return new ParquetValueReaders.UnboxedReader<>(desc);
case DATE:
return new DateReader(desc);
case TIMESTAMP_MICROS:
- TimestampType tsMicrosType = (TimestampType) expected;
+ Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
if (tsMicrosType.shouldAdjustToUTC()) {
return new TimestamptzReader(desc);
} else {
return new TimestampReader(desc);
}
case TIMESTAMP_MILLIS:
- TimestampType tsMillisType = (TimestampType) expected;
+ Types.TimestampType tsMillisType = (Types.TimestampType) expected;
if (tsMillisType.shouldAdjustToUTC()) {
return new TimestamptzMillisReader(desc);
} else {
@@ -259,17 +242,17 @@ public class GenericParquetReaders {
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
- return new BinaryAsDecimalReader(desc, decimal.getScale());
+ return new ParquetValueReaders.BinaryAsDecimalReader(desc,
decimal.getScale());
case INT64:
- return new LongAsDecimalReader(desc, decimal.getScale());
+ return new ParquetValueReaders.LongAsDecimalReader(desc,
decimal.getScale());
case INT32:
- return new IntegerAsDecimalReader(desc, decimal.getScale());
+ return new ParquetValueReaders.IntegerAsDecimalReader(desc,
decimal.getScale());
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " +
primitive.getPrimitiveTypeName());
}
case BSON:
- return new BytesReader(desc);
+ return new ParquetValueReaders.BytesReader(desc);
default:
throw new UnsupportedOperationException(
"Unsupported logical type: " + primitive.getOriginalType());
@@ -280,23 +263,23 @@ public class GenericParquetReaders {
case FIXED_LEN_BYTE_ARRAY:
return new FixedReader(desc);
case BINARY:
- return new BytesReader(desc);
+ return new ParquetValueReaders.BytesReader(desc);
case INT32:
- if (expected != null && expected.typeId() == TypeID.LONG) {
- return new IntAsLongReader(desc);
+ if (expected != null && expected.typeId() ==
org.apache.iceberg.types.Type.TypeID.LONG) {
+ return new ParquetValueReaders.IntAsLongReader(desc);
} else {
- return new UnboxedReader<>(desc);
+ return new ParquetValueReaders.UnboxedReader<>(desc);
}
case FLOAT:
- if (expected != null && expected.typeId() == TypeID.DOUBLE) {
+ if (expected != null && expected.typeId() ==
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
return new ParquetValueReaders.FloatAsDoubleReader(desc);
} else {
- return new UnboxedReader<>(desc);
+ return new ParquetValueReaders.UnboxedReader<>(desc);
}
case BOOLEAN:
case INT64:
case DOUBLE:
- return new UnboxedReader<>(desc);
+ return new ParquetValueReaders.UnboxedReader<>(desc);
default:
throw new UnsupportedOperationException("Unsupported type: " +
primitive);
}
@@ -310,7 +293,7 @@ public class GenericParquetReaders {
private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
- private static class DateReader extends PrimitiveReader<LocalDate> {
+ private static class DateReader extends
ParquetValueReaders.PrimitiveReader<LocalDate> {
private DateReader(ColumnDescriptor desc) {
super(desc);
}
@@ -321,7 +304,7 @@ public class GenericParquetReaders {
}
}
- private static class TimestampReader extends PrimitiveReader<LocalDateTime> {
+ private static class TimestampReader extends
ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampReader(ColumnDescriptor desc) {
super(desc);
}
@@ -332,7 +315,7 @@ public class GenericParquetReaders {
}
}
- private static class TimestampMillisReader extends
PrimitiveReader<LocalDateTime> {
+ private static class TimestampMillisReader extends
ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}
@@ -343,7 +326,7 @@ public class GenericParquetReaders {
}
}
- private static class TimestamptzReader extends
PrimitiveReader<OffsetDateTime> {
+ private static class TimestamptzReader extends
ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}
@@ -354,7 +337,7 @@ public class GenericParquetReaders {
}
}
- private static class TimestamptzMillisReader extends
PrimitiveReader<OffsetDateTime> {
+ private static class TimestamptzMillisReader extends
ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}
@@ -365,7 +348,7 @@ public class GenericParquetReaders {
}
}
- private static class TimeMillisReader extends PrimitiveReader<LocalTime> {
+ private static class TimeMillisReader extends
ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}
@@ -376,7 +359,7 @@ public class GenericParquetReaders {
}
}
- private static class TimeReader extends PrimitiveReader<LocalTime> {
+ private static class TimeReader extends
ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeReader(ColumnDescriptor desc) {
super(desc);
}
@@ -387,7 +370,7 @@ public class GenericParquetReaders {
}
}
- private static class FixedReader extends PrimitiveReader<byte[]> {
+ private static class FixedReader extends
ParquetValueReaders.PrimitiveReader<byte[]> {
private FixedReader(ColumnDescriptor desc) {
super(desc);
}
@@ -402,39 +385,4 @@ public class GenericParquetReaders {
}
}
}
-
- static class RecordReader extends StructReader<Record, Record> {
- private final StructType structType;
-
- RecordReader(List<Type> types,
- List<ParquetValueReader<?>> readers,
- StructType struct) {
- super(types, readers);
- this.structType = struct;
- }
-
- @Override
- protected Record newStructData(Record reuse) {
- if (reuse != null) {
- return reuse;
- } else {
- return GenericRecord.create(structType);
- }
- }
-
- @Override
- protected Object getField(Record intermediate, int pos) {
- return intermediate.get(pos);
- }
-
- @Override
- protected Record buildStruct(Record struct) {
- return struct;
- }
-
- @Override
- protected void set(Record struct, int pos, Object value) {
- struct.set(pos, value);
- }
- }
}
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
similarity index 75%
copy from data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
copy to data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
index 4cdb2de..282a92d 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
@@ -28,49 +28,40 @@ import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
-import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetTypeVisitor;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
-import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter;
-import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
-import
org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class GenericParquetWriter {
- protected GenericParquetWriter() {
- }
+public abstract class BaseParquetWriter<T> {
@SuppressWarnings("unchecked")
- public static ParquetValueWriter<Record> buildWriter(MessageType type) {
- return (ParquetValueWriter<Record>) ParquetTypeVisitor.visit(type, new
WriteBuilder(type));
+ protected ParquetValueWriter<T> createWriter(MessageType type) {
+ return (ParquetValueWriter<T>) ParquetTypeVisitor.visit(type, new
WriteBuilder(type));
}
- protected static class WriteBuilder extends
ParquetTypeVisitor<ParquetValueWriter<?>> {
+ protected abstract ParquetValueWriters.StructWriter<T>
createStructWriter(List<ParquetValueWriter<?>> writers);
+
+ private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>>
{
private final MessageType type;
- protected WriteBuilder(MessageType type) {
+ private WriteBuilder(MessageType type) {
this.type = type;
}
@Override
- public ParquetValueWriter<?> message(MessageType message,
- List<ParquetValueWriter<?>>
fieldWriters) {
+ public ParquetValueWriter<?> message(MessageType message,
List<ParquetValueWriter<?>> fieldWriters) {
return struct(message.asGroupType(), fieldWriters);
}
- protected StructWriter<?> createStructWriter(List<ParquetValueWriter<?>>
writers) {
- return new RecordWriter(writers);
- }
-
@Override
public ParquetValueWriter<?> struct(GroupType struct,
List<ParquetValueWriter<?>>
fieldWriters) {
@@ -125,7 +116,8 @@ public class GenericParquetWriter {
ColumnDescriptor desc = type.getColumnDescription(currentPath());
LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
if (logicalType != null) {
- Optional<PrimitiveWriter<?>> writer = logicalType.accept(new
LogicalTypeWriterVisitor(desc));
+ Optional<ParquetValueWriters.PrimitiveWriter<?>> writer =
+ logicalType.accept(new LogicalTypeWriterVisitor(desc));
if (writer.isPresent()) {
return writer.get();
}
@@ -152,7 +144,8 @@ public class GenericParquetWriter {
}
}
- private static class LogicalTypeWriterVisitor implements
LogicalTypeAnnotationVisitor<PrimitiveWriter<?>> {
+ private static class LogicalTypeWriterVisitor implements
+
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueWriters.PrimitiveWriter<?>>
{
private final ColumnDescriptor desc;
private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
@@ -160,17 +153,20 @@ public class GenericParquetWriter {
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
return Optional.of(ParquetValueWriters.strings(desc));
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
return Optional.of(ParquetValueWriters.strings(desc));
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
return Optional.of(ParquetValueWriters.decimalAsInteger(
@@ -187,17 +183,20 @@ public class GenericParquetWriter {
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
return Optional.of(new DateWriter(desc));
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
return Optional.of(new TimeWriter(desc));
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
"Cannot write timestamp in %s, only MICROS is supported",
timestampType.getUnit());
if (timestampType.isAdjustedToUTC()) {
@@ -208,7 +207,8 @@ public class GenericParquetWriter {
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth()
< 64,
"Cannot read uint64: not a supported Java type");
if (intType.getBitWidth() < 64) {
@@ -219,12 +219,14 @@ public class GenericParquetWriter {
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(ParquetValueWriters.strings(desc));
}
@Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
+ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+ LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
return Optional.of(ParquetValueWriters.byteBuffers(desc));
}
}
@@ -232,7 +234,7 @@ public class GenericParquetWriter {
private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
- private static class DateWriter extends PrimitiveWriter<LocalDate> {
+ private static class DateWriter extends
ParquetValueWriters.PrimitiveWriter<LocalDate> {
private DateWriter(ColumnDescriptor desc) {
super(desc);
}
@@ -243,7 +245,7 @@ public class GenericParquetWriter {
}
}
- private static class TimeWriter extends PrimitiveWriter<LocalTime> {
+ private static class TimeWriter extends
ParquetValueWriters.PrimitiveWriter<LocalTime> {
private TimeWriter(ColumnDescriptor desc) {
super(desc);
}
@@ -254,7 +256,7 @@ public class GenericParquetWriter {
}
}
- private static class TimestampWriter extends PrimitiveWriter<LocalDateTime> {
+ private static class TimestampWriter extends
ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
private TimestampWriter(ColumnDescriptor desc) {
super(desc);
}
@@ -266,7 +268,7 @@ public class GenericParquetWriter {
}
}
- private static class TimestamptzWriter extends
PrimitiveWriter<OffsetDateTime> {
+ private static class TimestamptzWriter extends
ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
private TimestamptzWriter(ColumnDescriptor desc) {
super(desc);
}
@@ -277,7 +279,7 @@ public class GenericParquetWriter {
}
}
- private static class FixedWriter extends PrimitiveWriter<byte[]> {
+ private static class FixedWriter extends
ParquetValueWriters.PrimitiveWriter<byte[]> {
private FixedWriter(ColumnDescriptor desc) {
super(desc);
}
@@ -287,15 +289,4 @@ public class GenericParquetWriter {
column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
}
}
-
- private static class RecordWriter extends StructWriter<Record> {
- private RecordWriter(List<ParquetValueWriter<?>> writers) {
- super(writers);
- }
-
- @Override
- protected Object get(Record struct, int index) {
- return struct.get(index);
- }
- }
}
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 3e4e7c7..e8a914a 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -19,391 +19,40 @@
package org.apache.iceberg.data.parquet;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
-import org.apache.iceberg.parquet.ParquetValueReaders;
-import org.apache.iceberg.parquet.ParquetValueReaders.BinaryAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.BytesReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.IntAsLongReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.IntegerAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.ListReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.LongAsDecimalReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.MapReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.PrimitiveReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.StringReader;
import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.UnboxedReader;
-import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type.TypeID;
-import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.types.Types.TimestampType;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.schema.DecimalMetadata;
-import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class GenericParquetReaders {
- protected GenericParquetReaders() {
- }
-
- public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
- MessageType fileSchema)
{
- return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
- }
-
- @SuppressWarnings("unchecked")
- public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
- MessageType fileSchema,
- Map<Integer, ?>
idToConstant) {
- if (ParquetSchemaUtil.hasIds(fileSchema)) {
- return (ParquetValueReader<Record>)
- TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new ReadBuilder(fileSchema, idToConstant));
- } else {
- return (ParquetValueReader<Record>)
- TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new FallbackReadBuilder(fileSchema, idToConstant));
- }
- }
-
- protected static class FallbackReadBuilder extends ReadBuilder {
- protected FallbackReadBuilder(MessageType type, Map<Integer, ?>
idToConstant) {
- super(type, idToConstant);
- }
-
- @Override
- public ParquetValueReader<?> message(StructType expected, MessageType
message,
- List<ParquetValueReader<?>>
fieldReaders) {
- // the top level matches by ID, but the remaining IDs are missing
- return super.struct(expected, message, fieldReaders);
- }
-
- @Override
- public ParquetValueReader<?> struct(StructType expected, GroupType struct,
- List<ParquetValueReader<?>>
fieldReaders) {
- // the expected struct is ignored because nested fields are never found
when the
- List<ParquetValueReader<?>> newFields =
Lists.newArrayListWithExpectedSize(
- fieldReaders.size());
- List<Type> types =
Lists.newArrayListWithExpectedSize(fieldReaders.size());
- List<Type> fields = struct.getFields();
- for (int i = 0; i < fields.size(); i += 1) {
- Type fieldType = fields.get(i);
- int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) -
1;
- newFields.add(ParquetValueReaders.option(fieldType, fieldD,
fieldReaders.get(i)));
- types.add(fieldType);
- }
-
- return createStructReader(types, newFields, expected);
- }
- }
-
- protected static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
- private final MessageType type;
- private final Map<Integer, ?> idToConstant;
-
- protected ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
- this.type = type;
- this.idToConstant = idToConstant;
- }
-
- @Override
- public ParquetValueReader<?> message(StructType expected, MessageType
message,
- List<ParquetValueReader<?>>
fieldReaders) {
- return struct(expected, message.asGroupType(), fieldReaders);
- }
-
- protected StructReader<?, ?> createStructReader(List<Type> types,
-
List<ParquetValueReader<?>> readers,
- StructType struct) {
- return new RecordReader(types, readers, struct);
- }
-
- @Override
- public ParquetValueReader<?> struct(StructType expected, GroupType struct,
- List<ParquetValueReader<?>>
fieldReaders) {
- // match the expected struct's order
- Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
- Map<Integer, Type> typesById = Maps.newHashMap();
- List<Type> fields = struct.getFields();
- for (int i = 0; i < fields.size(); i += 1) {
- Type fieldType = fields.get(i);
- int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
- int id = fieldType.getId().intValue();
- readersById.put(id, ParquetValueReaders.option(fieldType, fieldD,
fieldReaders.get(i)));
- typesById.put(id, fieldType);
- }
-
- List<Types.NestedField> expectedFields = expected != null ?
- expected.fields() : ImmutableList.of();
- List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(
- expectedFields.size());
- List<Type> types =
Lists.newArrayListWithExpectedSize(expectedFields.size());
- for (Types.NestedField field : expectedFields) {
- int id = field.fieldId();
- if (idToConstant.containsKey(id)) {
- // containsKey is used because the constant may be null
-
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
- types.add(null);
- } else {
- ParquetValueReader<?> reader = readersById.get(id);
- if (reader != null) {
- reorderedFields.add(reader);
- types.add(typesById.get(id));
- } else {
- reorderedFields.add(ParquetValueReaders.nulls());
- types.add(null);
- }
- }
- }
-
- return createStructReader(types, reorderedFields, expected);
- }
-
- @Override
- public ParquetValueReader<?> list(Types.ListType expectedList, GroupType
array,
- ParquetValueReader<?> elementReader) {
- GroupType repeated = array.getFields().get(0).asGroupType();
- String[] repeatedPath = currentPath();
-
- int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
- int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
- Type elementType = repeated.getType(0);
- int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) -
1;
-
- return new ListReader<>(repeatedD, repeatedR,
ParquetValueReaders.option(elementType, elementD, elementReader));
- }
-
- @Override
- public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
- ParquetValueReader<?> keyReader,
- ParquetValueReader<?> valueReader) {
- GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
- String[] repeatedPath = currentPath();
-
- int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
- int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
-
- Type keyType = repeatedKeyValue.getType(0);
- int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
- Type valueType = repeatedKeyValue.getType(1);
- int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
-
- return new MapReader<>(repeatedD, repeatedR,
- ParquetValueReaders.option(keyType, keyD, keyReader),
- ParquetValueReaders.option(valueType, valueD, valueReader));
- }
-
- @Override
- public ParquetValueReader<?>
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
- PrimitiveType primitive) {
- ColumnDescriptor desc = type.getColumnDescription(currentPath());
-
- if (primitive.getOriginalType() != null) {
- switch (primitive.getOriginalType()) {
- case ENUM:
- case JSON:
- case UTF8:
- return new StringReader(desc);
- case INT_8:
- case INT_16:
- case INT_32:
- if (expected.typeId() == TypeID.LONG) {
- return new IntAsLongReader(desc);
- } else {
- return new UnboxedReader<>(desc);
- }
- case INT_64:
- return new UnboxedReader<>(desc);
- case DATE:
- return new DateReader(desc);
- case TIMESTAMP_MICROS:
- TimestampType tsMicrosType = (TimestampType) expected;
- if (tsMicrosType.shouldAdjustToUTC()) {
- return new TimestamptzReader(desc);
- } else {
- return new TimestampReader(desc);
- }
- case TIMESTAMP_MILLIS:
- TimestampType tsMillisType = (TimestampType) expected;
- if (tsMillisType.shouldAdjustToUTC()) {
- return new TimestamptzMillisReader(desc);
- } else {
- return new TimestampMillisReader(desc);
- }
- case TIME_MICROS:
- return new TimeReader(desc);
- case TIME_MILLIS:
- return new TimeMillisReader(desc);
- case DECIMAL:
- DecimalMetadata decimal = primitive.getDecimalMetadata();
- switch (primitive.getPrimitiveTypeName()) {
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- return new BinaryAsDecimalReader(desc, decimal.getScale());
- case INT64:
- return new LongAsDecimalReader(desc, decimal.getScale());
- case INT32:
- return new IntegerAsDecimalReader(desc, decimal.getScale());
- default:
- throw new UnsupportedOperationException(
- "Unsupported base type for decimal: " +
primitive.getPrimitiveTypeName());
- }
- case BSON:
- return new BytesReader(desc);
- default:
- throw new UnsupportedOperationException(
- "Unsupported logical type: " + primitive.getOriginalType());
- }
- }
-
- switch (primitive.getPrimitiveTypeName()) {
- case FIXED_LEN_BYTE_ARRAY:
- return new FixedReader(desc);
- case BINARY:
- return new BytesReader(desc);
- case INT32:
- if (expected != null && expected.typeId() == TypeID.LONG) {
- return new IntAsLongReader(desc);
- } else {
- return new UnboxedReader<>(desc);
- }
- case FLOAT:
- if (expected != null && expected.typeId() == TypeID.DOUBLE) {
- return new ParquetValueReaders.FloatAsDoubleReader(desc);
- } else {
- return new UnboxedReader<>(desc);
- }
- case BOOLEAN:
- case INT64:
- case DOUBLE:
- return new UnboxedReader<>(desc);
- default:
- throw new UnsupportedOperationException("Unsupported type: " +
primitive);
- }
- }
-
- MessageType type() {
- return type;
- }
- }
-
- private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
- private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
- private static class DateReader extends PrimitiveReader<LocalDate> {
- private DateReader(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public LocalDate read(LocalDate reuse) {
- return EPOCH_DAY.plusDays(column.nextInteger());
- }
- }
-
- private static class TimestampReader extends PrimitiveReader<LocalDateTime> {
- private TimestampReader(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public LocalDateTime read(LocalDateTime reuse) {
- return EPOCH.plus(column.nextLong(),
ChronoUnit.MICROS).toLocalDateTime();
- }
- }
-
- private static class TimestampMillisReader extends
PrimitiveReader<LocalDateTime> {
- private TimestampMillisReader(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public LocalDateTime read(LocalDateTime reuse) {
- return EPOCH.plus(column.nextLong() * 1000,
ChronoUnit.MICROS).toLocalDateTime();
- }
- }
-
- private static class TimestamptzReader extends
PrimitiveReader<OffsetDateTime> {
- private TimestamptzReader(ColumnDescriptor desc) {
- super(desc);
- }
+public class GenericParquetReaders extends BaseParquetReaders<Record> {
- @Override
- public OffsetDateTime read(OffsetDateTime reuse) {
- return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
- }
- }
+ private static final GenericParquetReaders INSTANCE = new
GenericParquetReaders();
- private static class TimestamptzMillisReader extends
PrimitiveReader<OffsetDateTime> {
- private TimestamptzMillisReader(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public OffsetDateTime read(OffsetDateTime reuse) {
- return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
- }
+ private GenericParquetReaders() {
}
- private static class TimeMillisReader extends PrimitiveReader<LocalTime> {
- private TimeMillisReader(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public LocalTime read(LocalTime reuse) {
- return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
- }
+ public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return INSTANCE.createReader(expectedSchema, fileSchema);
}
- private static class TimeReader extends PrimitiveReader<LocalTime> {
- private TimeReader(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public LocalTime read(LocalTime reuse) {
- return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
- }
+ public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
MessageType fileSchema,
+ Map<Integer, ?>
idToConstant) {
+ return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
}
- private static class FixedReader extends PrimitiveReader<byte[]> {
- private FixedReader(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public byte[] read(byte[] reuse) {
- if (reuse != null) {
- column.nextBinary().toByteBuffer().duplicate().get(reuse);
- return reuse;
- } else {
- return column.nextBinary().getBytes();
- }
- }
+ @Override
+ protected ParquetValueReader<Record> createStructReader(List<Type> types,
List<ParquetValueReader<?>> fieldReaders,
+ StructType
structType) {
+ return new RecordReader(types, fieldReaders, structType);
}
- static class RecordReader extends StructReader<Record, Record> {
+ private static class RecordReader extends StructReader<Record, Record> {
private final StructType structType;
RecordReader(List<Type> types,
diff --git
a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
index 4cdb2de..5bb878f 100644
---
a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
+++
b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
@@ -19,273 +19,25 @@
package org.apache.iceberg.data.parquet;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
import java.util.List;
-import java.util.Optional;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.parquet.ParquetTypeVisitor;
import org.apache.iceberg.parquet.ParquetValueWriter;
-import org.apache.iceberg.parquet.ParquetValueWriters;
-import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter;
import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
-import
org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-public class GenericParquetWriter {
- protected GenericParquetWriter() {
- }
-
- @SuppressWarnings("unchecked")
- public static ParquetValueWriter<Record> buildWriter(MessageType type) {
- return (ParquetValueWriter<Record>) ParquetTypeVisitor.visit(type, new
WriteBuilder(type));
- }
-
- protected static class WriteBuilder extends
ParquetTypeVisitor<ParquetValueWriter<?>> {
- private final MessageType type;
-
- protected WriteBuilder(MessageType type) {
- this.type = type;
- }
-
- @Override
- public ParquetValueWriter<?> message(MessageType message,
- List<ParquetValueWriter<?>>
fieldWriters) {
- return struct(message.asGroupType(), fieldWriters);
- }
-
- protected StructWriter<?> createStructWriter(List<ParquetValueWriter<?>>
writers) {
- return new RecordWriter(writers);
- }
-
- @Override
- public ParquetValueWriter<?> struct(GroupType struct,
- List<ParquetValueWriter<?>>
fieldWriters) {
- List<Type> fields = struct.getFields();
- List<ParquetValueWriter<?>> writers =
Lists.newArrayListWithExpectedSize(fieldWriters.size());
- for (int i = 0; i < fields.size(); i += 1) {
- Type fieldType = struct.getType(i);
- int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName()));
- writers.add(ParquetValueWriters.option(fieldType, fieldD,
fieldWriters.get(i)));
- }
-
- return createStructWriter(writers);
- }
-
- @Override
- public ParquetValueWriter<?> list(GroupType array, ParquetValueWriter<?>
elementWriter) {
- GroupType repeated = array.getFields().get(0).asGroupType();
- String[] repeatedPath = currentPath();
-
- int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
- int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
-
- org.apache.parquet.schema.Type elementType = repeated.getType(0);
- int elementD = type.getMaxDefinitionLevel(path(elementType.getName()));
-
- return ParquetValueWriters.collections(repeatedD, repeatedR,
- ParquetValueWriters.option(elementType, elementD, elementWriter));
- }
-
- @Override
- public ParquetValueWriter<?> map(GroupType map,
- ParquetValueWriter<?> keyWriter,
- ParquetValueWriter<?> valueWriter) {
- GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
- String[] repeatedPath = currentPath();
-
- int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
- int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
-
- org.apache.parquet.schema.Type keyType = repeatedKeyValue.getType(0);
- int keyD = type.getMaxDefinitionLevel(path(keyType.getName()));
- org.apache.parquet.schema.Type valueType = repeatedKeyValue.getType(1);
- int valueD = type.getMaxDefinitionLevel(path(valueType.getName()));
-
- return ParquetValueWriters.maps(repeatedD, repeatedR,
- ParquetValueWriters.option(keyType, keyD, keyWriter),
- ParquetValueWriters.option(valueType, valueD, valueWriter));
- }
-
- @Override
- public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
- ColumnDescriptor desc = type.getColumnDescription(currentPath());
- LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
- if (logicalType != null) {
- Optional<PrimitiveWriter<?>> writer = logicalType.accept(new
LogicalTypeWriterVisitor(desc));
- if (writer.isPresent()) {
- return writer.get();
- }
- }
-
- switch (primitive.getPrimitiveTypeName()) {
- case FIXED_LEN_BYTE_ARRAY:
- return new FixedWriter(desc);
- case BINARY:
- return ParquetValueWriters.byteBuffers(desc);
- case BOOLEAN:
- return ParquetValueWriters.booleans(desc);
- case INT32:
- return ParquetValueWriters.ints(desc);
- case INT64:
- return ParquetValueWriters.longs(desc);
- case FLOAT:
- return ParquetValueWriters.floats(desc);
- case DOUBLE:
- return ParquetValueWriters.doubles(desc);
- default:
- throw new UnsupportedOperationException("Unsupported type: " +
primitive);
- }
- }
- }
-
- private static class LogicalTypeWriterVisitor implements
LogicalTypeAnnotationVisitor<PrimitiveWriter<?>> {
- private final ColumnDescriptor desc;
-
- private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
- this.desc = desc;
- }
-
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
- return Optional.of(ParquetValueWriters.strings(desc));
- }
-
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
- return Optional.of(ParquetValueWriters.strings(desc));
- }
-
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
- switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
- case INT32:
- return Optional.of(ParquetValueWriters.decimalAsInteger(
- desc, decimalType.getPrecision(), decimalType.getScale()));
- case INT64:
- return Optional.of(ParquetValueWriters.decimalAsLong(
- desc, decimalType.getPrecision(), decimalType.getScale()));
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- return Optional.of(ParquetValueWriters.decimalAsFixed(
- desc, decimalType.getPrecision(), decimalType.getScale()));
- }
- return Optional.empty();
- }
+public class GenericParquetWriter extends BaseParquetWriter<Record> {
+ private static final GenericParquetWriter INSTANCE = new
GenericParquetWriter();
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
- return Optional.of(new DateWriter(desc));
- }
-
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
- return Optional.of(new TimeWriter(desc));
- }
-
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
-
Preconditions.checkArgument(LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
- "Cannot write timestamp in %s, only MICROS is supported",
timestampType.getUnit());
- if (timestampType.isAdjustedToUTC()) {
- return Optional.of(new TimestamptzWriter(desc));
- } else {
- return Optional.of(new TimestampWriter(desc));
- }
- }
-
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
- Preconditions.checkArgument(intType.isSigned() || intType.getBitWidth()
< 64,
- "Cannot read uint64: not a supported Java type");
- if (intType.getBitWidth() < 64) {
- return Optional.of(ParquetValueWriters.ints(desc));
- } else {
- return Optional.of(ParquetValueWriters.longs(desc));
- }
- }
-
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
- return Optional.of(ParquetValueWriters.strings(desc));
- }
-
- @Override
- public Optional<PrimitiveWriter<?>>
visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
- return Optional.of(ParquetValueWriters.byteBuffers(desc));
- }
+ private GenericParquetWriter() {
}
- private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
- private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
- private static class DateWriter extends PrimitiveWriter<LocalDate> {
- private DateWriter(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public void write(int repetitionLevel, LocalDate value) {
- column.writeInteger(repetitionLevel, (int)
ChronoUnit.DAYS.between(EPOCH_DAY, value));
- }
- }
-
- private static class TimeWriter extends PrimitiveWriter<LocalTime> {
- private TimeWriter(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public void write(int repetitionLevel, LocalTime value) {
- column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000);
- }
- }
-
- private static class TimestampWriter extends PrimitiveWriter<LocalDateTime> {
- private TimestampWriter(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public void write(int repetitionLevel, LocalDateTime value) {
- column.writeLong(repetitionLevel,
- ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC)));
- }
- }
-
- private static class TimestamptzWriter extends
PrimitiveWriter<OffsetDateTime> {
- private TimestamptzWriter(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public void write(int repetitionLevel, OffsetDateTime value) {
- column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH,
value));
- }
+ public static ParquetValueWriter<Record> buildWriter(MessageType type) {
+ return INSTANCE.createWriter(type);
}
- private static class FixedWriter extends PrimitiveWriter<byte[]> {
- private FixedWriter(ColumnDescriptor desc) {
- super(desc);
- }
-
- @Override
- public void write(int repetitionLevel, byte[] value) {
- column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
- }
+ @Override
+ protected StructWriter<Record>
createStructWriter(List<ParquetValueWriter<?>> writers) {
+ return new RecordWriter(writers);
}
private static class RecordWriter extends StructWriter<Record> {
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index e9d9bc0..ccea5d6 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -20,66 +20,34 @@
package org.apache.iceberg.flink.data;
import java.util.List;
-import java.util.Map;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.data.parquet.BaseParquetReaders;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
-import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends GenericParquetReaders {
- private FlinkParquetReaders() {
- }
-
- @SuppressWarnings("unchecked")
- public static ParquetValueReader<Row> buildRowReader(Schema expectedSchema,
- MessageType fileSchema)
{
- if (ParquetSchemaUtil.hasIds(fileSchema)) {
- return (ParquetValueReader<Row>)
- TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new ReadBuilder(fileSchema, ImmutableMap.of()));
- } else {
- return (ParquetValueReader<Row>)
- TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
- new FallbackReadBuilder(fileSchema, ImmutableMap.of()));
- }
- }
-
- private static class FallbackReadBuilder extends
GenericParquetReaders.FallbackReadBuilder {
+public class FlinkParquetReaders extends BaseParquetReaders<Row> {
- private FallbackReadBuilder(MessageType type, Map<Integer, ?>
idToConstant) {
- super(type, idToConstant);
- }
+ private static final FlinkParquetReaders INSTANCE = new
FlinkParquetReaders();
- @Override
- protected ParquetValueReaders.StructReader<?, ?>
createStructReader(List<Type> types,
-
List<ParquetValueReader<?>> readers,
-
Types.StructType struct) {
- return new RowReader(types, readers, struct);
- }
+ private FlinkParquetReaders() {
}
- private static class ReadBuilder extends GenericParquetReaders.ReadBuilder {
-
- private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
- super(type, idToConstant);
- }
+ public static ParquetValueReader<Row> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return INSTANCE.createReader(expectedSchema, fileSchema);
+ }
- @Override
- protected ParquetValueReaders.StructReader<Row, Row>
createStructReader(List<Type> types,
-
List<ParquetValueReader<?>> readers,
-
Types.StructType struct) {
- return new RowReader(types, readers, struct);
- }
+ @Override
+ protected ParquetValueReader<Row> createStructReader(List<Type> types,
+
List<ParquetValueReader<?>> fieldReaders,
+ Types.StructType
structType) {
+ return new RowReader(types, fieldReaders, structType);
}
- static class RowReader extends ParquetValueReaders.StructReader<Row, Row> {
+ private static class RowReader extends ParquetValueReaders.StructReader<Row,
Row> {
private final Types.StructType structType;
RowReader(List<Type> types, List<ParquetValueReader<?>> readers,
Types.StructType struct) {
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 2e36b4a..54b4fea 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -21,31 +21,25 @@ package org.apache.iceberg.flink.data;
import java.util.List;
import org.apache.flink.types.Row;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.parquet.ParquetTypeVisitor;
+import org.apache.iceberg.data.parquet.BaseParquetWriter;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.parquet.schema.MessageType;
-public class FlinkParquetWriters extends GenericParquetWriter {
+public class FlinkParquetWriters extends BaseParquetWriter<Row> {
+
+ private static final FlinkParquetWriters INSTANCE = new
FlinkParquetWriters();
+
private FlinkParquetWriters() {
}
- @SuppressWarnings("unchecked")
- public static ParquetValueWriter<Row> buildRowWriter(MessageType type) {
- return (ParquetValueWriter<Row>) ParquetTypeVisitor.visit(type, new
WriteBuilder(type));
+ public static ParquetValueWriter<Row> buildWriter(MessageType type) {
+ return INSTANCE.createWriter(type);
}
- private static class WriteBuilder extends GenericParquetWriter.WriteBuilder {
-
- private WriteBuilder(MessageType type) {
- super(type);
- }
-
- @Override
- protected ParquetValueWriters.StructWriter<Row>
createStructWriter(List<ParquetValueWriter<?>> writers) {
- return new RowWriter(writers);
- }
+ @Override
+ protected ParquetValueWriters.StructWriter<Row>
createStructWriter(List<ParquetValueWriter<?>> writers) {
+ return new RowWriter(writers);
}
private static class RowWriter extends ParquetValueWriters.StructWriter<Row>
{
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
index f8bf6a5..e0cb3f0 100644
---
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
+++
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
@@ -76,14 +76,14 @@ public class TestFlinkParquetReaderWriter {
try (FileAppender<Row> writer = Parquet.write(Files.localOutput(testFile))
.schema(schema)
- .createWriterFunc(FlinkParquetWriters::buildRowWriter)
+ .createWriterFunc(FlinkParquetWriters::buildWriter)
.build()) {
writer.addAll(iterable);
}
try (CloseableIterable<Row> reader =
Parquet.read(Files.localInput(testFile))
.project(schema)
- .createReaderFunc(type -> FlinkParquetReaders.buildRowReader(schema,
type))
+ .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema,
type))
.build()) {
Iterator<Row> expected = iterable.iterator();
Iterator<Row> rows = reader.iterator();