danny0405 commented on a change in pull request #4548: URL: https://github.com/apache/hudi/pull/4548#discussion_r781046612
########## File path: hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java ########## @@ -75,22 +84,38 @@ protected void readBatchFromDictionaryIds( for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.setTimestamp(i, decodeInt64ToTimestamp( - utcTimestamp, dictionary, dictionaryIds.getInt(i))); + utcTimestamp, dictionary, dictionaryIds.getInt(i), unit)); } } } public static TimestampData decodeInt64ToTimestamp( boolean utcTimestamp, org.apache.parquet.column.Dictionary dictionary, - int id) { + int id, + LogicalTypeAnnotation.TimeUnit unit) { long value = dictionary.decodeToLong(id); - return int64ToTimestamp(utcTimestamp, value); + return int64ToTimestamp(utcTimestamp, value, unit); } - private static TimestampData int64ToTimestamp(boolean utcTimestamp, long millionsOfDay) { + private static TimestampData int64ToTimestamp( + boolean utcTimestamp, + long millionsOfDay, + LogicalTypeAnnotation.TimeUnit unit) { if (utcTimestamp) { - return TimestampData.fromEpochMillis(millionsOfDay, 0); + final ChronoUnit chronoUnit; + switch (unit) { + case MILLIS: + chronoUnit = ChronoUnit.MILLIS; + break; Review comment: I guess the `chronoUnit` can be a class member variable, right? And we should also fix the non-UTC timezone code path. ########## File path: hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java ########## @@ -104,4 +109,21 @@ private static Converter getConverter(LogicalType logicalType) { "Unsupported type " + logicalType.getTypeRoot() + " for " + StringToRowDataConverter.class.getName()); } } + + private static TimestampData convertToTimestamp(long timestamp, TimestampType logicalType) { + final int precision = logicalType.getPrecision(); Review comment: Same as `RowDataToAvroConverters`, instantiate two converters here. 
########## File path: hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java ########## @@ -125,7 +129,7 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { case TIME_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: - return AvroToRowDataConverters::convertToTimestamp; + return avroObject -> convertToTimestamp(avroObject, (TimestampType) type); case CHAR: Review comment: We can also instantiate the `chronoUnit` first based on the `type` precision. ########## File path: hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java ########## @@ -200,22 +204,36 @@ private static AvroToRowDataConverter createMapConverter(LogicalType type) { }; } - private static TimestampData convertToTimestamp(Object object) { - final long millis; + private static TimestampData convertToTimestamp(Object object, TimestampType logicalType) { + final long timestamp; if (object instanceof Long) { - millis = (Long) object; + timestamp = (Long) object; } else if (object instanceof Instant) { - millis = ((Instant) object).toEpochMilli(); + timestamp = ((Instant) object).toEpochMilli(); } else { JodaConverter jodaConverter = JodaConverter.getConverter(); if (jodaConverter != null) { - millis = jodaConverter.convertTimestamp(object); + timestamp = jodaConverter.convertTimestamp(object); } else { throw new IllegalArgumentException( "Unexpected object type for TIMESTAMP logical type. 
Received: " + object); } } - return TimestampData.fromEpochMillis(millis); + + final int precision = logicalType.getPrecision(); + final ChronoUnit chronoUnit; + if (precision <= 3) { + chronoUnit = ChronoUnit.MILLIS; + } else if (precision <= 6) { + chronoUnit = ChronoUnit.MICROS; + } else { + throw new IllegalArgumentException( + "Avro does not support TIMESTAMP type with precision: " + + precision + + ", it only supports precision less than 6."); + } + return TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant(Instant.ofEpochSecond(0).plus(timestamp, chronoUnit), ZoneId.systemDefault())); Review comment: We should use the UTC timezone here: ```java TimestampData.fromInstant(Instant.EPOCH.plus(timestamp, chronoUnit)) ``` ########## File path: hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java ########## @@ -157,7 +162,15 @@ public Object convert(Schema schema, Object object) { @Override public Object convert(Schema schema, Object object) { - return ((TimestampData) object).toInstant().toEpochMilli(); + final LocalDateTime now = ((TimestampData) object).toLocalDateTime(); + final ZoneOffset offset = ZoneOffset.systemDefault().getRules().getOffset(LocalDateTime.now()); + if (schema.getLogicalType() instanceof LogicalTypes.TimestampMillis) { + return now.toInstant(offset).toEpochMilli(); + } else if (schema.getLogicalType() instanceof LogicalTypes.TimestampMicros) { Review comment: We can instantiate two converters like this: ```java TimestampType timestampType = (TimestampType) type; if (timestampType.getPrecision() <= 3) { converter = new RowDataToAvroConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Schema schema, Object object) { return ((TimestampData) object).toInstant().toEpochMilli(); } }; } else if (timestampType.getPrecision() <= 6) { converter = new RowDataToAvroConverter() { private static final long serialVersionUID = 1L; @Override public Object convert(Schema schema, Object object) { 
return ChronoUnit.MICROS.between(Instant.EPOCH, ((TimestampData) object).toInstant()); } }; } else { throw new UnsupportedOperationException("Unsupported type: " + type); } ``` ########## File path: hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java ########## @@ -98,10 +108,30 @@ void testRowDataToAvroStringToRowData() { final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey); Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys); - GenericRowData converted = new GenericRowData(6); - for (int i = 0; i < 6; i++) { + GenericRowData converted = new GenericRowData(7); + for (int i = 0; i < 7; i++) { converted.setField(i, convertedKeys[i]); } assertThat(converted, is(rowData)); } + + private TimestampData stringToTimestampData(String timestamp) { + return TimestampData.fromLocalDateTime(LocalDateTime.parse(timestamp)); + } + + private long timestampDataToLong(TimestampData timestamp, ChronoUnit unit) { + final LocalDateTime now = timestamp.toLocalDateTime(); + final ZoneOffset offset = ZoneOffset.systemDefault().getRules().getOffset(LocalDateTime.now()); + switch (unit) { + case MILLIS: + return TimeUnit.SECONDS.toMillis(now.toEpochSecond(offset)) + + now.toInstant(offset).getLong(ChronoField.MILLI_OF_SECOND); Review comment: Can we convert the `TimestampData` to an `Instant` first and then to the long? It is more concise, I think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org