nsivabalan commented on code in PR #12511:
URL: https://github.com/apache/hudi/pull/12511#discussion_r1901445887
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2550,9 +2578,20 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(Ho
     return engineContext.parallelize(partitionedWriteStats, parallelism).flatMap(partitionedWriteStat -> {
       final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
       // Step 1: Collect Column Metadata for Each File part of current commit metadata
-      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionedWriteStat.stream()
+      List<List<HoodieColumnRangeMetadata<Comparable>>> rawFileColumnMetadata = partitionedWriteStat.stream()
           .map(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, validColumnsToIndex, tableSchema))
           .collect(Collectors.toList());
+      // convert to payload and reconstruct the stats to maintain parity for certain data types where avro wrapping and unwrapping could change the types.
+      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = rawFileColumnMetadata.stream()
+          .map(new Function<List<HoodieColumnRangeMetadata<Comparable>>, List<HoodieColumnRangeMetadata<Comparable>>>() {
+            @Override
+            public List<HoodieColumnRangeMetadata<Comparable>> apply(List<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadata) {
+              return HoodieMetadataPayload.createColumnStatsRecords(partitionName, hoodieColumnRangeMetadata, false).map(record -> {
+                return HoodieColumnRangeMetadata.fromColumnStats((((HoodieMetadataPayload) record.getData()).getColumnStatMetadata().get()));
+              }).collect(toList());
+            }
+          }).collect(toList());
Review Comment:
As I was saying, I will try to avoid the full payload serde round-trip here.
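For context on why the PR round-trips the stats through the payload at all: Avro wrapping and unwrapping can change the concrete Java type of a stat value even when the logical value is unchanged, which breaks min/max comparisons between writer-side and reader-side stats. The sketch below is a hypothetical, Hudi-free illustration of that type drift for the `date` logical type (`java.sql.Date` on the write side vs. `LocalDate` after unwrapping); the class and variable names are invented for the example.

```java
import java.sql.Date;
import java.time.LocalDate;

public class WrapperTypeDrift {
    public static void main(String[] args) {
        // A writer-side column stat may hold a java.sql.Date...
        Date writerMin = Date.valueOf("2024-01-15");

        // ...while Avro's date logical type stores it as an epoch-day int,
        // which a reader naturally unwraps to java.time.LocalDate.
        int epochDay = (int) writerMin.toLocalDate().toEpochDay();
        LocalDate readerMin = LocalDate.ofEpochDay(epochDay);

        // Same calendar date, but the two Comparable types cannot be
        // compared to each other directly -- hence the need to normalize
        // both sides through the same wrap/unwrap path before comparing.
        System.out.println(writerMin.toLocalDate().equals(readerMin));    // true
        System.out.println(writerMin.getClass() == readerMin.getClass()); // false
    }
}
```

Normalizing via `createColumnStatsRecords` and `fromColumnStats`, as the diff does, achieves this parity but at the cost of a full payload serde per file, which is what the comment above is pushing back on.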
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1487,4 +1488,20 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
       throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
     }
   }
+
+  public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, String wrapperClassName) {
+    if (avroValueWrapper == null) {
+      return null;
+    } else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
+      return LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0));
+    } else if (TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
+      Instant instant = microsToInstant((Long) ((Record) avroValueWrapper).get(0));
Review Comment:
Let's sync up on this.
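For readers following along, a self-contained sketch of what the new overload does for the two wrapper types in the hunk, assuming the standard Avro logical-type encodings (`date` as an epoch-day int, `timestamp-micros` as a long count of microseconds since the epoch). The `microsToInstant` here is a plain re-implementation for illustration, not Hudi's helper.

```java
import java.time.Instant;
import java.time.LocalDate;

public class LogicalTypeUnwrap {
    // Hypothetical stand-in for the microsToInstant helper referenced in
    // the diff: split micros into whole seconds plus leftover nanos.
    // Floor math keeps pre-1970 (negative) timestamps correct.
    static Instant microsToInstant(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
        return Instant.ofEpochSecond(seconds, nanos);
    }

    public static void main(String[] args) {
        // date logical type: int days since 1970-01-01
        System.out.println(LocalDate.ofEpochDay(0));     // 1970-01-01
        // timestamp-micros logical type: long micros since the epoch
        System.out.println(microsToInstant(1_000_000L)); // 1970-01-01T00:00:01Z
    }
}
```

The point of keying the unwrap on `wrapperClassName` rather than the runtime type is that a generic Avro `Record` read back from the metadata table no longer carries the original wrapper class, only its field value.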
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -117,10 +117,10 @@ class ColumnStatsIndexSupport(spark: SparkSession,
                               shouldReadInMemory: Boolean,
                               prunedPartitions: Option[Set[String]] = None,
                               prunedFileNamesOpt: Option[Set[String]] = None)(block: DataFrame => T): T = {
-    cachedColumnStatsIndexViews.get(targetColumns) match {
+    /*cachedColumnStatsIndexViews.get(targetColumns) match {
       case Some(cachedDF) =>
         block(cachedDF)
-      case None =>
+      case None =>*/
Review Comment:
Nothing intentional; I will revert it.
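For anyone skimming the hunk: the block that was commented out is the usual cache-or-build pattern, which is why disabling it would silently force a rebuild of the column stats view on every call. A minimal Java analogue of that pattern (names invented; in Hudi the cached value is a Spark `DataFrame` keyed by the target-column set):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ColumnStatsViewCache {
    // Keyed by the target-column set (joined into a string for simplicity).
    private final Map<String, String> cachedViews = new HashMap<>();

    // Reuse a cached view if present, otherwise build it once and cache it,
    // then hand it to the caller's block -- the shape of the Scala match
    // expression that the PR accidentally commented out.
    <T> T withOptimizedView(String targetColumns, Function<String, T> block) {
        String view = cachedViews.computeIfAbsent(targetColumns, k -> "view[" + k + "]");
        return block.apply(view);
    }

    public static void main(String[] args) {
        ColumnStatsViewCache cache = new ColumnStatsViewCache();
        System.out.println(cache.withOptimizedView("a,b", v -> v)); // view[a,b]
        // The second call hits the cache instead of rebuilding the view.
        System.out.println(cache.withOptimizedView("a,b", v -> v)); // view[a,b]
    }
}
```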
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]