This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 69fa35b1015f feat(metadata): Defer RLI initialization for fresh tables to optimize file group allocation (#18353)
69fa35b1015f is described below

commit 69fa35b1015fcc39e762c29b03bef8fa2ec59d58
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Mar 25 12:14:53 2026 -0700

    feat(metadata): Defer RLI initialization for fresh tables to optimize file group allocation (#18353)
    
    This PR optimizes Record Level Index (RLI) initialization for fresh
    Hudi tables by deferring RLI bootstrapping to the second commit.
    Deferring lets the file group count be determined dynamically from
    actual data characteristics, improving both performance and resource
    utilization. The behavior is controlled by
    "hoodie.metadata.record.level.index.defer.init" (default: false).
    
    
    ---------
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  11 +-
 .../hudi/common/config/HoodieMetadataConfig.java   |  23 ++++
 .../hudi/functional/TestHoodieBackedMetadata.java  | 132 +++++++++++++++++++++
 3 files changed, 164 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 327cdc1b4b05..8da89553fcf9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -24,7 +24,6 @@ import org.apache.hudi.avro.model.HoodieIndexPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieConfig;
@@ -81,8 +80,9 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.index.record.HoodieRecordIndex;
 import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil.DirectoryInfo;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
@@ -450,6 +450,13 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
       }
     }
 
+    // For a fresh table, defer RLI initialization
+    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() && this.enabledPartitionTypes.contains(RECORD_INDEX)
+        && dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) {
+      this.enabledPartitionTypes.remove(RECORD_INDEX);
+      partitionsToInit.remove(RECORD_INDEX);
+    }
+
     Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList();
     for (MetadataPartitionType partitionType : partitionsToInit) {
       // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 7b5b27c1a6a5..5fa3201d2ffe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -371,6 +371,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .withDocumentation("The current number of records are multiplied by this number when estimating the number of "
           + "file groups to create automatically. This helps account for growth in the number of records in the dataset.");
 
+  public static final ConfigProperty<Boolean> DEFER_RLI_INIT_FOR_FRESH_TABLE = ConfigProperty
+      .key(METADATA_PREFIX + ".record.level.index.defer.init")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.2.0")
+      .withDocumentation("When enabled, defers RLI initialization to 2nd 
commit for a fresh table. This should help with determining the file group "
+          + "count dynamically for Record Index (global and non-global RLI)");
+
   public static final ConfigProperty<Integer> RECORD_INDEX_MAX_PARALLELISM = ConfigProperty
       .key(METADATA_PREFIX + ".max.init.parallelism")
       .defaultValue(100000)
@@ -690,6 +698,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return isEnabled() && getBooleanOrDefault(RECORD_LEVEL_INDEX_ENABLE_PROP);
   }
 
+  public boolean shouldDeferRliInitForFreshTable() {
+    return getBooleanOrDefault(DEFER_RLI_INIT_FOR_FRESH_TABLE);
+  }
+
   public List<String> getColumnsEnabledForColumnStatsIndex() {
     return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
   }
