[GitHub] [hudi] danny0405 commented on a change in pull request #4548: [HUDI-3184] hudi-flink support timestamp-micros

2022-01-10 Thread GitBox


danny0405 commented on a change in pull request #4548:
URL: https://github.com/apache/hudi/pull/4548#discussion_r781801961



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java
##
@@ -75,25 +90,29 @@ protected void readBatchFromDictionaryIds(
 for (int i = rowId; i < rowId + num; ++i) {
   if (!column.isNullAt(i)) {
 column.setTimestamp(i, decodeInt64ToTimestamp(
-utcTimestamp, dictionary, dictionaryIds.getInt(i)));
+utcTimestamp, dictionary, dictionaryIds.getInt(i), chronoUnit));
   }
 }
   }
 
   public static TimestampData decodeInt64ToTimestamp(
   boolean utcTimestamp,
   org.apache.parquet.column.Dictionary dictionary,
-  int id) {
+  int id,
+  ChronoUnit unit) {
 long value = dictionary.decodeToLong(id);
-return int64ToTimestamp(utcTimestamp, value);
+return int64ToTimestamp(utcTimestamp, value, unit);
   }
 
-  private static TimestampData int64ToTimestamp(boolean utcTimestamp, long 
millionsOfDay) {
+  private static TimestampData int64ToTimestamp(
+  boolean utcTimestamp,
+  long millionsOfDay,
+  ChronoUnit unit) {

Review comment:
   millionsOfDay => interval




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #4548: [HUDI-3184] hudi-flink support timestamp-micros

2022-01-10 Thread GitBox


danny0405 commented on a change in pull request #4548:
URL: https://github.com/apache/hudi/pull/4548#discussion_r781046612



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java
##
@@ -75,22 +84,38 @@ protected void readBatchFromDictionaryIds(
 for (int i = rowId; i < rowId + num; ++i) {
   if (!column.isNullAt(i)) {
 column.setTimestamp(i, decodeInt64ToTimestamp(
-utcTimestamp, dictionary, dictionaryIds.getInt(i)));
+utcTimestamp, dictionary, dictionaryIds.getInt(i), unit));
   }
 }
   }
 
   public static TimestampData decodeInt64ToTimestamp(
   boolean utcTimestamp,
   org.apache.parquet.column.Dictionary dictionary,
-  int id) {
+  int id,
+  LogicalTypeAnnotation.TimeUnit unit) {
 long value = dictionary.decodeToLong(id);
-return int64ToTimestamp(utcTimestamp, value);
+return int64ToTimestamp(utcTimestamp, value, unit);
   }
 
-  private static TimestampData int64ToTimestamp(boolean utcTimestamp, long 
millionsOfDay) {
+  private static TimestampData int64ToTimestamp(
+  boolean utcTimestamp,
+  long millionsOfDay,
+  LogicalTypeAnnotation.TimeUnit unit) {
 if (utcTimestamp) {
-  return TimestampData.fromEpochMillis(millionsOfDay, 0);
+  final ChronoUnit chronoUnit;
+  switch (unit) {
+case MILLIS:
+  chronoUnit = ChronoUnit.MILLIS;
+  break;

Review comment:
   I guess the `chronoUnit` can be a class member variable, right?
   And, we should also fix the non-utc timezone code path.

##
File path: 
hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
##
@@ -104,4 +109,21 @@ private static Converter getConverter(LogicalType 
logicalType) {
 "Unsupported type " + logicalType.getTypeRoot() + " for " + 
StringToRowDataConverter.class.getName());
 }
   }
+
+  private static TimestampData convertToTimestamp(long timestamp, 
TimestampType logicalType) {
+final int precision = logicalType.getPrecision();

Review comment:
   Same as `RowDataToAvroConverters`, instantiate two converters here.

##
File path: 
hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
##
@@ -125,7 +129,7 @@ private static AvroToRowDataConverter 
createConverter(LogicalType type) {
   case TIME_WITHOUT_TIME_ZONE:
 return AvroToRowDataConverters::convertToTime;
   case TIMESTAMP_WITHOUT_TIME_ZONE:
-return AvroToRowDataConverters::convertToTimestamp;
+return avroObject -> convertToTimestamp(avroObject, (TimestampType) 
type);
   case CHAR:

Review comment:
   We can also instantiate the `chronoUnit` first based on the `type` 
precision.

##
File path: 
hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
##
@@ -200,22 +204,36 @@ private static AvroToRowDataConverter 
createMapConverter(LogicalType type) {
 };
   }
 
-  private static TimestampData convertToTimestamp(Object object) {
-final long millis;
+  private static TimestampData convertToTimestamp(Object object, TimestampType 
logicalType) {
+final long timestamp;
 if (object instanceof Long) {
-  millis = (Long) object;
+  timestamp = (Long) object;
 } else if (object instanceof Instant) {
-  millis = ((Instant) object).toEpochMilli();
+  timestamp = ((Instant) object).toEpochMilli();
 } else {
   JodaConverter jodaConverter = JodaConverter.getConverter();
   if (jodaConverter != null) {
-millis = jodaConverter.convertTimestamp(object);
+timestamp = jodaConverter.convertTimestamp(object);
   } else {
 throw new IllegalArgumentException(
 "Unexpected object type for TIMESTAMP logical type. Received: " + 
object);
   }
 }
-return TimestampData.fromEpochMillis(millis);
+
+final int precision = logicalType.getPrecision();
+final ChronoUnit chronoUnit;
+if (precision <= 3) {
+  chronoUnit = ChronoUnit.MILLIS;
+} else if (precision <= 6) {
+  chronoUnit = ChronoUnit.MICROS;
+} else {
+  throw new IllegalArgumentException(
+  "Avro does not support TIMESTAMP type with precision: "
+  + precision
+  + ", it only supports precision less than 6.");
+}
+return TimestampData.fromLocalDateTime(
+LocalDateTime.ofInstant(Instant.ofEpochSecond(0).plus(timestamp, 
chronoUnit), ZoneId.systemDefault()));

Review comment:
   We should use the UTC timezone here:
   ```java
   TimestampData.fromInstant(Instant.EPOCH.plus(timestamp, chronoUnit))
   ```

##
File path: 
hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
##
@@ -157,7 +162,15 @@ public Object convert(Schema schema, Object object) {
 
   @Override
   public Object convert(Schema schema, Object object) {
-return