Abacn commented on code in PR #33422:
URL: https://github.com/apache/beam/pull/33422#discussion_r1916905368


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -69,11 +71,15 @@ public class AvroGenericRecordToStorageApiProto {
   // A map of supported logical types to the protobuf field type.
   static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
       ImmutableMap.<String, TableFieldSchema.Type>builder()
-          .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
-          .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
-          .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
-          .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
-          .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
+          .put("date", TableFieldSchema.Type.DATE)

Review Comment:
   What's the reason for changing these to hard-coded names? I understand they
should be equivalent. Alternatively, keep the getName() calls and note the
resolved names in comments?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -213,7 +281,7 @@ public static DynamicMessage messageFromGenericRecord(
     return builder.build();
   }
 
-  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
+  private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {

Review Comment:
   The fully qualified name doesn't seem necessary here? Same below.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -115,34 +120,90 @@ static String convertUUID(Object value) {
   }
 
   static Long convertTimestamp(Object value, boolean micros) {
-    if (value instanceof ReadableInstant) {
-      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    if (value instanceof org.joda.time.ReadableInstant) {
+      return ((org.joda.time.ReadableInstant) value).getMillis() * 1_000L;
+    } else if (value instanceof java.time.Instant) {
+      java.time.Instant instant = (java.time.Instant) value;
+      long seconds = instant.getEpochSecond();
+      int nanos = instant.getNano();
+
+      if (seconds < 0 && nanos > 0) {
+        long ms = Math.multiplyExact(seconds + 1, 1_000_000L);
+        long adjustment = (nanos / 1_000L) - 1_000_000L;
+        return Math.addExact(ms, adjustment);
+      } else {
+        long ms = Math.multiplyExact(seconds, 1_000_000L);
+        return Math.addExact(ms, nanos / 1_000L);
+      }
     } else {
       Preconditions.checkArgument(
-          value instanceof Long, "Expecting a value as Long type (millis).");
-      return (Long) value;
+          value instanceof Long, "Expecting a value as Long type (timestamp).");
+      return ((Long) value) * (micros ? 1 : 1_000L);
     }
   }
 
   static Integer convertDate(Object value) {
-    if (value instanceof ReadableInstant) {
-      return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
+    if (value instanceof org.joda.time.LocalDate) {
+      return org.joda.time.Days.daysBetween(EPOCH_DATE, (org.joda.time.LocalDate) value).getDays();
+    } else if (value instanceof java.time.LocalDate) {
+      return (int) ((java.time.LocalDate) value).toEpochDay();
     } else {
       Preconditions.checkArgument(
           value instanceof Integer, "Expecting a value as Integer type (days).");
       return (Integer) value;
     }
   }
 
+  static Long convertTime(Object value, boolean micros) {
+    if (value instanceof org.joda.time.LocalTime) {
+      return 1_000L * (long) ((org.joda.time.LocalTime) value).getMillisOfDay();
+    } else if (value instanceof java.time.LocalTime) {
+      return java.util.concurrent.TimeUnit.NANOSECONDS.toMicros(
+          ((java.time.LocalTime) value).toNanoOfDay());
+    } else {
+      Preconditions.checkArgument(value instanceof Long, "Expecting a value as Long type (time).");
+      return (Long) value * (micros ? 1 : 1_000L);
+    }
+  }
+
+  static Long convertDateTime(Object value, boolean micros) {
+    if (value instanceof org.joda.time.LocalDateTime) {
+      // we should never come here as local-timestamp has been added after joda deprecation
+      // implement nonetheless for consistency
+      org.joda.time.DateTime dateTime =
+          ((org.joda.time.LocalDateTime) value).toDateTime(org.joda.time.DateTimeZone.UTC);
+      return 1_000L * dateTime.getMillis();
+    } else if (value instanceof java.time.LocalDateTime) {
+      java.time.Instant instant =
+          ((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.UTC);
+      return convertTimestamp(instant, micros);
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Long, "Expecting a value as Long type (local-timestamp).");
+      return ((Long) value) * (micros ? 1 : 1_000L);
+    }
+  }
+
   static ByteString convertDecimal(LogicalType logicalType, Object value) {
-    ByteBuffer byteBuffer = (ByteBuffer) value;
-    BigDecimal bigDecimal =
-        new Conversions.DecimalConversion()
-            .fromBytes(
-                byteBuffer.duplicate(),
-                Schema.create(Schema.Type.NULL), // dummy schema, not used
-                logicalType);
-    return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+    if (value instanceof BigDecimal) {
+      LogicalTypes.Decimal type = (LogicalTypes.Decimal) logicalType;
+      BigDecimal bigDecimal =
+          ((BigDecimal) value)
+              .setScale(type.getScale(), RoundingMode.DOWN)
+              .round(new MathContext(type.getPrecision(), RoundingMode.DOWN));

Review Comment:
   I understand that previously this only accepted a ByteBuffer, and now it
adds support for java.math.BigDecimal? If a BigDecimal input would previously
have failed, or if this does not lose precision compared to the existing
behavior, I think it's fine.
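
   For reference, a minimal standalone sketch (not part of the PR) of what the
two chained calls in the hunk above do to a value. The class and method names
are hypothetical; only `setScale`, `round`, `MathContext` and
`RoundingMode.DOWN` come from the diff. It suggests that digits beyond the
declared scale or precision are dropped silently rather than rejected.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;

public class DecimalTruncationSketch {

  // Hypothetical helper that mirrors the two calls in the diff:
  // first cut the fraction to `scale` digits, then cap the total
  // number of significant digits at `precision`. Both steps truncate.
  public static BigDecimal truncate(BigDecimal value, int precision, int scale) {
    return value
        .setScale(scale, RoundingMode.DOWN)
        .round(new MathContext(precision, RoundingMode.DOWN));
  }

  public static void main(String[] args) {
    // Fits precision 5 once the fraction is cut to 2 digits.
    System.out.println(truncate(new BigDecimal("123.456789"), 5, 2).toPlainString()); // 123.45

    // Too many integer digits for precision 4: low-order digits are
    // dropped instead of raising an error.
    System.out.println(truncate(new BigDecimal("12345.678"), 4, 2).toPlainString()); // 12340
  }
}
```

   Whether silent truncation is acceptable here, as opposed to failing on
out-of-range values, is a judgment call for the PR.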



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -115,34 +120,97 @@ static String convertUUID(Object value) {
   }
 
   static Long convertTimestamp(Object value, boolean micros) {
-    if (value instanceof ReadableInstant) {
-      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    if (value instanceof org.joda.time.ReadableInstant) {

Review Comment:
   It may be worth adding a comment noting that the following helper functions
support both Joda-Time and java.time, and referring to #19215 for future cleanup.
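
   As an illustration of the java.time side of these helpers, here is a
standalone sketch (not part of the PR) of the `java.time.Instant` branch of
`convertTimestamp` from the earlier hunk. The class and method names are
hypothetical, and the Joda-Time branch is left out so the snippet needs only
the JDK. The special case for negative seconds appears to exist to avoid an
intermediate overflow near the lower bound; for ordinary values both branches
give the same result.

```java
import java.time.Instant;

public class InstantMicrosSketch {

  // Hypothetical helper: microseconds since the epoch, truncating
  // sub-microsecond nanos toward negative infinity.
  public static long toEpochMicros(Instant instant) {
    long seconds = instant.getEpochSecond();
    int nanos = instant.getNano(); // always in [0, 999_999_999]

    if (seconds < 0 && nanos > 0) {
      // Shift by one second before multiplying, then compensate,
      // so the multiplication stays in range for extreme inputs.
      long micros = Math.multiplyExact(seconds + 1, 1_000_000L);
      long adjustment = (nanos / 1_000L) - 1_000_000L;
      return Math.addExact(micros, adjustment);
    }
    long micros = Math.multiplyExact(seconds, 1_000_000L);
    return Math.addExact(micros, nanos / 1_000L);
  }

  public static void main(String[] args) {
    // Half a second before the epoch.
    Instant beforeEpoch = Instant.parse("1969-12-31T23:59:59.500Z");
    System.out.println(toEpochMicros(beforeEpoch)); // -500000

    // One second and 1999 ns after the epoch: the trailing 999 ns are dropped.
    System.out.println(toEpochMicros(Instant.ofEpochSecond(1, 1_999))); // 1000001
  }
}
```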


