codope commented on a change in pull request #4761:
URL: https://github.com/apache/hudi/pull/4761#discussion_r803315237



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -599,4 +600,34 @@ public static Object getRecordColumnValues(HoodieRecord<? 
extends HoodieRecordPa
                                              SerializableSchema schema, 
boolean consistentLogicalTimestampEnabled) {
     return getRecordColumnValues(record, columns, schema.get(), 
consistentLogicalTimestampEnabled);
   }
+
+  /**
+   * Accumulate column range statistics for the requested record.
+   *
+   * @param record   - Record to get the column range statistics for
+   * @param schema   - Schema for the record
+   * @param filePath - File that record belongs to
+   */
+  public static void accumulateColumnRanges(IndexedRecord record, Schema 
schema, String filePath,
+                                            Map<String, 
HoodieColumnRangeMetadata<Comparable>> columnRangeMap) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column 
range metadata!");
+    }
+
+    schema.getFields().forEach(field -> {
+      final String fieldVal = getNestedFieldValAsString((GenericRecord) 
record, field.name(), true, true);
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      final HoodieColumnRangeMetadata<Comparable> fieldRange = new 
HoodieColumnRangeMetadata<>(
+          filePath,
+          field.name(),
+          fieldVal,
+          fieldVal,
+          fieldVal == null ? 1 : 0,

Review comment:
       nit: better to declare `1` and `0` as meaningful constant to help 
readers?

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -599,4 +600,34 @@ public static Object getRecordColumnValues(HoodieRecord<? 
extends HoodieRecordPa
                                              SerializableSchema schema, 
boolean consistentLogicalTimestampEnabled) {
     return getRecordColumnValues(record, columns, schema.get(), 
consistentLogicalTimestampEnabled);
   }
+
+  /**
+   * Accumulate column range statistics for the requested record.
+   *
+   * @param record   - Record to get the column range statistics for
+   * @param schema   - Schema for the record
+   * @param filePath - File that record belongs to
+   */
+  public static void accumulateColumnRanges(IndexedRecord record, Schema 
schema, String filePath,

Review comment:
       Shall we move this method out of HoodieAvroUtils? I don't think avro 
utils should be concerned with construction of column range metadata. Moreover, 
we can define this method where write config is available so that 
`consistentLogicalTimestampEnabled` is not hardcoded in the call 
`HoodieAvroUtils#getNestedFieldValAsString`. I am okay with keeping this method 
private in `HoodieAppendHandle` as that's the only place it's being used 
currently.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
##########
@@ -33,6 +38,24 @@
   private final long totalSize;
   private final long totalUncompressedSize;
 
+  public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, 
HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> 
COLUMN_RANGE_MERGE_FUNCTION =
+      (oldColumnRange, newColumnRange) -> {
+        
ValidationUtils.checkArgument(oldColumnRange.getColumnName().equals(newColumnRange.getColumnName()));
+        
ValidationUtils.checkArgument(oldColumnRange.getFilePath().equals(newColumnRange.getFilePath()));
+        return new HoodieColumnRangeMetadata<>(
+            newColumnRange.getFilePath(),
+            newColumnRange.getColumnName(),
+            (Comparable) Arrays.asList(oldColumnRange.getMinValue(), 
newColumnRange.getMinValue())

Review comment:
       nit: remove redundant cast to comparable?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -874,45 +869,53 @@ public static HoodieTableFileSystemView 
getFileSystemView(HoodieTableMetaClient
     }
   }
 
-  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
+  private static List<String> getColumnsToIndex(HoodieTableMetaClient 
datasetMetaClient) {
+    return getColumnsToIndex(datasetMetaClient, false);
   }
 
   public static Stream<HoodieRecord> 
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                      
HoodieTableMetaClient datasetMetaClient,
-                                                                     
List<String> latestColumns) {
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), 
datasetMetaClient, latestColumns, false);
+                                                                     
List<String> columnsToIndex) {
+    Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> columnRangeMap 
= Option.empty();
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) 
writeStat).getRecordsStats().isPresent()) {
+      columnRangeMap = Option.of(((HoodieDeltaWriteStat) 
writeStat).getRecordsStats().get().getStats());
+    }
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), 
datasetMetaClient, columnsToIndex,
+        columnRangeMap, false);
 
   }
 
   private static Stream<HoodieRecord> getColumnStats(final String 
partitionPath, final String filePathWithPartition,
                                                      HoodieTableMetaClient 
datasetMetaClient,
-                                                     List<String> columns, 
boolean isDeleted) {
+                                                     List<String> 
columnsToIndex,
+                                                     Option<Map<String, 
HoodieColumnRangeMetadata<Comparable>>> columnRangeMap,
+                                                     boolean isDeleted) {
     final String partition = partitionPath.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionPath;
     final int offset = partition.equals(NON_PARTITIONED_NAME) ? 
(filePathWithPartition.startsWith("/") ? 1 : 0)
         : partition.length() + 1;
     final String fileName = filePathWithPartition.substring(offset);
-    if (!FSUtils.isBaseFile(new Path(fileName))) {
-      return Stream.empty();
-    }
 
     if 
(filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
       List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = 
new ArrayList<>();
       final Path fullFilePath = new Path(datasetMetaClient.getBasePath(), 
filePathWithPartition);
       if (!isDeleted) {
         try {
           columnRangeMetadataList = new 
ParquetUtils().readRangeFromParquetMetadata(
-              datasetMetaClient.getHadoopConf(), fullFilePath, columns);
+              datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
         } catch (Exception e) {
           LOG.error("Failed to read column stats for " + fullFilePath, e);
         }
       } else {
         columnRangeMetadataList =
-            columns.stream().map(entry -> new 
HoodieColumnRangeMetadata<Comparable>(fileName,
+            columnsToIndex.stream().map(entry -> new 
HoodieColumnRangeMetadata<Comparable>(fileName,
                     entry, null, null, 0, 0, 0, 0))
                 .collect(Collectors.toList());
       }
       return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, 
columnRangeMetadataList, isDeleted);
+    } else if (columnRangeMap.isPresent()) {

Review comment:
       Should we also check that the stat map is non-empty?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -118,6 +121,26 @@
   private HoodieMetadataBloomFilter bloomFilterMetadata = null;
   private HoodieMetadataColumnStats columnStatMetadata = null;
 
+  public static final BiFunction<HoodieMetadataColumnStats, 
HoodieMetadataColumnStats, HoodieMetadataColumnStats> 
COLUMN_STATS_MERGE_FUNCTION =
+      (oldColumnStats, newColumnStats) -> {
+        
ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+        if (newColumnStats.getIsDeleted()) {
+          return newColumnStats;
+        }
+        return new HoodieMetadataColumnStats(
+            newColumnStats.getFileName(),
+            Arrays.asList(oldColumnStats.getMinValue(), 
newColumnStats.getMinValue())
+                
.stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null),
+            Arrays.asList(oldColumnStats.getMinValue(), 
newColumnStats.getMinValue())
+                
.stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null),
+            oldColumnStats.getNullCount() + newColumnStats.getNullCount(),

Review comment:
       Is my understanding correct that since this is append handle and we 
don't expect duplicates so simply add these stats?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -118,6 +121,26 @@
   private HoodieMetadataBloomFilter bloomFilterMetadata = null;
   private HoodieMetadataColumnStats columnStatMetadata = null;
 
+  public static final BiFunction<HoodieMetadataColumnStats, 
HoodieMetadataColumnStats, HoodieMetadataColumnStats> 
COLUMN_STATS_MERGE_FUNCTION =
+      (oldColumnStats, newColumnStats) -> {
+        
ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName()));
+        if (newColumnStats.getIsDeleted()) {
+          return newColumnStats;
+        }
+        return new HoodieMetadataColumnStats(
+            newColumnStats.getFileName(),

Review comment:
       So this field is called `fileName` in HoodieMetadataColumnStats but 
`filePath` in HoodieColumnRangeMetadata. If possible, can we keep the names 
consistent?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to