pvary commented on code in PR #9719:
URL: https://github.com/apache/iceberg/pull/9719#discussion_r1501356650
##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java:
##########
@@ -193,6 +195,122 @@ public ParquetValueReader<?> map(
ParquetValueReaders.option(valueType, valueD, valueReader));
}
+ private static class LogicalTypeAnnotationParquetValueReaderVisitor
+ implements
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
+
+ private final PrimitiveType primitive;
+ private final ColumnDescriptor desc;
+ private final org.apache.iceberg.types.Type.PrimitiveType expected;
+
+ LogicalTypeAnnotationParquetValueReaderVisitor(
+ PrimitiveType primitive,
+ ColumnDescriptor desc,
+ org.apache.iceberg.types.Type.PrimitiveType expected) {
+ this.primitive = primitive;
+ this.desc = desc;
+ this.expected = expected;
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType)
{
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+ return Optional.of(new StringReader(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ DecimalLogicalTypeAnnotation decimalLogicalType) {
+ switch (primitive.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return Optional.of(
+ new BinaryDecimalReader(
+ desc, decimalLogicalType.getPrecision(),
decimalLogicalType.getScale()));
+ case INT64:
+ return Optional.of(
+ new LongDecimalReader(
+ desc, decimalLogicalType.getPrecision(),
decimalLogicalType.getScale()));
+ case INT32:
+ return Optional.of(
+ new IntegerDecimalReader(
+ desc, decimalLogicalType.getPrecision(),
decimalLogicalType.getScale()));
+ }
+
+ return
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(decimalLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+ if (timeLogicalType.getUnit() ==
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ return Optional.of(new MillisTimeReader(desc));
+ } else if (timeLogicalType.getUnit() ==
LogicalTypeAnnotation.TimeUnit.MICROS) {
+ return Optional.of(new LossyMicrosToMillisTimeReader(desc));
+ }
+
+ return
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
timestampLogicalType) {
+ if (timestampLogicalType.getUnit() ==
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ if (timestampLogicalType.isAdjustedToUTC()) {
+ return Optional.of(new MillisToTimestampTzReader(desc));
+ } else {
+ return Optional.of(new MillisToTimestampReader(desc));
+ }
+ } else if (timestampLogicalType.getUnit() ==
LogicalTypeAnnotation.TimeUnit.MICROS) {
+ if (timestampLogicalType.isAdjustedToUTC()) {
+ return Optional.of(new MicrosToTimestampTzReader(desc));
+ } else {
+ return Optional.of(new MicrosToTimestampReader(desc));
+ }
+ }
+ return
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+ }
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+ int width = intLogicalType.getBitWidth();
+ if (width <= 32) {
+ if (expected.typeId() == Types.LongType.get().typeId()) {
+ return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
+ } else {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+ } else if (width <= 64) {
+ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
+ }
+ return
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(intLogicalType);
Review Comment:
Nit: add a newline (blank line) before this `return`, after the closing `}` of the preceding `if` block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]