This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 34e0a546198 [HUDI-9173] Fix issue with inflight compaction and global index lookup (#12976)
34e0a546198 is described below

commit 34e0a5461988fcf672ed0aee8dc8096271329033
Author: Tim Brown <[email protected]>
AuthorDate: Sat Mar 15 04:33:14 2025 -0500

    [HUDI-9173] Fix issue with inflight compaction and global index lookup (#12976)
---
 .../org/apache/hudi/io/HoodieMergedReadHandle.java |  2 +-
 .../TestHoodieSparkMergeOnReadTableCompaction.java | 48 ++++++++++++++--------
 2 files changed, 32 insertions(+), 18 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index ffcd6eedb2e..8195e248e13 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -112,7 +112,7 @@ public class HoodieMergedReadHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K
         && 
hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant().isPresent())
 {
       return Option.fromJavaOptional(hoodieTable
           .getHoodieView()
-          .getLatestFileSlices(partitionPathFileIDPair.getLeft())
+          
.getLatestMergedFileSlicesBeforeOrOn(partitionPathFileIDPair.getLeft(), 
instantTime)
           .filter(fileSlice -> 
fileSlice.getFileId().equals(partitionPathFileIDPair.getRight()))
           .findFirst());
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 73b9d7e8be2..9c8c29103bf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -62,6 +62,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 
 @Tag("functional")
 public class TestHoodieSparkMergeOnReadTableCompaction extends 
SparkClientFunctionalTestHarness {
@@ -78,8 +79,12 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
   }
 
   private static Stream<Arguments> writePayloadTest() {
-    // Payload class
-    return Stream.of(new Object[] {DefaultHoodieRecordPayload.class.getName(), 
PartialUpdateAvroPayload.class.getName()}).map(Arguments::of);
+    // Payload class and index combinations
+    return Stream.of(
+        Arguments.of(DefaultHoodieRecordPayload.class.getName(), 
HoodieIndex.IndexType.BUCKET),
+        Arguments.of(DefaultHoodieRecordPayload.class.getName(), 
HoodieIndex.IndexType.GLOBAL_SIMPLE),
+        Arguments.of(PartialUpdateAvroPayload.class.getName(), 
HoodieIndex.IndexType.BUCKET),
+        Arguments.of(PartialUpdateAvroPayload.class.getName(), 
HoodieIndex.IndexType.GLOBAL_SIMPLE));
   }
 
   private HoodieTestDataGenerator dataGen;
@@ -100,8 +105,11 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
 
   @ParameterizedTest
   @MethodSource("writePayloadTest")
-  public void testWriteDuringCompaction(String payloadClass) throws 
IOException {
+  public void testWriteDuringCompaction(String payloadClass, 
HoodieIndex.IndexType indexType) throws IOException {
     Properties props = getPropertiesForKeyGen(true);
+    HoodieLayoutConfig layoutConfig = indexType == HoodieIndex.IndexType.BUCKET
+        ? 
HoodieLayoutConfig.newBuilder().withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()
+        : HoodieLayoutConfig.newBuilder().build();
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
         .withPath(basePath())
@@ -110,13 +118,13 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
         .withAutoCommit(false)
         .withWritePayLoad(payloadClass)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .compactionSmallFileSize(0)
+            .build())
         .withStorageConfig(HoodieStorageConfig.newBuilder()
             .parquetMaxFileSize(1024).build())
-        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
-            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
-            
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .withLayoutConfig(layoutConfig)
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(indexType).withBucketNum("1").build())
         .build();
     props.putAll(config.getProps());
 
@@ -124,18 +132,24 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
     client = getHoodieWriteClient(config);
 
     // write data and commit
-    writeData(client.createNewInstantTime(), 100, true);
-    // write data again, and in the case of bucket index, all records will go 
into log files (we use a small max_file_size)
-    writeData(client.createNewInstantTime(), 100, true);
+    String instant1 = client.createNewInstantTime();
+    writeData(instant1, dataGen.generateInserts(instant1, 100), true);
+    // write mix of new data and updates, and in the case of bucket index, all 
records will go into log files (we use a small max_file_size)
+    String instant2 = client.createNewInstantTime();
+    List<HoodieRecord> updates1 = dataGen.generateUpdates(instant2, 100);
+    List<HoodieRecord> newRecords1 = dataGen.generateInserts(instant2, 100);
+    writeData(instant2, Stream.concat(newRecords1.stream(), 
updates1.stream()).collect(Collectors.toList()), true);
     Assertions.assertEquals(200, readTableTotalRecordsNum());
     // schedule compaction
     String compactionTime = (String) 
client.scheduleCompaction(Option.empty()).get();
     // write data, and do not commit. those records should not visible to 
reader
-    String insertTime = client.createNewInstantTime();
-    List<WriteStatus> writeStatuses = writeData(insertTime, 100, false);
+    String instant3 = client.createNewInstantTime();
+    List<HoodieRecord> updates2 = dataGen.generateUpdates(instant3, 200);
+    List<HoodieRecord> newRecords2 = dataGen.generateInserts(instant3, 100);
+    List<WriteStatus> writeStatuses = writeData(instant3, 
Stream.concat(newRecords2.stream(), 
updates2.stream()).collect(Collectors.toList()), false);
     Assertions.assertEquals(200, readTableTotalRecordsNum());
     // commit the write. The records should be visible now even though the 
compaction does not complete.
-    client.commitStats(insertTime, 
writeStatuses.stream().map(WriteStatus::getStat)
+    client.commitStats(instant3, 
writeStatuses.stream().map(WriteStatus::getStat)
         .collect(Collectors.toList()), Option.empty(), 
metaClient.getCommitActionType());
     Assertions.assertEquals(300, readTableTotalRecordsNum());
     // after the compaction, total records should remain the same
@@ -198,13 +212,13 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
         Arrays.stream(dataGen.getPartitionPaths()).map(p -> 
Paths.get(basePath(), p).toString()).collect(Collectors.toList()), 
basePath()).size();
   }
 
-  private List<WriteStatus> writeData(String instant, int numRecords, boolean 
doCommit) {
+  private List<WriteStatus> writeData(String instant, List<HoodieRecord> 
hoodieRecords, boolean doCommit) {
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    JavaRDD records = jsc().parallelize(dataGen.generateInserts(instant, 
numRecords), 2);
+    JavaRDD<HoodieRecord> records = jsc().parallelize(hoodieRecords, 2);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     client.startCommitWithTime(instant);
     List<WriteStatus> writeStatuses = client.upsert(records, 
instant).collect();
-    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
+    assertNoWriteErrors(writeStatuses);
     if (doCommit) {
       List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
       boolean committed = client.commitStats(instant, writeStats, 
Option.empty(), metaClient.getCommitActionType());

Reply via email to