hudi-agent commented on code in PR #18826:
URL: https://github.com/apache/hudi/pull/18826#discussion_r3292093962


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
         dataMetaClient,
         dataWriteConfig);
 
-    // Initialize the file groups
-    final int fileGroupCount = estimateFileGroupCount(records);
+    Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();

Review Comment:
   🤖 The old code persisted `records` as a side effect inside 
`recordCountSupplier`, which meant `validateRecordIndex` (when 
`hoodie.metadata.record.index.enable.validation.on.initialization=true`) could 
call `recordIndexRecords.count()` cheaply from cache. After this change 
`records` is never persisted, so the validation path recomputes the entire 
RLI records pipeline (re-reading base + log files) to satisfy `count()`. Is 
this performance regression for validation users intentional? If so, a comment 
noting the trade-off would help future readers.
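
   To make the cost concrete, here is a minimal plain-Java sketch of the 
trade-off. `LazyRecords` and `PersistTradeOffDemo` are hypothetical stand-ins, 
not Hudi's `HoodieData` API; the sketch only assumes that an unpersisted 
dataset re-runs its whole pipeline on every action.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-Java stand-in for a lazily evaluated dataset; NOT Hudi's HoodieData API. */
final class LazyRecords {
  private final Supplier<List<String>> pipeline; // stands in for the expensive file reads
  private List<String> cached;                   // stays null until persist() is called

  LazyRecords(Supplier<List<String>> pipeline) {
    this.pipeline = pipeline;
  }

  /** Materializes the pipeline once so later actions can reuse the result. */
  void persist() {
    if (cached == null) {
      cached = pipeline.get();
    }
  }

  /** Counts records; re-runs the whole pipeline unless persist() was called first. */
  long count() {
    return (cached != null ? cached : pipeline.get()).size();
  }
}

final class PersistTradeOffDemo {
  /** Returns how many times the pipeline ran while serving two count() calls. */
  static int pipelineRuns(boolean persistFirst) {
    AtomicInteger runs = new AtomicInteger();
    LazyRecords records = new LazyRecords(() -> {
      runs.incrementAndGet();
      return Arrays.asList("key1", "key2", "key3");
    });
    if (persistFirst) {
      records.persist();
    }
    records.count(); // first action, e.g. sizing
    records.count(); // second action, e.g. validation
    return runs.get();
  }
}
```

   With `persistFirst = true` the pipeline runs once for both counts; without 
it, every `count()` re-runs the pipeline.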
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+   * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+   */
+  private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return 0L;
+    }
+    int parallelism = Math.min(partitionBaseFilePairs.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = storageConf;
+    return engineContext.parallelize(partitionBaseFilePairs, parallelism)
+        .map(partitionAndBaseFile -> {
+          HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+          StoragePath path = baseFile.getStoragePath();
+          try {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(path, storageConfBroadcast);
+            return HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(path).getRowCount(storage, path);
+          } catch (Exception e) {
+            LOG.warn("Failed to read row count from base file footer: {}", path, e);
+            return 0L;

Review Comment:
   🤖 Swallowing the exception and returning 0L for each failed file means 
that, if many files fail, `estimatedRecordCount` is silently under-estimated 
(and file groups are therefore under-provisioned), with only a per-file WARN 
as evidence. The analogous `countRecordsInHFiles` above throws 
`HoodieIOException` on read failure. Should this at least surface the failure 
count somehow, or fail fast on persistent errors, since under-provisioned RLI 
file groups can be painful to fix later?
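
   One possible shape for this, as a hedged plain-Java sketch rather than a 
drop-in patch: carry a failure counter alongside the row count and fail once 
it exceeds a tolerance. `FooterRowCountEstimator`, `Estimate`, and the 
`maxFailedFiles` parameter are hypothetical names, and the `rowCountReader` 
function stands in for the real footer read.

```java
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical helper; names and the tolerance policy are illustrative only. */
final class FooterRowCountEstimator {

  /** Immutable partial result: rows counted so far plus how many files failed. */
  static final class Estimate {
    final long rowCount;
    final int failedFiles;

    Estimate(long rowCount, int failedFiles) {
      this.rowCount = rowCount;
      this.failedFiles = failedFiles;
    }

    Estimate plus(Estimate other) {
      return new Estimate(rowCount + other.rowCount, failedFiles + other.failedFiles);
    }
  }

  /**
   * Sums per-file row counts. A failed read contributes zero rows but is counted,
   * and the whole estimate is rejected once failures exceed maxFailedFiles.
   */
  static Estimate estimate(List<String> paths,
                           ToLongFunction<String> rowCountReader,
                           int maxFailedFiles) {
    Estimate total = paths.stream()
        .map(path -> {
          try {
            return new Estimate(rowCountReader.applyAsLong(path), 0);
          } catch (RuntimeException e) {
            return new Estimate(0L, 1); // record the failure instead of hiding it
          }
        })
        .reduce(new Estimate(0L, 0), Estimate::plus);
    if (total.failedFiles > maxFailedFiles) {
      throw new IllegalStateException(
          total.failedFiles + " of " + paths.size() + " footer reads failed");
    }
    return total;
  }
}
```

   Setting `maxFailedFiles` to 0 gives fail-fast behavior close to 
`countRecordsInHFiles`; a small positive value tolerates transient errors 
while still reporting how many files were skipped.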
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+   * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+   */
+  private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return 0L;
+    }
+    int parallelism = Math.min(partitionBaseFilePairs.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = storageConf;
+    return engineContext.parallelize(partitionBaseFilePairs, parallelism)
+        .map(partitionAndBaseFile -> {
+          HoodieBaseFile baseFile = partitionAndBaseFile.getValue();

Review Comment:
   🤖 nit: the rest of this diff uses `getRight()` on `Pair` (e.g. line 836), so 
`partitionAndBaseFile.getValue()` is a bit inconsistent. Could you change it 
to `getRight()` for uniformity?
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
         dataMetaClient,
         dataWriteConfig);
 
-    // Initialize the file groups
-    final int fileGroupCount = estimateFileGroupCount(records);
+    Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
+    int minFileGroupCount = bounds.getLeft();
+    int maxFileGroupCount = bounds.getRight();
+
+    int fileGroupCount;
+    if (minFileGroupCount != maxFileGroupCount) {
+      // Estimate file group count based on record count read from base file footer metadata.
+      // Avoids the expensive records.persist() + records.count() pass over materialized records.
+      List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = latestMergedPartitionFileSliceList.stream()

Review Comment:
   🤖 For MOR, this drops any file slice that has no base file (log-only slice) 
and ignores log-file record contributions for slices that do have base files. 
Tables with heavy log-only inserts, or RLI bootstrap on a long-uncompacted MOR 
table, could see a significant undercount that the growth factor may not 
absorb. The PR description acknowledges this; could you confirm whether 
realistic large-MOR scenarios were considered, or add a follow-up to factor in 
log files (e.g. via `HoodieLogFileReader` block header record counts)?
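
   Short of reading log blocks, a cheaper interim step might be to report how 
many slices the base-file-only estimate cannot see. The sketch below is plain 
Java with hypothetical types (`SliceSummary`, `MorCoverageReport`); it does 
not use Hudi's `FileSlice` class and only illustrates the bookkeeping.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical, simplified view of a file slice; NOT Hudi's FileSlice class. */
final class SliceSummary {
  final OptionalLong baseFileRowCount; // empty for a log-only slice
  final int logFileCount;

  SliceSummary(OptionalLong baseFileRowCount, int logFileCount) {
    this.baseFileRowCount = baseFileRowCount;
    this.logFileCount = logFileCount;
  }
}

/** Summarizes what a base-file-only row count covers and what it misses. */
final class MorCoverageReport {
  final long baseFileRows;   // rows visible to the footer-based estimate
  final int logOnlySlices;   // slices dropped entirely (no base file)
  final int slicesWithLogs;  // slices whose log records are not counted

  private MorCoverageReport(long baseFileRows, int logOnlySlices, int slicesWithLogs) {
    this.baseFileRows = baseFileRows;
    this.logOnlySlices = logOnlySlices;
    this.slicesWithLogs = slicesWithLogs;
  }

  static MorCoverageReport of(List<SliceSummary> slices) {
    long rows = 0L;
    int logOnly = 0;
    int withLogs = 0;
    for (SliceSummary slice : slices) {
      if (slice.baseFileRowCount.isPresent()) {
        rows += slice.baseFileRowCount.getAsLong();
        if (slice.logFileCount > 0) {
          withLogs++; // base file counted, log records ignored
        }
      } else {
        logOnly++;    // contributes nothing to the estimate
      }
    }
    return new MorCoverageReport(rows, logOnly, withLogs);
  }
}
```

   Logging `logOnlySlices` and `slicesWithLogs` next to the estimate would at 
least make a large undercount visible at bootstrap time.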
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+   * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+   */
+  private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return 0L;
+    }
+    int parallelism = Math.min(partitionBaseFilePairs.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = storageConf;

Review Comment:
   🤖 nit: `storageConfBroadcast` reads like a Spark `Broadcast<T>` variable, 
but it's just a local copy of the instance field for lambda capture. Could you 
rename it to something like `localStorageConf` or `capturedStorageConf` to 
avoid that confusion?
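
   For readers wondering why the local copy exists at all: one common reason, 
assumed here rather than stated in the PR, is that a lambda referencing an 
instance field captures `this`, which breaks serialization when the enclosing 
class is not serializable. A plain-Java sketch with a hypothetical 
`WriterLike` class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

/** Hypothetical writer-like class that is deliberately NOT Serializable. */
final class WriterLike {
  /** A supplier whose lambda implementations can be Java-serialized. */
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  private final String storageConf;

  WriterLike(String storageConf) {
    this.storageConf = storageConf;
  }

  /** References the field directly, so the lambda captures `this`. */
  SerializableSupplier<String> capturingThis() {
    return () -> storageConf;
  }

  /** Copies the field to a local first, so the lambda captures only the String. */
  SerializableSupplier<String> capturingLocal() {
    String localStorageConf = storageConf;
    return () -> localStorageConf;
  }

  /** Returns true if Java serialization of the object succeeds. */
  static boolean isJavaSerializable(Object candidate) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(candidate);
      return true;
    } catch (IOException e) {
      return false; // e.g. NotSerializableException for a captured WriterLike
    }
  }
}
```

   A name such as `localStorageConf` describes this capture pattern directly, 
whereas `storageConfBroadcast` suggests a distributed broadcast that is not 
happening.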
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
