This is an automated email from the ASF dual-hosted git repository.

nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e4ee7e3023ef test(metadata): Add test coverage for deferred RLI init 
and bulk_insert (#18865)
e4ee7e3023ef is described below

commit e4ee7e3023efea1268b0b41510c82430fb6f2fcc
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jun 25 20:03:03 2026 -0700

    test(metadata): Add test coverage for deferred RLI init and bulk_insert 
(#18865)
    
    Follow-up to #18353 (defer RLI init for fresh tables) and #18836 (robust
    schema resolution during RLI bootstrap). Adds test coverage for the
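    deferred RLI init flow:
    
    - testRecordLevelIndex takes a new rliInitDeferred flag (default false)
      and, when it is set, asserts that the RLI partition is absent after
      the first commit.
    - testPartitionedRecordLevelIndexDefer covers a MERGE_ON_READ table with
      defer enabled, validating the index before and after compaction.
    - testPartitionedRecordLevelIndexDeferWithBulkInsert covers two
      bulk_insert commits with defer enabled and validates per-partition
      record index lookups.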
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../hudi/functional/TestRecordLevelIndex.scala     | 116 ++++++++++++++++++++-
 1 file changed, 114 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index e7015b5ab81e..336886b44ba8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -146,7 +146,8 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
       "Metadata files partition count should be lower than data table file 
count after rebootstrap")
   }
 
-  def testRecordLevelIndex(tableType: HoodieTableType, streamingWriteEnabled: 
Boolean, holder: testRecordLevelIndexHolder): Unit = {
+  def testRecordLevelIndex(tableType: HoodieTableType, streamingWriteEnabled: 
Boolean, holder: testRecordLevelIndexHolder,
+                           rliInitDeferred: Boolean = false): Unit = {
     val dataGen = new HoodieTestDataGenerator();
     val inserts = dataGen.generateInserts("001", 5)
     val latestBatchDf = toDataset(spark, inserts)
@@ -156,9 +157,10 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase with SparkDatasetMix
       RECORDKEY_FIELD.key -> "_row_key",
       PARTITIONPATH_FIELD.key -> "data_partition_path",
       HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
-      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key()-> 
"false",
+      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> 
"false",
       HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
       HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() -> 
streamingWriteEnabled.toString,
+      HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key() -> 
rliInitDeferred.toString,
       HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
       HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
     holder.options = options
@@ -167,11 +169,19 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase with SparkDatasetMix
       .mode(SaveMode.Overwrite)
       .save(basePath)
     assertEquals(10, spark.read.format("hudi").load(basePath).count())
+    if (rliInitDeferred) {
+      // With defer enabled, the first commit should NOT have initialized the 
RLI partition.
+      metaClient = HoodieTableMetaClient.reload(metaClient)
+      
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+        "RLI partition should not be initialized after the first commit when 
defer is enabled")
+    }
     val props = 
TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(options).asJava)
     val writeConfig = HoodieWriteConfig.newBuilder()
       .withProps(props)
       .withPath(basePath)
       .build()
+    // Constructing the metadata writer here will initialize RLI (lazily, on 
this second metadata-writer entry)
+    // when defer is enabled, since there is now 1 completed commit on the 
data table.
     var metadata = metadataWriter(writeConfig).getTableMetadata
     val recordKeys = inserts.asScala.map(i => 
i.getRecordKey).asJava.stream().collect(Collectors.toList())
     holder.recordKeys = recordKeys
@@ -301,6 +311,108 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase with SparkDatasetMix
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testPartitionedRecordLevelIndexDefer(streamingWriteEnabled: Boolean): 
Unit = {
+    val holder = new testRecordLevelIndexHolder
+    testRecordLevelIndex(HoodieTableType.MERGE_ON_READ, streamingWriteEnabled, 
holder, true)
+    assertEquals("deltacommit", 
metaClient.getActiveTimeline.lastInstant().get().getAction)
+    val writeConfig = getWriteConfig(holder.options)
+    var metadata = metadataWriter(writeConfig).getTableMetadata
+    doAllAssertions(holder, metadata)
+    val writeClient = new SparkRDDWriteClient(new 
HoodieSparkEngineContext(jsc), writeConfig)
+    val timeOpt = writeClient.scheduleCompaction(HOption.empty())
+    assertTrue(timeOpt.isPresent)
+    writeClient.compact(timeOpt.get())
+    metaClient.reloadActiveTimeline()
+    assertEquals("compaction", 
metaClient.getActiveTimeline.lastInstant().get().getAction)
+    metadata = metadataWriter(writeConfig).getTableMetadata
+    doAllAssertions(holder, metadata)
+    writeClient.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testPartitionedRecordLevelIndexDeferWithBulkInsert(streamingWriteEnabled: 
Boolean): Unit = {
+    val tableType = HoodieTableType.MERGE_ON_READ
+    val dataGen = new HoodieTestDataGenerator()
+    val inserts1 = dataGen.generateInserts("001", 5)
+    val batch1Df = toDataset(spark, inserts1)
+    val insertDf1 = batch1Df.withColumn("data_partition_path", 
lit("partition1"))
+      .union(batch1Df.withColumn("data_partition_path", lit("partition2")))
+
+    val options = Map(
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      RECORDKEY_FIELD.key -> "_row_key",
+      PARTITIONPATH_FIELD.key -> "data_partition_path",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> 
"false",
+      HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+      HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() -> 
streamingWriteEnabled.toString,
+      HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key() -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+      HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
+
+    // Commit #1: bulk_insert on a fresh table with defer enabled.
+    insertDf1.write.format("hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertEquals(10, spark.read.format("hudi").load(basePath).count())
+
+    // Defer should have kicked in: RLI partition is not initialized after the 
first bulk_insert.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+      "RLI partition should not be initialized after the first bulk_insert 
when defer is enabled")
+
+    // Commit #2: another bulk_insert into a new partition. New rows must use 
distinct record keys.
+    val inserts2 = dataGen.generateInserts("002", 5)
+    val batch2Df = toDataset(spark, inserts2)
+    val insertDf2 = batch2Df.withColumn("data_partition_path", 
lit("partition3"))
+
+    insertDf2.write.format("hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    assertEquals(15, spark.read.format("hudi").load(basePath).count())
+
+    // Build metadata writer/reader; this entry will initialize RLI now that 
there is a completed commit.
+    val writeConfig = getWriteConfig(options)
+    val metadata = metadataWriter(writeConfig).getTableMetadata
+
+    // RLI partition should now be present in the metadata table.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+      "RLI partition should be initialized once a completed commit exists on 
the data table")
+    assertTrue(HoodieRecordIndex.isPartitioned(
+      
metaClient.getIndexMetadata.get().getIndexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)),
+      "RLI should be initialized as partitioned RLI")
+
+    // Validate record key -> location mapping for both batches against the 
data.
+    val tableRows = spark.read.format("hudi").load(basePath).collect()
+
+    val batch1Keys = 
inserts1.asScala.map(_.getRecordKey).asJava.stream().collect(Collectors.toList())
+    val partition1Locations = readRecordIndex(metadata, batch1Keys, 
HOption.of("partition1"))
+    assertEquals(5, partition1Locations.size)
+    validateDFWithLocations(tableRows, partition1Locations, "partition1")
+    val partition2Locations = readRecordIndex(metadata, batch1Keys, 
HOption.of("partition2"))
+    assertEquals(5, partition2Locations.size)
+    validateDFWithLocations(tableRows, partition2Locations, "partition2")
+
+    val batch2Keys = 
inserts2.asScala.map(_.getRecordKey).asJava.stream().collect(Collectors.toList())
+    val partition3Locations = readRecordIndex(metadata, batch2Keys, 
HOption.of("partition3"))
+    assertEquals(5, partition3Locations.size)
+    validateDFWithLocations(tableRows, partition3Locations, "partition3")
+
+    // Cross-partition lookups for batch1 keys against partition3 (and vice 
versa) should be empty.
+    assertEquals(0, readRecordIndex(metadata, batch1Keys, 
HOption.of("partition3")).size)
+    assertEquals(0, readRecordIndex(metadata, batch2Keys, 
HOption.of("partition1")).size)
+    assertEquals(0, readRecordIndex(metadata, batch2Keys, 
HOption.of("partition2")).size)
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testPartitionedRecordLevelIndexCompact(streamingWriteEnabled: Boolean): 
Unit = {

Reply via email to