danny0405 commented on a change in pull request #4548:
URL: https://github.com/apache/hudi/pull/4548#discussion_r781046612



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java
##########
@@ -75,22 +84,38 @@ protected void readBatchFromDictionaryIds(
     for (int i = rowId; i < rowId + num; ++i) {
       if (!column.isNullAt(i)) {
         column.setTimestamp(i, decodeInt64ToTimestamp(
-            utcTimestamp, dictionary, dictionaryIds.getInt(i)));
+            utcTimestamp, dictionary, dictionaryIds.getInt(i), unit));
       }
     }
   }
 
   public static TimestampData decodeInt64ToTimestamp(
       boolean utcTimestamp,
       org.apache.parquet.column.Dictionary dictionary,
-      int id) {
+      int id,
+      LogicalTypeAnnotation.TimeUnit unit) {
     long value = dictionary.decodeToLong(id);
-    return int64ToTimestamp(utcTimestamp, value);
+    return int64ToTimestamp(utcTimestamp, value, unit);
   }
 
-  private static TimestampData int64ToTimestamp(boolean utcTimestamp, long 
millionsOfDay) {
+  private static TimestampData int64ToTimestamp(
+      boolean utcTimestamp,
+      long millionsOfDay,
+      LogicalTypeAnnotation.TimeUnit unit) {
     if (utcTimestamp) {
-      return TimestampData.fromEpochMillis(millionsOfDay, 0);
+      final ChronoUnit chronoUnit;
+      switch (unit) {
+        case MILLIS:
+          chronoUnit = ChronoUnit.MILLIS;
+          break;

Review comment:
       I guess the `chronoUnit` can be a class member variable, right?
   We should also fix the non-UTC timezone code path.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
##########
@@ -104,4 +109,21 @@ private static Converter getConverter(LogicalType 
logicalType) {
             "Unsupported type " + logicalType.getTypeRoot() + " for " + 
StringToRowDataConverter.class.getName());
     }
   }
+
+  private static TimestampData convertToTimestamp(long timestamp, 
TimestampType logicalType) {
+    final int precision = logicalType.getPrecision();

Review comment:
       As in `RowDataToAvroConverters`, instantiate two converters here (one for precision <= 3, one for precision <= 6).

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
##########
@@ -125,7 +129,7 @@ private static AvroToRowDataConverter 
createConverter(LogicalType type) {
       case TIME_WITHOUT_TIME_ZONE:
         return AvroToRowDataConverters::convertToTime;
       case TIMESTAMP_WITHOUT_TIME_ZONE:
-        return AvroToRowDataConverters::convertToTimestamp;
+        return avroObject -> convertToTimestamp(avroObject, (TimestampType) 
type);
       case CHAR:

Review comment:
       We can also determine the `chronoUnit` up front, based on the
precision of `type`, instead of on every conversion.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
##########
@@ -200,22 +204,36 @@ private static AvroToRowDataConverter 
createMapConverter(LogicalType type) {
     };
   }
 
-  private static TimestampData convertToTimestamp(Object object) {
-    final long millis;
+  private static TimestampData convertToTimestamp(Object object, TimestampType 
logicalType) {
+    final long timestamp;
     if (object instanceof Long) {
-      millis = (Long) object;
+      timestamp = (Long) object;
     } else if (object instanceof Instant) {
-      millis = ((Instant) object).toEpochMilli();
+      timestamp = ((Instant) object).toEpochMilli();
     } else {
       JodaConverter jodaConverter = JodaConverter.getConverter();
       if (jodaConverter != null) {
-        millis = jodaConverter.convertTimestamp(object);
+        timestamp = jodaConverter.convertTimestamp(object);
       } else {
         throw new IllegalArgumentException(
             "Unexpected object type for TIMESTAMP logical type. Received: " + 
object);
       }
     }
-    return TimestampData.fromEpochMillis(millis);
+
+    final int precision = logicalType.getPrecision();
+    final ChronoUnit chronoUnit;
+    if (precision <= 3) {
+      chronoUnit = ChronoUnit.MILLIS;
+    } else if (precision <= 6) {
+      chronoUnit = ChronoUnit.MICROS;
+    } else {
+      throw new IllegalArgumentException(
+          "Avro does not support TIMESTAMP type with precision: "
+              + precision
+              + ", it only supports precision less than 6.");
+    }
+    return TimestampData.fromLocalDateTime(
+        LocalDateTime.ofInstant(Instant.ofEpochSecond(0).plus(timestamp, 
chronoUnit), ZoneId.systemDefault()));

Review comment:
       We should use the UTC timezone here:
   ```java
   TimestampData.fromInstant(Instant.EPOCH.plus(timestamp, chronoUnit))
   ```

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
##########
@@ -157,7 +162,15 @@ public Object convert(Schema schema, Object object) {
 
               @Override
               public Object convert(Schema schema, Object object) {
-                return ((TimestampData) object).toInstant().toEpochMilli();
+                final LocalDateTime now = ((TimestampData) 
object).toLocalDateTime();
+                final ZoneOffset offset = 
ZoneOffset.systemDefault().getRules().getOffset(LocalDateTime.now());
+                if (schema.getLogicalType() instanceof 
LogicalTypes.TimestampMillis) {
+                  return now.toInstant(offset).toEpochMilli();
+                } else if (schema.getLogicalType() instanceof 
LogicalTypes.TimestampMicros) {

Review comment:
       We can instantiate two converters like this:
   ```java
   TimestampType timestampType = (TimestampType) type;
           if (timestampType.getPrecision() <= 3) {
             converter =
                 new RowDataToAvroConverter() {
                   private static final long serialVersionUID = 1L;
   
                   @Override
                   public Object convert(Schema schema, Object object) {
                     return ((TimestampData) object).toInstant().toEpochMilli();
                   }
                 };
           } else if (timestampType.getPrecision() <= 6) {
             converter =
                 new RowDataToAvroConverter() {
                   private static final long serialVersionUID = 1L;
   
                   @Override
                   public Object convert(Schema schema, Object object) {
                     return ChronoUnit.MICROS.between(Instant.EPOCH, 
((TimestampData) object).toInstant());
                   }
                 };
           } else {
             throw new UnsupportedOperationException("Unsupported type: " + 
type);
           }
   ```

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
##########
@@ -98,10 +108,30 @@ void testRowDataToAvroStringToRowData() {
     final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey);
     Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys);
 
-    GenericRowData converted = new GenericRowData(6);
-    for (int i = 0; i < 6; i++) {
+    GenericRowData converted = new GenericRowData(7);
+    for (int i = 0; i < 7; i++) {
       converted.setField(i, convertedKeys[i]);
     }
     assertThat(converted, is(rowData));
   }
+
+  private TimestampData stringToTimestampData(String timestamp) {
+    return TimestampData.fromLocalDateTime(LocalDateTime.parse(timestamp));
+  }
+
+  private long timestampDataToLong(TimestampData timestamp, ChronoUnit unit) {
+    final LocalDateTime now = timestamp.toLocalDateTime();
+    final ZoneOffset offset = 
ZoneOffset.systemDefault().getRules().getOffset(LocalDateTime.now());
+    switch (unit) {
+      case MILLIS:
+        return TimeUnit.SECONDS.toMillis(now.toEpochSecond(offset))
+            + now.toInstant(offset).getLong(ChronoField.MILLI_OF_SECOND);

Review comment:
       Can we convert the `TimestampData` to an `Instant` first, and then
to a long? I think it is more concise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to