rdblue commented on a change in pull request #1320:
URL: https://github.com/apache/iceberg/pull/1320#discussion_r469510777



##########
File path: flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
##########
@@ -85,52 +85,51 @@ public int size() {
   }
 
   private static PositionalGetter<?> buildGetter(LogicalType logicalType, Type type) {
-    switch (type.typeId()) {
-      case STRING:
+    switch (logicalType.getTypeRoot()) {
+      case TINYINT:
+        return (row, pos) -> (int) row.getByte(pos);
+      case SMALLINT:
+        return (row, pos) -> (int) row.getShort(pos);
+      case CHAR:
+      case VARCHAR:
         return (row, pos) -> row.getString(pos).toString();
 
-      case FIXED:
       case BINARY:
-        return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
-
-      case UUID:
-        return (row, pos) -> {
-          ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos));
-          long mostSigBits = bb.getLong();
-          long leastSigBits = bb.getLong();
-          return new UUID(mostSigBits, leastSigBits);
-        };
+      case VARBINARY:
+        if (Type.TypeID.UUID.equals(type.typeId())) {
+          return (row, pos) -> {
+            ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos));
+            long mostSigBits = bb.getLong();
+            long leastSigBits = bb.getLong();
+            return new UUID(mostSigBits, leastSigBits);
+          };
+        } else {
+          return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
+        }
 
       case DECIMAL:
         DecimalType decimalType = (DecimalType) logicalType;
         return (row, pos) -> row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();
 
-      case TIME:
+      case TIME_WITHOUT_TIME_ZONE:
         // Time in RowData is in milliseconds (Integer), while iceberg's time is microseconds (Long).
         return (row, pos) -> ((long) row.getInt(pos)) * 1_000;
 
-      case TIMESTAMP:
-        switch (logicalType.getTypeRoot()) {
-          case TIMESTAMP_WITHOUT_TIME_ZONE:
-            TimestampType timestampType = (TimestampType) logicalType;
-            return (row, pos) -> {
-              LocalDateTime localDateTime = row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
-              return DateTimeUtil.microsFromTimestamp(localDateTime);
-            };
-
-          case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-            LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
-            return (row, pos) -> {
-              TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
-              return timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000;
-            };
-
-          default:
-            throw new IllegalArgumentException("Unhandled iceberg type: " + type + " corresponding flink type: " +
-                logicalType);
-        }
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        TimestampType timestampType = (TimestampType) logicalType;
+        return (row, pos) -> {
+          LocalDateTime localDateTime = row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
+          return DateTimeUtil.microsFromTimestamp(localDateTime);

Review comment:
       Why not use the same logic here as for the other timestamp type? In 
both cases `getTimestamp` returns a `TimestampData`, so converting it directly 
to a microsecond value seems better than going through `LocalDateTime`.
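
       To illustrate, here is a minimal, self-contained sketch of the two 
conversions side by side. `FakeTimestampData` is a hypothetical stand-in that 
only mirrors the `getMillisecond()` / `getNanoOfMillisecond()` accessors used 
in the removed `TIMESTAMP_WITH_LOCAL_TIME_ZONE` branch, so the snippet runs 
without Flink on the classpath. `microsViaLocalDateTime` is my assumption of 
what the `LocalDateTime` path computes (microseconds since the epoch), not a 
copy of `DateTimeUtil`.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class TimestampMicrosSketch {
  private static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);

  // Hypothetical stand-in for Flink's TimestampData: millis since epoch
  // plus the nanoseconds within that millisecond (0..999_999).
  static final class FakeTimestampData {
    private final long millisecond;
    private final int nanoOfMillisecond;

    FakeTimestampData(long millisecond, int nanoOfMillisecond) {
      this.millisecond = millisecond;
      this.nanoOfMillisecond = nanoOfMillisecond;
    }

    static FakeTimestampData fromLocalDateTime(LocalDateTime ldt) {
      int nanos = ldt.getNano();
      long millis = ldt.toEpochSecond(ZoneOffset.UTC) * 1000L + nanos / 1_000_000;
      return new FakeTimestampData(millis, nanos % 1_000_000);
    }

    long getMillisecond() {
      return millisecond;
    }

    int getNanoOfMillisecond() {
      return nanoOfMillisecond;
    }

    LocalDateTime toLocalDateTime() {
      long seconds = Math.floorDiv(millisecond, 1000L);
      int nanos = (int) (Math.floorMod(millisecond, 1000L) * 1_000_000L + nanoOfMillisecond);
      return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }
  }

  // Direct conversion: same arithmetic as the removed local-zoned branch.
  static long microsDirect(FakeTimestampData ts) {
    return ts.getMillisecond() * 1000 + ts.getNanoOfMillisecond() / 1000;
  }

  // Detour through LocalDateTime (assumed equivalent of the current code path).
  static long microsViaLocalDateTime(FakeTimestampData ts) {
    return ChronoUnit.MICROS.between(EPOCH, ts.toLocalDateTime());
  }

  public static void main(String[] args) {
    FakeTimestampData ts = FakeTimestampData.fromLocalDateTime(
        LocalDateTime.of(2020, 8, 12, 10, 15, 30, 123_456_000));
    System.out.println("direct micros:        " + microsDirect(ts));
    System.out.println("via LocalDateTime:    " + microsViaLocalDateTime(ts));
  }
}
```

       Under these assumptions both paths produce the same value, including 
for pre-epoch timestamps, so the direct version should only save the 
intermediate `LocalDateTime` allocation per row.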



