rdblue commented on code in PR #12470:
URL: https://github.com/apache/iceberg/pull/12470#discussion_r1996281732
##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java:
##########
@@ -194,8 +177,104 @@ public ParquetValueWriter<?> primitive(LogicalType fType,
PrimitiveType primitiv
}
}
- private static ParquetValueWriters.PrimitiveWriter<?> ints(
- LogicalType type, ColumnDescriptor desc) {
+ private static class LogicalTypeWriterBuilder
+ implements LogicalTypeAnnotationVisitor<ParquetValueWriter<?>> {
+ private final LogicalType flinkType;
+ private final ColumnDescriptor desc;
+
+ private LogicalTypeWriterBuilder(LogicalType flinkType, ColumnDescriptor desc) {
+ this.flinkType = flinkType;
+ this.desc = desc;
+ }
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(StringLogicalTypeAnnotation strings) {
+ return Optional.of(strings(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(EnumLogicalTypeAnnotation enums) {
+ return Optional.of(strings(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(DecimalLogicalTypeAnnotation decimal) {
+ ParquetValueWriter<DecimalData> writer;
+ switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ writer = decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale());
+ break;
+ case INT64:
+ writer = decimalAsLong(desc, decimal.getPrecision(), decimal.getScale());
+ break;
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ writer = decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported base type for decimal: "
+ + desc.getPrimitiveType().getPrimitiveTypeName());
+ }
+ return Optional.of(writer);
+ }
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(DateLogicalTypeAnnotation dates) {
+ return Optional.of(ints(flinkType, desc));
+ }
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(TimeLogicalTypeAnnotation times) {
+ Preconditions.checkArgument(
+ LogicalTypeAnnotation.TimeUnit.MICROS.equals(times.getUnit()),
+ "Cannot write time in %s, only MICROS is supported",
+ times.getUnit());
+ return Optional.of(timeMicros(desc));
+ }
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(TimestampLogicalTypeAnnotation timestamps) {
+ ParquetValueWriter<TimestampData> writer;
+ switch (timestamps.getUnit()) {
+ case NANOS:
+ writer = timestampNanos(desc);
+ break;
+ case MICROS:
+ writer = timestamps(desc);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported timestamp type: " + timestamps);
+ }
+
+ return Optional.of(writer);
+ }
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(IntLogicalTypeAnnotation type) {
+ Preconditions.checkArgument(type.isSigned(), "Cannot write signed integer type: %s", type);
Review Comment:
Yes. Good catch!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]