This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fb7b1a5d9111 feat(metadata-table): Add count validation for record 
index bootstrap (#18029)
fb7b1a5d9111 is described below

commit fb7b1a5d9111f41fbcd741edc62f036cc9e344ed
Author: Prashant Wason <[email protected]>
AuthorDate: Wed Feb 25 16:08:02 2026 -0800

    feat(metadata-table): Add count validation for record index bootstrap 
(#18029)
    
    * Add count validation for record index bootstrap of a table
    
    This commit adds validation for record index bootstrap by comparing
    the expected record count with the actual record count stored in the
    metadata table. This helps ensure data integrity during the bootstrap
    process.
    
    Changes:
    - Added validateRecordIndex method in HoodieBackedTableMetadataWriter
      to validate record counts after bootstrap
    - Added getTotalRecordIndexRecords method in HoodieBackedTableMetadata
      to get total records from file slice base files
    - Updated initializeFilegroupsAndCommitToRecordIndexPartition to call
      validation after commit when duplicates are not allowed
    
    * Address PR review comments for record index validation
    
    - Add config flag hoodie.metadata.record.index.bootstrap.validation.enable
      (disabled by default) to explicitly control validation
    - Use getPartitionLatestMergedFileSlices() to handle pending compactions
    - Make validation distributed using engineContext.parallelize() instead
      of iterating in driver
    - Remove unused getTotalRecordIndexRecords() from HoodieBackedTableMetadata
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    * Address review comments: close FSV, rename config, enable in test
    
    1. Close the FSView at the end of the validateRecordIndex method using try-finally
    2. Renamed config from 
'hoodie.metadata.record.index.bootstrap.validation.enable'
       to 'hoodie.metadata.record.index.enable.validation.on.initialization'
    3. Enabled validation in testRollbackPendingCommitWithRecordIndex test
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 97 +++++++++++++++++++++-
 .../hudi/common/config/HoodieMetadataConfig.java   | 13 +++
 .../hudi/functional/TestHoodieBackedMetadata.java  |  2 +
 3 files changed, 108 insertions(+), 4 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a78fd998866e..327cdc1b4b05 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -24,8 +24,10 @@ import org.apache.hudi.avro.model.HoodieIndexPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
@@ -79,6 +81,7 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.index.record.HoodieRecordIndex;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil.DirectoryInfo;
 import org.apache.hudi.storage.HoodieStorage;
@@ -740,18 +743,26 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
                                                                    
Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList, 
boolean isPartitionedRLI) throws IOException {
     createRecordIndexDefinition(dataMetaClient, 
Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, 
String.valueOf(isPartitionedRLI)));
     HoodieData<HoodieRecord> recordIndexRecords;
+    int fileGroupCount;
     if (isPartitionedRLI) {
-      recordIndexRecords = 
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition,
 lazyLatestMergedPartitionFileSliceList);
+      Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecords = 
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition,
 lazyLatestMergedPartitionFileSliceList);
+      fileGroupCount = fgCountAndRecords.getKey();
+      recordIndexRecords = fgCountAndRecords.getValue();
     } else {
       Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecordIndexRecords = 
initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(),
           dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+      fileGroupCount = fgCountAndRecordIndexRecords.getKey();
       recordIndexRecords = fgCountAndRecordIndexRecords.getRight();
       initializeFilegroupsAndCommit(RECORD_INDEX, 
RECORD_INDEX.getPartitionPath(), fgCountAndRecordIndexRecords, 
commitTimeForPartition);
     }
+    // Validate record index after commit if validation is enabled
+    if 
(dataWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled())
 {
+      validateRecordIndex(recordIndexRecords, fileGroupCount);
+    }
     recordIndexRecords.unpersist();
   }
 
-  private HoodieData<HoodieRecord> 
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String 
commitTimeForPartition,
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String 
commitTimeForPartition,
                                                                               
Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList) 
throws IOException {
     Map<String, List<Pair<String, FileSlice>>> partitionFileSlicePairsMap = 
lazyLatestMergedPartitionFileSliceList.get().stream()
         .collect(Collectors.groupingBy(Pair::getKey));
@@ -762,8 +773,9 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
       fileGroupCountAndRecordsPairMap.put(partition, 
initializeRecordIndexPartition(partitionFileSlicePairsMap.get(partition), 
maxParallelismPerHudiPartition));
     }
 
+    int totalFileGroupCount = 
fileGroupCountAndRecordsPairMap.values().stream().mapToInt(Pair::getLeft).sum();
     if (LOG.isInfoEnabled()) {
-      LOG.info("Initializing partitioned record index with {} mappings", 
fileGroupCountAndRecordsPairMap.values().stream().mapToInt(Pair::getLeft).sum());
+      LOG.info("Initializing partitioned record index with {} mappings", 
totalFileGroupCount);
     }
 
     HoodieTimer partitionInitTimer = HoodieTimer.start();
