yihua commented on code in PR #10352. URL: https://github.com/apache/hudi/pull/10352#discussion_r1585240628
########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1872,4 +1883,175 @@ public HoodieRecord next() { } }; } + + public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext, + List<DirectoryInfo> partitionInfoList, + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient) { + final List<String> columnsToIndex = metadataConfig.getColumnsEnabledForColumnStatsIndex(); + if (columnsToIndex.isEmpty()) { + return engineContext.emptyHoodieData(); + } + LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex); + // Create records for MDT + int parallelism = Math.max(Math.min(partitionInfoList.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1); + return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionInfo -> { + final String partitionPath = partitionInfo.getRelativePath(); + // Step 1: Collect Column Metadata for Each File (Your existing code does this) + List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream() + .map(fileName -> getFileStatsRangeMetadata(partitionPath, partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false)) + .collect(toList()); + // Step 2: Flatten and Group by Column Name + Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap = fileColumnMetadata.stream() + .flatMap(List::stream) // Flatten the list + .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, toList())); // Group by column name + // Step 3: Aggregate Column Ranges + Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream() + .map(entry -> BaseFileUtils.getColumnRangeInPartition(entry.getValue())); + return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, 
partitionStatsRangeMetadata.collect(toList()), false).iterator(); + }); + } + + private static List<HoodieColumnRangeMetadata<Comparable>> getFileStatsRangeMetadata(String partitionPath, + String filePath, + HoodieTableMetaClient datasetMetaClient, + List<String> columnsToIndex, + boolean isDeleted) { + String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath; + String fileName = FSUtils.getFileName(filePath, partitionPath); + if (isDeleted) { + return columnsToIndex.stream() + .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry)) + .collect(Collectors.toList()); + } + return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, columnsToIndex); + } + + public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata, Review Comment: Got it, makes sense. We can revisit later why it needs two types of methods/code path for generating partition stats records (track it in JIRA). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org