@@ -1114,6 +1126,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withDeferRliInitializationForFreshTable(boolean deferRliInitializationForFreshTable) {
+      metadataConfig.setValue(DEFER_RLI_INIT_FOR_FRESH_TABLE, String.valueOf(deferRliInitializationForFreshTable));
+      return this;
+    }
+
     public Builder withEnableGlobalRecordLevelIndex(boolean enabled) {
       metadataConfig.setValue(GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP, String.valueOf(enabled));
       return this;
@@ -1130,6 +1147,12 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withPartitionedRecordIndexFileGroupCount(int minCount, int maxCount) {
+      metadataConfig.setValue(RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP, String.valueOf(minCount));
+      metadataConfig.setValue(RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP, String.valueOf(maxCount));
+      return this;
+    }
+
     public Builder withRecordIndexGrowthFactor(float factor) {
       metadataConfig.setValue(RECORD_INDEX_GROWTH_FACTOR_PROP, String.valueOf(factor));
       return this;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index a9e98f8898ac..b3beca5fa18c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -2031,6 +2031,138 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  /**
+   * Test that partitioned RLI initialization is deferred for fresh tables.
+   * Partitioned RLI should NOT be initialized on the first commit but should be initialized
+   * on the second commit with programmatically determined file group count.
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testPartitionedRecordIndexDeferredInitializationForFreshTable(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with partitioned record index enabled (not global)
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.RECORD_LEVEL_INDEX)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordLevelIndex(true)
+            .withPartitionedRecordIndexFileGroupCount(1, 10)
+            .withDeferRliInitializationForFreshTable(true)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // First commit - Partitioned RLI should NOT be initialized yet for a fresh table
+      String firstCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 1000);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 2), firstCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify metadata table exists
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataTableAvailable());
+
+      // Verify partitioned RLI partition is NOT initialized after first commit
+      assertFalse(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should NOT be initialized on first commit for a fresh table");
+
+      // Files partition should be initialized
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES),
+          "Files partition should be initialized");
+
+      // Second commit - Partitioned RLI should NOW be initialized
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> moreRecords = dataGen.generateInserts(secondCommitTime, 500);
+      writeStatuses = client.insert(jsc.parallelize(moreRecords, 2), secondCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
+
+      // Reload and verify partitioned RLI is now initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should be initialized after second commit");
+
+      // Verify file group count is 3 (1 for each data table partition and we have 3 partitions)
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) metadata(client, storage);
+      int fileGroupCount = HoodieTableMetadataUtil.getPartitionLatestFileSlices(
+          metadataReader.getMetadataMetaClient(), Option.empty(),
+          RECORD_INDEX.getPartitionPath()).size();
+
+      // For partitioned RLI with small data, file group count should be 3 (1 as default for 3 partitions)
+      assertEquals(3, fileGroupCount,
+          "File group count should be 3 for partitioned RLI table (1 per partition x 3 partitions), but got: " + fileGroupCount);
+
+      // Validate metadata integrity
+      validateMetadata(client);
+    }
+  }
+
+  /**
+   * Test that global RLI with larger data results in an appropriate file group count.
+   * This validates that the file group count is determined programmatically based on data size,
+   * not using a hardcoded default.
+   */
+  @Test
+  public void testGlobalRecordIndexDeferredInitialization() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with global record index enabled
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.RECORD_INDEX)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableGlobalRecordLevelIndex(true)
+            .withDeferRliInitializationForFreshTable(true)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // First commit with moderate data size (4000 records)
+      String firstCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 4000);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 5), firstCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify RLI is NOT initialized after first commit
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertFalse(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Global RLI should NOT be initialized on first commit");
+
+      // Second commit to trigger RLI initialization
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> moreRecords = dataGen.generateInserts(secondCommitTime, 2000);
+      writeStatuses = client.insert(jsc.parallelize(moreRecords, 3), secondCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify global RLI is now initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX));
+
+      // Verify file group count is determined based on data size
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) metadata(client, storage);
+      int fileGroupCount = HoodieTableMetadataUtil.getPartitionLatestFileSlices(
+          metadataReader.getMetadataMetaClient(), Option.empty(),
+          RECORD_INDEX.getPartitionPath()).size();
+
+      // For 4000 records with global RLI, file group count should be within configured bounds
+      int maxFileGroupCount = writeConfig.getMetadataConfig().getGlobalRecordLevelIndexMaxFileGroupCount();
+      assertTrue(fileGroupCount > 1 && fileGroupCount <= maxFileGroupCount,
+          "File group count should be between 1 and " + maxFileGroupCount + ", but got: " + fileGroupCount);
+      // Validate metadata integrity
+      validateMetadata(client);
+    }
+  }
+
   // Some operations are not feasible with test table infra. hence using write client to test those cases.
 
   /**

Reply via email to