@@ -787,7 +799,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
     initMetadataReader();
     long totalInitTime = partitionInitTimer.endTimer();
     LOG.info("Initializing partitioned record index in metadata table took {} 
in ms", totalInitTime);
-    return records;
+    return Pair.of(totalFileGroupCount, records);
   }
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition(
@@ -835,6 +847,83 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
     );
   }
 
+  /**
+   * Validates the record index after bootstrap by comparing the expected 
record count with the actual
+   * record count stored in the metadata table. The validation is performed in 
a distributed manner
+   * using the engine context to count records from HFiles in parallel.
+   *
+   * @param recordIndexRecords the HoodieData containing the expected records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(HoodieData<HoodieRecord> 
recordIndexRecords, int fileGroupCount) {
+    String partitionName = 
MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    try {
+      // Use merged file slices to handle cases with pending compactions
+      List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
fsView, partitionName);
+
+      // Filter to only file slices with base files and extract their storage 
paths
+      List<StoragePath> baseFilePaths = fileSlices.stream()
+          .filter(fs -> fs.getBaseFile().isPresent())
+          .map(fs -> fs.getBaseFile().get().getStoragePath())
+          .collect(Collectors.toList());
+
+      // Count records in a distributed manner using the engine context
+      long totalRecords = countRecordsInHFiles(baseFilePaths);
+      long expectedRecordCount = recordIndexRecords.count();
+
+      ValidationUtils.checkArgument(totalRecords == expectedRecordCount, 
"Record Count Validation failed with "
+          + totalRecords + " present in record index vs the expected " + 
expectedRecordCount);
+      LOG.info(String.format("Record index initialized on %d shards (expected 
= %d) with %d records (expected = %d)",
+          fileSlices.size(), fileGroupCount, totalRecords, 
expectedRecordCount));
+    } finally {
+      fsView.close();
+    }
+  }
+
+  /**
+   * Counts the total number of records in HFiles in a distributed manner.
+   *
+   * @param baseFilePaths list of storage paths to HFiles
+   * @return total number of records across all HFiles
+   */
+  private long countRecordsInHFiles(List<StoragePath> baseFilePaths) {
+    if (baseFilePaths.isEmpty()) {
+      return 0L;
+    }
+
+    int parallelism = Math.min(baseFilePaths.size(), 
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = storageConf;
+    HoodieFileFormat baseFileFormat = 
metadataMetaClient.getTableConfig().getBaseFileFormat();
+
+    return engineContext.parallelize(baseFilePaths, parallelism)
+        .mapPartitions(pathIterator -> {
+          long count = 0L;
+          while (pathIterator.hasNext()) {
+            StoragePath path = pathIterator.next();
+            try {
+              HoodieStorage storage = HoodieStorageUtils.getStorage(path, 
storageConfBroadcast);
+              HoodieConfig readerConfig = new HoodieConfig();
+              HoodieAvroFileReader reader = (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(storage)
+                  .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+                  .getFileReader(readerConfig, path, baseFileFormat, 
Option.empty());
+              try {
+                count += reader.getTotalRecords();
+              } finally {
+                reader.close();
+              }
+            } catch (IOException e) {
+              throw new HoodieIOException("Error reading total records from 
file " + path, e);
+            }
+          }
+          return Collections.singletonList(count).iterator();
+        }, true)
+        .collectAsList()
+        .stream()
+        .mapToLong(Long::longValue)
+        .sum();
+  }
+
   /**
    * Fetch record locations from FileSlice snapshot.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index f0ad66449396..4b1447d3ca09 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -624,6 +624,15 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .withDocumentation("when set to true, it fails the job on metadata 
table's "
           + "table services operation failure");
 
+  public static final ConfigProperty<Boolean> 
RECORD_INDEX_INITIALIZATION_VALIDATION_ENABLE = ConfigProperty
+      .key(METADATA_PREFIX + 
".record.index.enable.validation.on.initialization")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Enable validation of record index after 
initialization by comparing the expected record count "
+          + "with the actual record count stored in the metadata table. This 
validation runs in a distributed manner "
+          + "using the compute engine. Disabled by default as it adds overhead 
to the initialization process.");
+
   public long getMaxLogFileSize() {
     return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
   }
@@ -768,6 +777,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getInt(RECORD_INDEX_MAX_PARALLELISM);
   }
 
+  public boolean isRecordIndexInitializationValidationEnabled() {
+    return getBooleanOrDefault(RECORD_INDEX_INITIALIZATION_VALIDATION_ENABLE);
+  }
+
   public boolean shouldAutoInitialize() {
     return getBoolean(AUTO_INITIALIZE);
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 7268b3849308..a766d30c2058 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -2916,6 +2916,8 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     Properties props = new Properties();
     
props.setProperty(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
     props.setProperty(HoodieIndexConfig.INDEX_TYPE.key(), "RECORD_INDEX");
+    // Enable validation of record index on initialization
+    
props.setProperty(HoodieMetadataConfig.RECORD_INDEX_INITIALIZATION_VALIDATION_ENABLE.key(),
 "true");
     HoodieWriteConfig cfg = getWriteConfigBuilder(true, true, false)
         .withProps(props).build();
     // Initialize write client.

Reply via email to