danny0405 commented on a change in pull request #4548:
URL: https://github.com/apache/hudi/pull/4548#discussion_r781046612
##
File path:
hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java
##
@@ -75,22 +84,38 @@ protected void readBatchFromDictionaryIds(
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.setTimestamp(i, decodeInt64ToTimestamp(
-utcTimestamp, dictionary, dictionaryIds.getInt(i)));
+utcTimestamp, dictionary, dictionaryIds.getInt(i), unit));
}
}
}
public static TimestampData decodeInt64ToTimestamp(
boolean utcTimestamp,
org.apache.parquet.column.Dictionary dictionary,
- int id) {
+ int id,
+ LogicalTypeAnnotation.TimeUnit unit) {
long value = dictionary.decodeToLong(id);
-return int64ToTimestamp(utcTimestamp, value);
+return int64ToTimestamp(utcTimestamp, value, unit);
}
- private static TimestampData int64ToTimestamp(boolean utcTimestamp, long
millionsOfDay) {
+ private static TimestampData int64ToTimestamp(
+ boolean utcTimestamp,
+ long millionsOfDay,
+ LogicalTypeAnnotation.TimeUnit unit) {
if (utcTimestamp) {
- return TimestampData.fromEpochMillis(millionsOfDay, 0);
+ final ChronoUnit chronoUnit;
+ switch (unit) {
+case MILLIS:
+ chronoUnit = ChronoUnit.MILLIS;
+ break;
Review comment:
I guess the `chronoUnit` can be a class member variable, right?
And we should also fix the non-UTC timezone code path.
##
File path:
hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
##
@@ -104,4 +109,21 @@ private static Converter getConverter(LogicalType
logicalType) {
"Unsupported type " + logicalType.getTypeRoot() + " for " +
StringToRowDataConverter.class.getName());
}
}
+
+ private static TimestampData convertToTimestamp(long timestamp,
TimestampType logicalType) {
+final int precision = logicalType.getPrecision();
Review comment:
Same as `RowDataToAvroConverters`, instantiate two converters here.
##
File path:
hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
##
@@ -125,7 +129,7 @@ private static AvroToRowDataConverter
createConverter(LogicalType type) {
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
-return AvroToRowDataConverters::convertToTimestamp;
+return avroObject -> convertToTimestamp(avroObject, (TimestampType)
type);
case CHAR:
Review comment:
We can also resolve the `chronoUnit` up front, based on the precision of `type`, instead of recomputing it on every conversion.
##
File path:
hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
##
@@ -200,22 +204,36 @@ private static AvroToRowDataConverter
createMapConverter(LogicalType type) {
};
}
- private static TimestampData convertToTimestamp(Object object) {
-final long millis;
+ private static TimestampData convertToTimestamp(Object object, TimestampType
logicalType) {
+final long timestamp;
if (object instanceof Long) {
- millis = (Long) object;
+ timestamp = (Long) object;
} else if (object instanceof Instant) {
- millis = ((Instant) object).toEpochMilli();
+ timestamp = ((Instant) object).toEpochMilli();
} else {
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
-millis = jodaConverter.convertTimestamp(object);
+timestamp = jodaConverter.convertTimestamp(object);
} else {
throw new IllegalArgumentException(
"Unexpected object type for TIMESTAMP logical type. Received: " +
object);
}
}
-return TimestampData.fromEpochMillis(millis);
+
+final int precision = logicalType.getPrecision();
+final ChronoUnit chronoUnit;
+if (precision <= 3) {
+ chronoUnit = ChronoUnit.MILLIS;
+} else if (precision <= 6) {
+ chronoUnit = ChronoUnit.MICROS;
+} else {
+ throw new IllegalArgumentException(
+ "Avro does not support TIMESTAMP type with precision: "
+ + precision
+ + ", it only supports precision less than 6.");
+}
+return TimestampData.fromLocalDateTime(
+LocalDateTime.ofInstant(Instant.ofEpochSecond(0).plus(timestamp,
chronoUnit), ZoneId.systemDefault()));
Review comment:
We should use the UTC timezone here:
```java
TimestampData.fromInstant(Instant.EPOCH.plus(timestamp, chronoUnit))
```
##
File path:
hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
##
@@ -157,7 +162,15 @@ public Object convert(Schema schema, Object object) {
@Override
public Object convert(Schema schema, Object object) {
-return