vinothchandar commented on code in PR #10352: URL: https://github.com/apache/hudi/pull/10352#discussion_r1459591053
########## hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java: ########## @@ -64,6 +67,51 @@ public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) { throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet."); } + /** + * Aggregate column range statistics across files in a partition. + * + * @param fileRanges List of column range statistics for each file in a partition + */ + public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileRanges) { + if (fileRanges.size() == 1) { + // Only one parquet file, we can just return that range. + return fileRanges.get(0); + } + // There are multiple files. Compute min(file_mins) and max(file_maxs) + return fileRanges.stream() + .sequential() + .reduce(BaseFileUtils::mergeRanges).get(); + } + + private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> mergeRanges(HoodieColumnRangeMetadata<T> one, + HoodieColumnRangeMetadata<T> another) { + final T minValue; + final T maxValue; + if (one.getMinValue() != null && another.getMinValue() != null) { + minValue = one.getMinValue().toString().compareTo(another.getMinValue().toString()) < 0 ? one.getMinValue() : another.getMinValue(); + } else if (one.getMinValue() == null) { + minValue = another.getMinValue(); + } else { + minValue = one.getMinValue(); + } + + if (one.getMaxValue() != null && another.getMaxValue() != null) { + maxValue = one.getMaxValue().toString().compareTo(another.getMaxValue().toString()) < 0 ? another.getMaxValue() : one.getMaxValue(); + } else if (one.getMaxValue() == null) { + maxValue = another.getMaxValue(); + } else { + maxValue = one.getMaxValue(); + } + + return HoodieColumnRangeMetadata.create( + one.getFilePath(), Review Comment: The file path here is just a dummy value, right? Let's avoid code like this.
Can we pass in `null` instead? ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String partition, String filename) return new Path(basePath, partition + Path.SEPARATOR + filename); } } + + public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, + List<DirectoryInfo> partitionInfoList, + MetadataRecordsGenerationParams recordsGenerationParams) { + // Find the columns to index + HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); + final List<String> columnsToIndex = getColumnsToIndex( + recordsGenerationParams, + Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); + if (columnsToIndex.isEmpty()) { + return engineContext.emptyHoodieData(); + } + LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size())); Review Comment: SLF4J supports `{}` argument substitution, so can we drop the `String.format` here? ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java: ########## @@ -641,6 +642,36 @@ public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName }); } + public static Stream<HoodieRecord> createPartitionStatsRecords(String partitionName, Review Comment: Should this parameter be `partitionName` or `partitionPath`? ########## hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java: ########## @@ -64,6 +67,51 @@ public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) { throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet."); } + /** + * Aggregate column range statistics across files in a partition.
+ * + * @param fileRanges List of column range statistics for each file in a partition + */ + public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> fileRanges) { + if (fileRanges.size() == 1) { + // Only one parquet file, we can just return that range. + return fileRanges.get(0); + } + // There are multiple files. Compute min(file_mins) and max(file_maxs) + return fileRanges.stream() + .sequential() + .reduce(BaseFileUtils::mergeRanges).get(); + } + + private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> mergeRanges(HoodieColumnRangeMetadata<T> one, + HoodieColumnRangeMetadata<T> another) { + final T minValue; + final T maxValue; + if (one.getMinValue() != null && another.getMinValue() != null) { + minValue = one.getMinValue().toString().compareTo(another.getMinValue().toString()) < 0 ? one.getMinValue() : another.getMinValue(); Review Comment: Is a string comparison correct here? What if the partition columns are long/int? Lexicographically "10" sorts before "9", so the computed min/max would be wrong. ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String partition, String filename) return new Path(basePath, partition + Path.SEPARATOR + filename); } } + + public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, + List<DirectoryInfo> partitionInfoList, + MetadataRecordsGenerationParams recordsGenerationParams) { Review Comment: Isn't `MetadataRecordsGenerationParams` deprecated?
########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java: ########## @@ -641,6 +642,36 @@ public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName }); } + public static Stream<HoodieRecord> createPartitionStatsRecords(String partitionName, + Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList, + boolean isDeleted) { + return columnRangeMetadataList.stream().map(columnRangeMetadata -> { + HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionName, columnRangeMetadata), + MetadataPartitionType.PARTITION_STATS.getPartitionPath()); + + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), + HoodieMetadataColumnStats.newBuilder() + .setFileName(new Path(columnRangeMetadata.getFilePath()).getName()) Review Comment: Can we remove the file name here, since partition-level stats are not tied to a single file? ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String partition, String filename) return new Path(basePath, partition + Path.SEPARATOR + filename); } } + + public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, Review Comment: I see that you are ultimately using `recordsGenParams.getTargetColumnsForColumnStatsIndex();` to generate the stats. Do you just add the partition fields to the target columns so that partition stats aggregation happens automatically?
########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1901,4 +1910,162 @@ private static Path filePath(String basePath, String partition, String filename) return new Path(basePath, partition + Path.SEPARATOR + filename); } } + + public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, + List<DirectoryInfo> partitionInfoList, + MetadataRecordsGenerationParams recordsGenerationParams) { + // Find the columns to index + HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); + final List<String> columnsToIndex = getColumnsToIndex( + recordsGenerationParams, + Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); + if (columnsToIndex.isEmpty()) { + return engineContext.emptyHoodieData(); + } + LOG.debug(String.format("Indexing %d columns for partition stats index", columnsToIndex.size())); + // Create records for MDT + int parallelism = Math.max(Math.min(partitionInfoList.size(), recordsGenerationParams.getPartitionStatsIndexParallelism()), 1); + return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionFiles -> { + final String partitionName = partitionFiles.getRelativePath(); + Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = partitionFiles.getFileNameToSizeMap().keySet().stream() + .map(fileName -> getFileStatsRangeMetadata(partitionName, partitionName + "/" + fileName, dataTableMetaClient, columnsToIndex, false)) + .map(BaseFileUtils::getColumnRangeInPartition); + return HoodieMetadataPayload.createPartitionStatsRecords(partitionName, partitionStatsRangeMetadata.collect(toList()), false).iterator(); + }); + } + + private static List<HoodieColumnRangeMetadata<Comparable>> getFileStatsRangeMetadata(String partitionPath, + String filePath, + HoodieTableMetaClient datasetMetaClient, + List<String> columnsToIndex, + boolean isDeleted) { + String filePartitionPath = 
filePath.startsWith("/") ? filePath.substring(1) : filePath; Review Comment: In which cases would we get a leading "/", and in which would we not? Or is this just defensive? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org