vinothchandar commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1459591053


##########
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java:
##########
@@ -64,6 +67,51 @@ public static BaseFileUtils getInstance(HoodieFileFormat 
fileFormat) {
     throw new UnsupportedOperationException(fileFormat.name() + " format not 
supported yet.");
   }
 
+  /**
+   * Aggregate column range statistics across files in a partition.
+   *
+   * @param fileRanges List of column range statistics for each file in a 
partition
+   */
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> 
fileRanges) {
+    if (fileRanges.size() == 1) {
+      // Only one parquet file, we can just return that range.
+      return fileRanges.get(0);
+    }
+    // There are multiple files. Compute min(file_mins) and max(file_maxs)
+    return fileRanges.stream()
+        .sequential()
+        .reduce(BaseFileUtils::mergeRanges).get();
+  }
+
+  private static  <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
mergeRanges(HoodieColumnRangeMetadata<T> one,
+                                                                               
      HoodieColumnRangeMetadata<T> another) {
+    final T minValue;
+    final T maxValue;
+    if (one.getMinValue() != null && another.getMinValue() != null) {
+      minValue = 
one.getMinValue().toString().compareTo(another.getMinValue().toString()) < 0 ? 
one.getMinValue() : another.getMinValue();
+    } else if (one.getMinValue() == null) {
+      minValue = another.getMinValue();
+    } else {
+      minValue = one.getMinValue();
+    }
+
+    if (one.getMaxValue() != null && another.getMaxValue() != null) {
+      maxValue = 
one.getMaxValue().toString().compareTo(another.getMaxValue().toString()) < 0 ? 
another.getMaxValue() : one.getMaxValue();
+    } else if (one.getMaxValue() == null) {
+      maxValue = another.getMaxValue();
+    } else {
+      maxValue = one.getMaxValue();
+    }
+
+    return HoodieColumnRangeMetadata.create(
+        one.getFilePath(),

Review Comment:
   the file path itself is dummy right. Lets avoid code like this. Can we pass 
in `null`



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String 
partition, String filename)
       return new Path(basePath, partition + Path.SEPARATOR + filename);
     }
   }
+
+  public static HoodieData<HoodieRecord> 
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+                                                                             
List<DirectoryInfo> partitionInfoList,
+                                                                             
MetadataRecordsGenerationParams recordsGenerationParams) {
+    // Find the columns to index
+    HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
+    final List<String> columnsToIndex = getColumnsToIndex(
+        recordsGenerationParams,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+    if (columnsToIndex.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    LOG.debug(String.format("Indexing %d columns for partition stats index", 
columnsToIndex.size()));

Review Comment:
   slf4j should support arg substitution without the String.format?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -641,6 +642,36 @@ public static Stream<HoodieRecord> 
createColumnStatsRecords(String partitionName
     });
   }
 
+  public static Stream<HoodieRecord> createPartitionStatsRecords(String 
partitionName,

Review Comment:
   partitionName or partitionPath?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java:
##########
@@ -64,6 +67,51 @@ public static BaseFileUtils getInstance(HoodieFileFormat 
fileFormat) {
     throw new UnsupportedOperationException(fileFormat.name() + " format not 
supported yet.");
   }
 
+  /**
+   * Aggregate column range statistics across files in a partition.
+   *
+   * @param fileRanges List of column range statistics for each file in a 
partition
+   */
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> 
fileRanges) {
+    if (fileRanges.size() == 1) {
+      // Only one parquet file, we can just return that range.
+      return fileRanges.get(0);
+    }
+    // There are multiple files. Compute min(file_mins) and max(file_maxs)
+    return fileRanges.stream()
+        .sequential()
+        .reduce(BaseFileUtils::mergeRanges).get();
+  }
+
+  private static  <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
mergeRanges(HoodieColumnRangeMetadata<T> one,
+                                                                               
      HoodieColumnRangeMetadata<T> another) {
+    final T minValue;
+    final T maxValue;
+    if (one.getMinValue() != null && another.getMinValue() != null) {
+      minValue = 
one.getMinValue().toString().compareTo(another.getMinValue().toString()) < 0 ? 
one.getMinValue() : another.getMinValue();

Review Comment:
   does a string comparison help here? what if the partition columns are 
long/int?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String 
partition, String filename)
       return new Path(basePath, partition + Path.SEPARATOR + filename);
     }
   }
+
+  public static HoodieData<HoodieRecord> 
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+                                                                             
List<DirectoryInfo> partitionInfoList,
+                                                                             
MetadataRecordsGenerationParams recordsGenerationParams) {

Review Comment:
   this seems deprecated?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -641,6 +642,36 @@ public static Stream<HoodieRecord> 
createColumnStatsRecords(String partitionName
     });
   }
 
+  public static Stream<HoodieRecord> createPartitionStatsRecords(String 
partitionName,
+                                                                 
Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+                                                                 boolean 
isDeleted) {
+    return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
+      HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionName, 
columnRangeMetadata),
+          MetadataPartitionType.PARTITION_STATS.getPartitionPath());
+
+      HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(),
+          HoodieMetadataColumnStats.newBuilder()
+              .setFileName(new 
Path(columnRangeMetadata.getFilePath()).getName())

Review Comment:
   remove the file name?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String 
partition, String filename)
       return new Path(basePath, partition + Path.SEPARATOR + filename);
     }
   }
+
+  public static HoodieData<HoodieRecord> 
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,

Review Comment:
   I see that you are using 
`recordsGenParams.getTargetColumnsForColumnStatsIndex();` ultimately to 
generate the stats. do you just add the partition fields into the target 
columns? to make partition stats aggregation happen automatically. 



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String 
partition, String filename)
       return new Path(basePath, partition + Path.SEPARATOR + filename);
     }
   }
+
+  public static HoodieData<HoodieRecord> 
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+                                                                             
List<DirectoryInfo> partitionInfoList,
+                                                                             
MetadataRecordsGenerationParams recordsGenerationParams) {
+    // Find the columns to index
+    HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
+    final List<String> columnsToIndex = getColumnsToIndex(
+        recordsGenerationParams,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+    if (columnsToIndex.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    LOG.debug(String.format("Indexing %d columns for partition stats index", 
columnsToIndex.size()));
+    // Create records for MDT
+    int parallelism = Math.max(Math.min(partitionInfoList.size(), 
recordsGenerationParams.getPartitionStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionInfoList, 
parallelism).flatMap(partitionFiles -> {
+      final String partitionName = partitionFiles.getRelativePath();
+      Stream<HoodieColumnRangeMetadata<Comparable>> 
partitionStatsRangeMetadata = 
partitionFiles.getFileNameToSizeMap().keySet().stream()
+          .map(fileName -> getFileStatsRangeMetadata(partitionName, 
partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
+          .map(BaseFileUtils::getColumnRangeInPartition);
+      return HoodieMetadataPayload.createPartitionStatsRecords(partitionName, 
partitionStatsRangeMetadata.collect(toList()), false).iterator();
+    });
+  }
+
+  private static List<HoodieColumnRangeMetadata<Comparable>> 
getFileStatsRangeMetadata(String partitionPath,
+                                                                               
        String filePath,
+                                                                               
        HoodieTableMetaClient datasetMetaClient,
+                                                                               
        List<String> columnsToIndex,
+                                                                               
        boolean isDeleted) {
+    String filePartitionPath = filePath.startsWith("/") ? 
filePath.substring(1) : filePath;

Review Comment:
   what are the cases where we would and would not get a "/". or is this just 
defensive ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to