yihua commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1585240628


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1872,4 +1883,175 @@ public HoodieRecord next() {
       }
     };
   }
+
+  public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
+                                                                             List<DirectoryInfo> partitionInfoList,
+                                                                             HoodieMetadataConfig metadataConfig,
+                                                                             HoodieTableMetaClient dataTableMetaClient) {
+    final List<String> columnsToIndex = metadataConfig.getColumnsEnabledForColumnStatsIndex();
+    if (columnsToIndex.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex);
+    // Create records for MDT
+    int parallelism = Math.max(Math.min(partitionInfoList.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionInfo -> {
+      final String partitionPath = partitionInfo.getRelativePath();
+      // Step 1: Collect column range metadata for each file in the partition
+      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream()
+          .map(fileName -> getFileStatsRangeMetadata(partitionPath, partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
+          .collect(toList());
+      // Step 2: Flatten the per-file lists and group the metadata by column name
+      Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap = fileColumnMetadata.stream()
+          .flatMap(List::stream)
+          .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, toList()));
+      // Step 3: Aggregate the per-file ranges into a single partition-level range per column
+      Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
+          .map(entry -> BaseFileUtils.getColumnRangeInPartition(entry.getValue()));
+      return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(toList()), false).iterator();
+    });
+  }
+
+  private static List<HoodieColumnRangeMetadata<Comparable>> getFileStatsRangeMetadata(String partitionPath,
+                                                                                       String filePath,
+                                                                                       HoodieTableMetaClient datasetMetaClient,
+                                                                                       List<String> columnsToIndex,
+                                                                                       boolean isDeleted) {
+    String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
+    String fileName = FSUtils.getFileName(filePath, partitionPath);
+    if (isDeleted) {
+      // Deleted files get stub (empty) range metadata for each indexed column
+      return columnsToIndex.stream()
+          .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
+          .collect(Collectors.toList());
+    }
+    return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, columnsToIndex);
+  }
+
+  public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata,

Review Comment:
   Got it, makes sense. We can revisit later why two separate methods/code paths are needed for generating partition stats records (track it in JIRA).
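
   For whoever picks up that JIRA: both code paths ultimately reduce to the same aggregation, i.e. flatten the file-level column ranges, group them by column, and take the widest range per column. A minimal self-contained sketch of that shape (the `ColumnRange` record and method names below are hypothetical stand-ins for `HoodieColumnRangeMetadata` and `BaseFileUtils.getColumnRangeInPartition`, not the actual API, and null mins/maxes such as the stubs emitted for deleted files are not handled here):

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class PartitionStatsSketch {

     // Hypothetical stand-in for HoodieColumnRangeMetadata; illustration only.
     record ColumnRange(String columnName, Comparable minValue, Comparable maxValue) {}

     // Step 2 shape: flatten per-file range lists and group by column name.
     static Map<String, List<ColumnRange>> groupByColumn(List<List<ColumnRange>> perFileRanges) {
       return perFileRanges.stream()
           .flatMap(List::stream)
           .collect(Collectors.groupingBy(ColumnRange::columnName));
     }

     // Step 3 shape: reduce one column's file-level ranges to a single
     // partition-level range (smallest min, largest max).
     @SuppressWarnings({"rawtypes", "unchecked"})
     static ColumnRange merge(List<ColumnRange> ranges) {
       ColumnRange acc = ranges.get(0);
       for (ColumnRange r : ranges.subList(1, ranges.size())) {
         Comparable min = acc.minValue().compareTo(r.minValue()) <= 0 ? acc.minValue() : r.minValue();
         Comparable max = acc.maxValue().compareTo(r.maxValue()) >= 0 ? acc.maxValue() : r.maxValue();
         acc = new ColumnRange(acc.columnName(), min, max);
       }
       return acc;
     }

     public static void main(String[] args) {
       List<List<ColumnRange>> files = List.of(
           List.of(new ColumnRange("price", 10, 50)),  // file 1
           List.of(new ColumnRange("price", 5, 30)));  // file 2
       groupByColumn(files).values()
           .forEach(ranges -> System.out.println(merge(ranges)));
       // prints ColumnRange[columnName=price, minValue=5, maxValue=50]
     }
   }
   ```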


