danielcweeks commented on code in PR #12470:
URL: https://github.com/apache/iceberg/pull/12470#discussion_r1995967404


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java:
##########
@@ -194,8 +177,104 @@ public ParquetValueWriter<?> primitive(LogicalType fType, PrimitiveType primitiv
     }
   }
 
-  private static ParquetValueWriters.PrimitiveWriter<?> ints(
-      LogicalType type, ColumnDescriptor desc) {
+  private static class LogicalTypeWriterBuilder
+      implements LogicalTypeAnnotationVisitor<ParquetValueWriter<?>> {
+    private final LogicalType flinkType;
+    private final ColumnDescriptor desc;
+
+    private LogicalTypeWriterBuilder(LogicalType flinkType, ColumnDescriptor desc) {
+      this.flinkType = flinkType;
+      this.desc = desc;
+    }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(StringLogicalTypeAnnotation strings) {
+      return Optional.of(strings(desc));
+    }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(EnumLogicalTypeAnnotation enums) {
+      return Optional.of(strings(desc));
+    }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(DecimalLogicalTypeAnnotation decimal) {
+      ParquetValueWriter<DecimalData> writer;
+      switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          writer = decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale());
+          break;
+        case INT64:
+          writer = decimalAsLong(desc, decimal.getPrecision(), decimal.getScale());
+          break;
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          writer = decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale());
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported base type for decimal: "
+                  + desc.getPrimitiveType().getPrimitiveTypeName());
+      }
+      return Optional.of(writer);
+    }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(DateLogicalTypeAnnotation dates) {
+      return Optional.of(ints(flinkType, desc));
+    }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(TimeLogicalTypeAnnotation times) {
+      Preconditions.checkArgument(
+          LogicalTypeAnnotation.TimeUnit.MICROS.equals(times.getUnit()),
+          "Cannot write time in %s, only MICROS is supported",
+          times.getUnit());
+      return Optional.of(timeMicros(desc));
+    }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(TimestampLogicalTypeAnnotation timestamps) {
+      ParquetValueWriter<TimestampData> writer;
+      switch (timestamps.getUnit()) {
+        case NANOS:
+          writer = timestampNanos(desc);
+          break;
+        case MICROS:
+          writer = timestamps(desc);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported timestamp type: " + timestamps);
+      }
+
+      return Optional.of(writer);
+    }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(IntLogicalTypeAnnotation type) {
+      Preconditions.checkArgument(type.isSigned(), "Cannot write signed integer type: %s", type);

Review Comment:
   Do you mean "cannot write _unsigned_ integer type: %s"?
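   If so, a minimal sketch of the reworded check (same `Preconditions.checkArgument` call, only the message text changed; not a final suggestion) might look like:
   
   ```java
   // The check passes only for signed Parquet int annotations, so the failure
   // case is an unsigned type; the message could say so explicitly:
   Preconditions.checkArgument(
       type.isSigned(), "Cannot write unsigned integer type: %s", type);
   ```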



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

