skywalker0618 commented on code in PR #18809:
URL: https://github.com/apache/hudi/pull/18809#discussion_r3284821973
##########
hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java:
##########
@@ -252,21 +256,115 @@ public TimestampData readTimestamp() {
}
}
+ /**
+ * Reader for Parquet INT64 timestamp values (MILLIS / MICROS / NANOS), i.e. the standard
+ * timestamp encoding defined by Parquet's
+ * {@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} and the legacy
+ * {@link OriginalType#TIMESTAMP_MILLIS} / {@link OriginalType#TIMESTAMP_MICROS} annotations.
+ * (The older INT96 encoding is marked deprecated by the Parquet format spec — see
+ * <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp">
+ * LogicalTypes.md</a> — but is still supported here via {@link TypesFromInt96PageReader} for
+ * backwards compatibility with files written by older Hive / Spark / Impala versions.)
+ *
+ * <p>Used by {@link NestedPrimitiveColumnReader} when a TIMESTAMP column sits inside a
+ * {@code Row}, {@code Array} or {@code Map}; the top-level path continues to use
+ * {@link Int64TimestampColumnReader} for batched-vector efficiency.
+ */
+ public static class TypesFromInt64PageReader extends
DefaultParquetDataColumnReader {
+ private final boolean isUtcTimestamp;
+ private final ChronoUnit chronoUnit;
+
+ public TypesFromInt64PageReader(
+ ValuesReader realReader, boolean isUtcTimestamp, ChronoUnit
chronoUnit) {
+ super(realReader);
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.chronoUnit = chronoUnit;
+ }
+
+ public TypesFromInt64PageReader(
+ Dictionary dict, boolean isUtcTimestamp, ChronoUnit chronoUnit) {
+ super(dict);
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.chronoUnit = chronoUnit;
+ }
+
+ @Override
+ public TimestampData readTimestamp() {
+ return int64ToTimestamp(isUtcTimestamp, valuesReader.readLong(),
chronoUnit);
+ }
+
+ @Override
+ public TimestampData readTimestamp(int id) {
+ return int64ToTimestamp(isUtcTimestamp, dict.decodeToLong(id),
chronoUnit);
+ }
+ }
+
private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
boolean isDictionary,
PrimitiveType parquetType,
Dictionary dictionary,
ValuesReader valuesReader,
boolean isUtcTimestamp) {
- if (parquetType.getPrimitiveTypeName() ==
PrimitiveType.PrimitiveTypeName.INT96) {
+ PrimitiveType.PrimitiveTypeName typeName =
parquetType.getPrimitiveTypeName();
+ if (typeName == PrimitiveType.PrimitiveTypeName.INT96) {
return isDictionary
? new TypesFromInt96PageReader(dictionary, isUtcTimestamp)
: new TypesFromInt96PageReader(valuesReader, isUtcTimestamp);
- } else {
- return isDictionary
- ? new DefaultParquetDataColumnReader(dictionary)
- : new DefaultParquetDataColumnReader(valuesReader);
}
+ if (typeName == PrimitiveType.PrimitiveTypeName.INT64) {
+ ChronoUnit unit = resolveInt64TimestampUnit(parquetType);
+ if (unit != null) {
+ return isDictionary
+ ? new TypesFromInt64PageReader(dictionary, isUtcTimestamp, unit)
+ : new TypesFromInt64PageReader(valuesReader, isUtcTimestamp, unit);
+ }
+ }
+ return isDictionary
+ ? new DefaultParquetDataColumnReader(dictionary)
+ : new DefaultParquetDataColumnReader(valuesReader);
+ }
+
+ /**
+ * Returns the {@link ChronoUnit} for a Parquet INT64 TIMESTAMP column, or {@code null} if the
+ * column is a plain INT64 (not a timestamp).
+ *
+ * <p>Supports both the modern {@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} and
+ * the legacy {@link OriginalType#TIMESTAMP_MILLIS} / {@link OriginalType#TIMESTAMP_MICROS}
+ * encodings.
+ */
+ private static ChronoUnit resolveInt64TimestampUnit(PrimitiveType
parquetType) {
+ LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
+ if (annotation instanceof
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit unit =
+ ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
annotation).getUnit();
+ switch (unit) {
+ case MILLIS:
+ return ChronoUnit.MILLIS;
+ case MICROS:
+ return ChronoUnit.MICROS;
+ case NANOS:
+ return ChronoUnit.NANOS;
+ default:
+ return null;
+ }
+ }
+ OriginalType originalType = parquetType.getOriginalType();
+ if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+ return ChronoUnit.MILLIS;
+ }
+ if (originalType == OriginalType.TIMESTAMP_MICROS) {
+ return ChronoUnit.MICROS;
+ }
+ return null;
+ }
Review Comment:
Renamed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]