nsivabalan commented on a change in pull request #4848:
URL: https://github.com/apache/hudi/pull/4848#discussion_r818222679
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########

@@ -799,30 +824,20 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
   /**
    * Create column stats from write status.
    *
-   * @param engineContext - Engine context
-   * @param datasetMetaClient - Dataset meta client
-   * @param allWriteStats - Write status to convert
-   * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for indexing
+   * @param engineContext           - Engine context
+   * @param allWriteStats           - Write status to convert
+   * @param recordsGenerationParams - Parameters for column stats record generation
    */
-  public static List<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
-                                                                   HoodieTableMetaClient datasetMetaClient,
-                                                                   List<HoodieWriteStat> allWriteStats,
-                                                                   boolean isMetaIndexColumnStatsForAllColumns) throws Exception {
+  public static HoodieData<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+                                                                         List<HoodieWriteStat> allWriteStats,
+                                                                         MetadataRecordsGenerationParams recordsGenerationParams) {
     if (allWriteStats.isEmpty()) {
-      return Collections.emptyList();
-    }
-
-    List<HoodieWriteStat> prunedWriteStats = allWriteStats.stream().filter(writeStat -> {
-      return !(writeStat instanceof HoodieDeltaWriteStat);
-    }).collect(Collectors.toList());
-    if (prunedWriteStats.isEmpty()) {
-      return Collections.emptyList();
+      return engineContext.emptyHoodieData();
     }
-
-    return engineContext.flatMap(prunedWriteStats,
-        writeStat -> translateWriteStatToColumnStats(writeStat, datasetMetaClient,
-            getLatestColumns(datasetMetaClient, isMetaIndexColumnStatsForAllColumns)),
-        prunedWriteStats.size());
+    HoodieData<HoodieWriteStat> allWriteStatsRDD = engineContext.parallelize(
+        allWriteStats, Math.max(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()));
+    return allWriteStatsRDD.flatMap(writeStat -> translateWriteStatToColumnStats(writeStat,
+        recordsGenerationParams.getDataMetaClient(),
+        getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled())).iterator());

Review comment:
   Should we move getColumnsToIndex to the driver above (line 837 maybe) and avoid computing getColumnsToIndex in every executor?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########

@@ -922,4 +952,39 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
     }
   }
 
+  /**
+   * Computes column range metadata.
+   *
+   * @param recordList                        - List of records from which column range statistics will be computed
+   * @param field                             - Column name for which statistics will be computed
+   * @param filePath                          - Data file path
+   * @param columnRangeMap                    - Old column range statistics, which will be merged in this computation
+   * @param consistentLogicalTimestampEnabled - Flag to deal with logical timestamp type when getting column value
+   */
+  public static void accumulateColumnRanges(List<IndexedRecord> recordList, Schema.Field field, String filePath,

Review comment:
   I see this is getting called from HoodieAppendHandle, and we call it once for every field/column, i.e. for every field -> accumulateColumnRanges { iterate through every record and find column stats }.

   Since this is an avro/row-based format, why can't we collect stats for all fields/columns at once per record, iterating through the records a single time to eventually find column stats for all fields? Essentially we are doing a columnar read across records for N columns. I am proposing we flip that: read each entire record, fetch all the stats, and proceed further.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
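The first review comment above suggests hoisting getColumnsToIndex out of the flatMap closure so it runs once on the driver instead of once per executor task. A minimal, self-contained Java sketch of that general pattern (not Hudi code: `computeColumnsToIndex`, the call counter, and the plain stream standing in for the distributed flatMap are all illustrative assumptions):

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class HoistExample {
  // Counts how many times the "expensive" lookup actually runs.
  static final AtomicInteger CALLS = new AtomicInteger();

  // Stand-in for getColumnsToIndex: pretend this is costly (metadata reads, schema resolution).
  static Set<String> computeColumnsToIndex() {
    CALLS.incrementAndGet();
    return Set.of("price", "quantity");
  }

  // Before: the lookup executes inside the lambda, once per element.
  static long countPerElement(List<String> cols) {
    return cols.stream().filter(c -> computeColumnsToIndex().contains(c)).count();
  }

  // After: compute once up front and capture the result in the closure.
  static long countHoisted(List<String> cols) {
    Set<String> columnsToIndex = computeColumnsToIndex();
    return cols.stream().filter(columnsToIndex::contains).count();
  }
}
```

Both variants return the same count; only the number of lookup invocations differs, which is what the comment is trying to cut down on in the executor path.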
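The second review comment proposes replacing N per-field scans (one accumulateColumnRanges call per column, each re-reading every record) with a single row-wise pass that updates all columns' min/max together. A hedged, self-contained sketch of that idea, with `Range`, `collectRanges`, and Map-based records as illustrative stand-ins for the actual Hudi IndexedRecord/column-range types:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@SuppressWarnings({"rawtypes", "unchecked"})
public class RowWiseColumnStats {

  // Simple holder for a column's running min/max (stand-in for column range metadata).
  static class Range {
    Comparable min;
    Comparable max;

    void accept(Comparable value) {
      if (value == null) {
        return; // nulls do not affect the range
      }
      if (min == null || value.compareTo(min) < 0) {
        min = value;
      }
      if (max == null || value.compareTo(max) > 0) {
        max = value;
      }
    }
  }

  // Single pass over the records: each record is read once, and every
  // field's range is updated from it, instead of one full scan per field.
  static Map<String, Range> collectRanges(List<Map<String, Comparable>> records,
                                          List<String> fields) {
    Map<String, Range> ranges = new HashMap<>();
    for (String field : fields) {
      ranges.put(field, new Range());
    }
    for (Map<String, Comparable> record : records) {
      for (String field : fields) {
        ranges.get(field).accept(record.get(field));
      }
    }
    return ranges;
  }
}
```

The total comparison work is the same, but the records are traversed once rather than N times, which matters when each traversal deserializes avro rows.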