nsivabalan commented on code in PR #18826:
URL: https://github.com/apache/hudi/pull/18826#discussion_r3479804379
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+ /**
+ * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+ * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+ */
+ private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return 0L;
+ }
+ int parallelism = Math.min(partitionBaseFilePairs.size(),
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ StorageConfiguration<?> storageConfBroadcast = storageConf;
+ return engineContext.parallelize(partitionBaseFilePairs, parallelism)
+ .map(partitionAndBaseFile -> {
+ HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+ StoragePath path = baseFile.getStoragePath();
+ try {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(path, storageConfBroadcast);
+ return HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(path).getRowCount(storage, path);
+ } catch (Exception e) {
+ LOG.warn("Failed to read row count from base file footer: {}", path, e);
+ return 0L;
Review Comment:
Addressed in bec52e4dc96c: removed the catch-all. `getRowCount` declares no
checked exceptions, so `UnsupportedOperationException` (from
`HFileUtils.getRowCount`) and `HoodieIOException` (from format-specific
readers) now propagate to the caller, matching `countRecordsInHFiles`. An
HFile-formatted base file therefore fails fast instead of silently contributing
0 to the estimate; an HFile-friendly estimator path can be a follow-up if
needed.
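
To make the behavioral difference concrete, here is a minimal, self-contained sketch. It does not use Hudi's classes: `readRowCount`, the `.hfile` suffix check, and the fixed count of 100 rows per file are all hypothetical stand-ins chosen only for illustration.

```java
import java.util.List;

final class FailFastRowCountSketch {
  // Hypothetical stand-in for a footer read; NOT Hudi's getRowCount.
  static long readRowCount(String path) {
    if (path.endsWith(".hfile")) {
      throw new UnsupportedOperationException("Row count is not supported for " + path);
    }
    return 100L; // assumption: every supported file holds 100 rows
  }

  // Previous behavior: any failure is logged and counted as 0 rows.
  static long sumSwallowingErrors(List<String> paths) {
    long total = 0L;
    for (String path : paths) {
      try {
        total += readRowCount(path);
      } catch (Exception e) {
        System.err.println("Failed to read row count from " + path + ": " + e.getMessage());
      }
    }
    return total;
  }

  // New behavior: no catch block, so the first unchecked exception aborts the sum.
  static long sumFailFast(List<String> paths) {
    long total = 0L;
    for (String path : paths) {
      total += readRowCount(path);
    }
    return total;
  }
}
```

With one supported and one unsupported file, `sumSwallowingErrors` quietly under-reports the total, while `sumFailFast` surfaces the `UnsupportedOperationException` to the caller.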
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
dataMetaClient,
dataWriteConfig);
- // Initialize the file groups
- final int fileGroupCount = estimateFileGroupCount(records);
+ Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
Review Comment:
Addressed in bec52e4dc96c: `persist()` is now called in the caller
(`initializeFilegroupsAndCommitToRecordIndexPartition`) only when
`isRecordIndexInitializationValidationEnabled()` is true, and *before* the
bulk-insert in both the partitioned and non-partitioned branches. That way
bulkCommit's materialization populates the cache and
`validateRecordIndex#count()` reads back from it. The matching `unpersist()`
lives in the same validation-only branch. The happy path is deliberately left
unpersisted (the RDD is consumed exactly once by bulk-insert), which is the
original optimization.
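
A rough sketch of the persist-only-when-validating flow, for readers following the thread. `LazyRecords` is a hypothetical, single-threaded stand-in for a lazily evaluated dataset (it is not `HoodieData` or a Spark RDD), and `initialize` only mimics the ordering described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

final class ConditionalPersistSketch {
  /** Hypothetical lazy dataset; counts how often the source is recomputed. */
  static final class LazyRecords {
    private final Supplier<List<String>> source;
    private boolean persistRequested;
    private List<String> cache;
    int computations;

    LazyRecords(Supplier<List<String>> source) {
      this.source = source;
    }

    void persist() {
      persistRequested = true;
    }

    void unpersist() {
      persistRequested = false;
      cache = null;
    }

    private List<String> materialize() {
      if (cache != null) {
        return cache; // served from cache, no recomputation
      }
      computations++;
      List<String> data = new ArrayList<>(source.get());
      if (persistRequested) {
        cache = data; // the first action after persist() fills the cache
      }
      return data;
    }

    void bulkInsert() {
      materialize();
    }

    long count() {
      return materialize().size();
    }
  }

  /** Returns how many times the source was computed for one initialization. */
  static int initialize(boolean validationEnabled) {
    LazyRecords records = new LazyRecords(() -> Arrays.asList("k1", "k2", "k3"));
    if (validationEnabled) {
      records.persist(); // declared before the bulk insert so it populates the cache
    }
    records.bulkInsert();
    if (validationEnabled) {
      if (records.count() != 3) { // validation reads back from the cache
        throw new IllegalStateException("record count mismatch");
      }
      records.unpersist();
    }
    return records.computations;
  }
}
```

In this toy model the source is computed once in both modes: the happy path never pays for caching, and the validation path does not trigger a second computation.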
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
dataMetaClient,
dataWriteConfig);
- // Initialize the file groups
- final int fileGroupCount = estimateFileGroupCount(records);
+ Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
+ int minFileGroupCount = bounds.getLeft();
+ int maxFileGroupCount = bounds.getRight();
+
+ int fileGroupCount;
+ if (minFileGroupCount != maxFileGroupCount) {
+ // Estimate file group count based on record count read from base file footer metadata.
+ // Avoids the expensive records.persist() + records.count() pass over materialized records.
+ List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = latestMergedPartitionFileSliceList.stream()
Review Comment:
Addressed in bec52e4dc96c with a code comment: RLI is keyed by record key,
and log files in a slice can only carry updates or deletes for existing records
(RLI updates rewrite the entry rather than add a new one). Log files therefore
never add new distinct record keys, so basing the row-count estimate on base
files alone cannot undercount for file-group sizing. No follow-up is needed
unless we revisit the model that log files only carry updates/deletes.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+ /**
+ * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+ * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+ */
+ private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return 0L;
+ }
+ int parallelism = Math.min(partitionBaseFilePairs.size(),
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ StorageConfiguration<?> storageConfBroadcast = storageConf;
Review Comment:
As wombatu-kun noted, this name is the existing pattern at
`countRecordsInHFiles` (HoodieBackedTableMetadataWriter.java:908) which this
method mirrors. Renaming only the new copy diverges from the established
convention; happy to do a separate rename of both call sites as a minor cleanup
PR if you prefer.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+ /**
+ * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+ * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+ */
+ private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return 0L;
+ }
+ int parallelism = Math.min(partitionBaseFilePairs.size(),
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ StorageConfiguration<?> storageConfBroadcast = storageConf;
+ return engineContext.parallelize(partitionBaseFilePairs, parallelism)
+ .map(partitionAndBaseFile -> {
+ HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
Review Comment:
Addressed in 4c398477 (now part of bec52e4dc96c after rebase) — switched to
`getRight()` for uniformity.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -4379,6 +4379,104 @@ private void changeTableVersion(HoodieTableVersion version) throws IOException {
}
}
+ /**
+ * Validates that RLI initialization estimates file group count from base file footer metadata
+ * (instead of materializing and counting records) when min != max file group count.
+ */
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testRecordIndexFileGroupEstimation(HoodieTableType tableType) throws Exception {
Review Comment:
Good catch. Fixed in bec52e4dc96c: pinned `growthFactor=2.0` and
`maxFileGroupSizeBytes=2400` in `testRecordIndexFileGroupEstimation`. The math
now lands at 8 file groups: 200 records × 2.0 = 400, and 2400 / 48 = 50, so
400 / 50 = 8, which sits between min=1 and max=10. The assertion is bumped to
`fileGroupCount > 1`, so if the estimator returns 0 or its output is ignored,
the count clamps to minFileGroupCount=1 and the test fails.
`testRecordIndexWithFixedFileGroupCount` intentionally exercises the bypass
branch (the Javadoc already calls that out) so it's left as-is.
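
The arithmetic in this reply can be checked with a small standalone sketch. This is not the Hudi estimator: the method name, the parameter names, the reading of 48 as an average record size in bytes, and the use of `Math.ceil` are assumptions made for illustration. Only the numbers (200, 2.0, 2400, 48, min=1, max=10) come from the thread.

```java
final class FileGroupEstimateSketch {
  /**
   * Hypothetical estimator: projects the record count by a growth factor,
   * divides by the records that fit in one file group, and clamps to [min, max].
   */
  static int estimateFileGroupCount(long recordCount, int avgRecordSizeBytes, double growthFactor,
                                    long maxFileGroupSizeBytes, int minFileGroupCount, int maxFileGroupCount) {
    // e.g. 2400 / 48 = 50 records per file group; guard against a zero divisor
    long recordsPerFileGroup = Math.max(1L, maxFileGroupSizeBytes / avgRecordSizeBytes);
    // e.g. 200 * 2.0 / 50 = 8
    long needed = (long) Math.ceil(recordCount * growthFactor / recordsPerFileGroup);
    return (int) Math.max(minFileGroupCount, Math.min(maxFileGroupCount, needed));
  }

  public static void main(String[] args) {
    System.out.println(estimateFileGroupCount(200, 48, 2.0, 2400, 1, 10));
    System.out.println(estimateFileGroupCount(0, 48, 2.0, 2400, 1, 10));
  }
}
```

Under these assumptions a record count of 0 clamps to the minimum of 1, which is why an assertion of `fileGroupCount > 1` would catch an estimator whose output is zero or ignored.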
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
dataMetaClient,
dataWriteConfig);
- // Initialize the file groups
- final int fileGroupCount = estimateFileGroupCount(records);
+ Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
+ int minFileGroupCount = bounds.getLeft();
+ int maxFileGroupCount = bounds.getRight();
+
+ int fileGroupCount;
+ if (minFileGroupCount != maxFileGroupCount) {
+ // Estimate file group count based on record count read from base file footer metadata.
+ // Avoids the expensive records.persist() + records.count() pass over materialized records.
+ List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = latestMergedPartitionFileSliceList.stream()
+ .filter(p -> p.getRight().getBaseFile().isPresent())
+ .map(p -> Pair.of(p.getLeft(), p.getRight().getBaseFile().get()))
+ .collect(Collectors.toList());
+ fileGroupCount = estimateFileGroupCountFromBaseFiles(
+ partitionBaseFilePairs, minFileGroupCount, maxFileGroupCount);
+ LOG.info("Estimated {} file groups from base file footer metadata", fileGroupCount);
+ } else {
+ // min == max: skip estimation, use the fixed value directly.
+ fileGroupCount = minFileGroupCount;
+ LOG.info("Using user-configured file group count: {}", fileGroupCount);
+ }
+
LOG.info("Initializing record index with {} file groups.", fileGroupCount);
return Pair.of(fileGroupCount, records);
}
- private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
- int minFileGroupCount;
- int maxFileGroupCount;
+ /**
+ * Returns the (min, max) file group count bounds for RLI based on which RLI variant is enabled.
+ */
+ private Pair<Integer, Integer> getRLIFileGroupCountBounds() {
if (dataWriteConfig.isRecordLevelIndexEnabled()) {
- minFileGroupCount = dataWriteConfig.getRecordLevelIndexMinFileGroupCount();
- maxFileGroupCount = dataWriteConfig.getRecordLevelIndexMaxFileGroupCount();
- } else {
- minFileGroupCount = dataWriteConfig.getGlobalRecordLevelIndexMinFileGroupCount();
- maxFileGroupCount = dataWriteConfig.getGlobalRecordLevelIndexMaxFileGroupCount();
- }
- Supplier<Long> recordCountSupplier = () -> {
- records.persist("MEMORY_AND_DISK_SER");
- long count = records.count();
- LOG.info("Initializing record index with {} mappings", count);
- return count;
- };
+ return Pair.of(dataWriteConfig.getRecordLevelIndexMinFileGroupCount(),
Review Comment:
Good question. Yes — addressed in bec52e4dc96c. The persist now happens in
the caller before `bulkCommit` (only when validation is enabled), so
`validateRecordIndex#count()` reads from the cached RDD instead of re-deriving
keys. See the reply on the related thread above for the full plumbing.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -859,6 +885,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+ /**
+ * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+ * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+ */
+ private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return 0L;
+ }
+ int parallelism = Math.min(partitionBaseFilePairs.size(),
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ StorageConfiguration<?> storageConfBroadcast = storageConf;
Review Comment:
Same response as the earlier thread on this name — mirroring
`countRecordsInHFiles`'s convention. Open to a rename across both call sites in
a separate cleanup PR.