hudi-agent commented on code in PR #18826:
URL: https://github.com/apache/hudi/pull/18826#discussion_r3292093962
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
dataMetaClient,
dataWriteConfig);
- // Initialize the file groups
- final int fileGroupCount = estimateFileGroupCount(records);
+ Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
Review Comment:
🤖 The old code persisted `records` as a side effect inside
`recordCountSupplier`, which meant `validateRecordIndex` (when
`hoodie.metadata.record.index.enable.validation.on.initialization=true`) could
call `recordIndexRecords.count()` cheaply from cache. After this change
`records` is never persisted, so the validation path will recompute the entire
RLI records pipeline (re-reading base + log files) to satisfy `count()`. Is
this regression intentional for users who enable validation? If so, a comment
noting the trade-off would help future readers.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
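To illustrate the cost being described, here is a minimal plain-Java sketch, not Hudi's `HoodieData` API: an un-persisted pipeline re-runs in full for every consumer, while a memoized result is computed once. `ExpensivePipeline` and `CachedCount` are hypothetical stand-ins for the RLI records pipeline and a persisted dataset; caching only the count is a simplification of `persist()`, which caches the records themselves.

```java
import java.util.function.LongSupplier;

public class CountReuseSketch {

  // Hypothetical stand-in for the RLI records pipeline: like an un-persisted
  // dataset, every count() re-reads all of its inputs.
  static final class ExpensivePipeline {
    private final long[] rowsPerFile;
    int evaluations = 0; // number of full passes over the inputs

    ExpensivePipeline(long[] rowsPerFile) {
      this.rowsPerFile = rowsPerFile;
    }

    long count() {
      evaluations++;
      long total = 0;
      for (long rows : rowsPerFile) {
        total += rows; // simulates re-reading every base and log file
      }
      return total;
    }
  }

  // Hypothetical memoizing wrapper, standing in for persisting once so that
  // later consumers (such as validation) reuse the cached result.
  static final class CachedCount implements LongSupplier {
    private final ExpensivePipeline pipeline;
    private Long cached; // null until the first call

    CachedCount(ExpensivePipeline pipeline) {
      this.pipeline = pipeline;
    }

    @Override
    public long getAsLong() {
      if (cached == null) {
        cached = pipeline.count();
      }
      return cached;
    }
  }

  public static void main(String[] args) {
    ExpensivePipeline uncached = new ExpensivePipeline(new long[] {100, 250, 50});
    uncached.count(); // first consumer
    uncached.count(); // second consumer (e.g. validation) pays for a full pass again
    System.out.println("uncached passes: " + uncached.evaluations); // 2

    ExpensivePipeline pipeline = new ExpensivePipeline(new long[] {100, 250, 50});
    LongSupplier cached = new CachedCount(pipeline);
    cached.getAsLong();
    cached.getAsLong(); // served from the cache
    System.out.println("cached passes: " + pipeline.evaluations); // 1
  }
}
```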
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+ /**
+ * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+ * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+ */
+ private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return 0L;
+ }
+ int parallelism = Math.min(partitionBaseFilePairs.size(),
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ StorageConfiguration<?> storageConfBroadcast = storageConf;
+ return engineContext.parallelize(partitionBaseFilePairs, parallelism)
+ .map(partitionAndBaseFile -> {
+ HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+ StoragePath path = baseFile.getStoragePath();
+ try {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(path, storageConfBroadcast);
+ return HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(path).getRowCount(storage, path);
+ } catch (Exception e) {
+ LOG.warn("Failed to read row count from base file footer: {}", path, e);
+ return 0L;
Review Comment:
🤖 Swallowing the exception and returning 0L for each failed file means that,
if many files fail, `estimatedRecordCount` is silently underestimated (and file
groups are therefore under-provisioned), with only a per-file WARN as evidence.
The analogous `countRecordsInHFiles` above throws `HoodieIOException` on read
failure. Should this at least surface the failure count somehow, or fail fast
on persistent errors, since under-provisioned RLI file groups can be painful to
fix later?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
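One possible shape for this, as a sequential plain-Java sketch under stated assumptions: the hypothetical `RowCountReader` stands in for `getFileFormatUtils(path).getRowCount(storage, path)`, read failures are counted instead of being silently treated as 0 rows, and the estimate fails fast once the failure ratio exceeds a hypothetical `maxFailureRatio`. The code under review runs in parallel on the engine context and catches `Exception`; neither detail is modeled here.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

public class FooterCountSketch {

  // Hypothetical stand-in for reading a row count from a base file footer.
  @FunctionalInterface
  interface RowCountReader {
    long rowCount(String path) throws IOException;
  }

  // Summed estimate plus how many footers could not be read.
  static final class Estimate {
    final long rowCount;
    final int failedFiles;
    final int totalFiles;

    Estimate(long rowCount, int failedFiles, int totalFiles) {
      this.rowCount = rowCount;
      this.failedFiles = failedFiles;
      this.totalFiles = totalFiles;
    }

    @Override
    public String toString() {
      return rowCount + " rows (" + failedFiles + "/" + totalFiles + " footers unreadable)";
    }
  }

  static Estimate estimate(List<String> paths, RowCountReader reader, double maxFailureRatio) {
    long total = 0;
    int failed = 0;
    for (String path : paths) {
      try {
        total += reader.rowCount(path);
      } catch (IOException e) {
        failed++; // tracked instead of silently contributing 0 rows
      }
    }
    if (!paths.isEmpty() && (double) failed / paths.size() > maxFailureRatio) {
      // Fail fast: an estimate missing this many files would under-provision file groups.
      throw new IllegalStateException(
          "Could not read row counts from " + failed + " of " + paths.size() + " base files");
    }
    return new Estimate(total, failed, paths.size());
  }

  public static void main(String[] args) {
    Map<String, Long> footers = Map.of("p1/a.parquet", 1_000L, "p1/b.parquet", 2_000L);
    RowCountReader reader = path -> {
      Long rows = footers.get(path);
      if (rows == null) {
        throw new IOException("unreadable footer: " + path);
      }
      return rows;
    };
    // One of three footers is unreadable, which is within the 50% threshold.
    System.out.println(estimate(List.of("p1/a.parquet", "p1/b.parquet", "p2/c.parquet"), reader, 0.5));
  }
}
```

Returning the failure count alongside the sum also lets the caller log a single summary line, rather than relying on per-file WARNs.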
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+ /**
+ * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+ * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+ */
+ private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return 0L;
+ }
+ int parallelism = Math.min(partitionBaseFilePairs.size(),
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ StorageConfiguration<?> storageConfBroadcast = storageConf;
+ return engineContext.parallelize(partitionBaseFilePairs, parallelism)
+ .map(partitionAndBaseFile -> {
+ HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
Review Comment:
🤖 nit: the rest of this diff uses `getRight()` on `Pair` (e.g. line 836), so
`partitionAndBaseFile.getValue()` is a bit inconsistent — could you change it
to `getRight()` for uniformity?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
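For context on why this change is purely cosmetic: assuming Hudi's `Pair` keeps the shape of the Apache Commons Lang `Pair` it resembles, it implements `Map.Entry`, and `getValue()` simply delegates to `getRight()`. The hypothetical `SimplePair` below mirrors that shape:

```java
import java.util.Map;

public class PairAccessorSketch {

  // Hypothetical minimal pair in the Commons Lang style: getKey()/getValue()
  // exist to satisfy Map.Entry and delegate to getLeft()/getRight().
  static final class SimplePair<L, R> implements Map.Entry<L, R> {
    private final L left;
    private final R right;

    SimplePair(L left, R right) {
      this.left = left;
      this.right = right;
    }

    L getLeft() {
      return left;
    }

    R getRight() {
      return right;
    }

    @Override
    public L getKey() {
      return getLeft();
    }

    @Override
    public R getValue() {
      return getRight();
    }

    @Override
    public R setValue(R value) {
      throw new UnsupportedOperationException("immutable pair");
    }
  }

  public static void main(String[] args) {
    SimplePair<String, String> partitionAndBaseFile =
        new SimplePair<>("2024/01/01", "base-file-0001.parquet");
    // Both accessors return the same object; getRight() just matches the rest of the diff.
    System.out.println(partitionAndBaseFile.getRight() == partitionAndBaseFile.getValue());
  }
}
```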
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
dataMetaClient,
dataWriteConfig);
- // Initialize the file groups
- final int fileGroupCount = estimateFileGroupCount(records);
+ Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
+ int minFileGroupCount = bounds.getLeft();
+ int maxFileGroupCount = bounds.getRight();
+
+ int fileGroupCount;
+ if (minFileGroupCount != maxFileGroupCount) {
+ // Estimate file group count based on record count read from base file footer metadata.
+ // Avoids the expensive records.persist() + records.count() pass over materialized records.
+ List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = latestMergedPartitionFileSliceList.stream()
Review Comment:
🤖 For MOR, this drops any file slice that has no base file (log-only slice)
and ignores log-file record contributions for slices that do have base files.
Tables with heavy log-only inserts, or RLI bootstrap on a long-uncompacted MOR
table, could see significant undercount that the growth factor may not absorb.
The PR description acknowledges this; could you confirm whether realistic
large-MOR scenarios were considered, or add a follow-up to factor in log files
(e.g. via `HoodieLogFileReader` block header record counts)?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
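A possible shape for such a follow-up, as a plain-Java sketch with hypothetical types: `SliceStats` holds an optional base-file row count plus per-log-block record counts. Where the log counts would come from (for example log block headers, as suggested above) is not shown and is an assumption. The `main` method shows how a log-only slice disappears entirely from a base-files-only estimate.

```java
import java.util.List;
import java.util.OptionalLong;

public class SliceRecordEstimateSketch {

  // Hypothetical per-file-slice statistics: an optional base file row count
  // and the record count of each log block in the slice.
  static final class SliceStats {
    final OptionalLong baseRowCount;
    final long[] logBlockRecordCounts;

    SliceStats(OptionalLong baseRowCount, long... logBlockRecordCounts) {
      this.baseRowCount = baseRowCount;
      this.logBlockRecordCounts = logBlockRecordCounts;
    }
  }

  // Mirrors the approach under review: only slices with a base file contribute.
  static long baseFilesOnly(List<SliceStats> slices) {
    long total = 0;
    for (SliceStats s : slices) {
      total += s.baseRowCount.orElse(0L);
    }
    return total;
  }

  // Also adds log block record counts, so log-only slices are no longer dropped.
  static long baseAndLogFiles(List<SliceStats> slices) {
    long total = baseFilesOnly(slices);
    for (SliceStats s : slices) {
      for (long n : s.logBlockRecordCounts) {
        total += n;
      }
    }
    return total;
  }

  public static void main(String[] args) {
    List<SliceStats> slices = List.of(
        new SliceStats(OptionalLong.of(1_000)),               // compacted slice
        new SliceStats(OptionalLong.of(1_000), 200, 300),     // base file + log blocks
        new SliceStats(OptionalLong.empty(), 5_000, 5_000));  // log-only slice
    System.out.println("base files only: " + baseFilesOnly(slices)); // 2000
    System.out.println("base + log:      " + baseAndLogFiles(slices)); // 12500
  }
}
```

Summing log block counts can overcount distinct keys, since log records may update keys already present in a base file; for file-group sizing, that errs toward more file groups rather than fewer.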
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+ /**
+ * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+ * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+ */
+ private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return 0L;
+ }
+ int parallelism = Math.min(partitionBaseFilePairs.size(),
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ StorageConfiguration<?> storageConfBroadcast = storageConf;
Review Comment:
🤖 nit: `storageConfBroadcast` reads like a Spark `Broadcast<T>` variable,
but it's just a local copy of the instance field for lambda capture. Could you
rename it to something like `localStorageConf` or `capturedStorageConf` to
avoid that confusion?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
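For context on the naming: presumably the local copy exists so the lambda captures only the configuration, not `this`. Reading an instance field inside a lambda implicitly captures the enclosing object, which must then be serialized when an engine such as Spark ships the closure to executors. Here is a minimal plain-Java sketch of that difference, using a hypothetical non-serializable `Writer` and the suggested `localStorageConf` name:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaCaptureSketch {

  // Hypothetical non-serializable owner, standing in for the metadata writer.
  static final class Writer {
    private final String storageConf; // set in the constructor, so not a compile-time constant

    Writer(String storageConf) {
      this.storageConf = storageConf;
    }

    // Reads the field inside the lambda, which implicitly captures `this`.
    Function<String, String> capturesThis() {
      return (Function<String, String> & Serializable) path -> storageConf + " @ " + path;
    }

    // Copies the field to a local first, so only the String value is captured.
    Function<String, String> capturesLocal() {
      String localStorageConf = storageConf;
      return (Function<String, String> & Serializable) path -> localStorageConf + " @ " + path;
    }
  }

  // Returns true if plain Java serialization of the closure succeeds.
  static boolean isSerializable(Object closure) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(closure);
      return true;
    } catch (NotSerializableException e) {
      return false; // a captured object (here, Writer) is not Serializable
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Writer writer = new Writer("fs.defaultFS=file:///");
    System.out.println("captures this:  " + isSerializable(writer.capturesThis()));  // false
    System.out.println("captures local: " + isSerializable(writer.capturesLocal())); // true
  }
}
```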
--
This is an automated message from the Apache Git Service.