This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 34e0a546198 [HUDI-9173] Fix issue with inflight compaction and global index lookup (#12976)
34e0a546198 is described below
commit 34e0a5461988fcf672ed0aee8dc8096271329033
Author: Tim Brown <[email protected]>
AuthorDate: Sat Mar 15 04:33:14 2025 -0500
[HUDI-9173] Fix issue with inflight compaction and global index lookup (#12976)
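When a compaction is pending on a file group, the latest file slice is the
one created at the compaction instant, and its base file does not exist
until the compaction completes. HoodieMergedReadHandle, which the global
index lookup uses to read a file group's current records, fetched that
slice via getLatestFileSlices and so could miss records still sitting in
the previous file slice. The fix reads through
getLatestMergedFileSlicesBeforeOrOn, which merges a pending compaction's
file slice with its predecessor. A minimal sketch of the corrected lookup,
with partitionPath, fileId, hoodieTable, and instantTime standing in for
the surrounding fields of HoodieMergedReadHandle:

    // Pick the file slice to merge-read. The merged view stitches a
    // pending compaction's slice together with the previous completed
    // slice, so existing records stay visible while the compaction is
    // inflight.
    Option<FileSlice> latestSlice = Option.fromJavaOptional(hoodieTable
        .getHoodieView()
        .getLatestMergedFileSlicesBeforeOrOn(partitionPath, instantTime)
        .filter(slice -> slice.getFileId().equals(fileId))
        .findFirst());

The test now exercises this path by running each payload class against both
BUCKET and GLOBAL_SIMPLE index types, and by writing updates while a
compaction is scheduled but not yet complete.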
---
.../org/apache/hudi/io/HoodieMergedReadHandle.java | 2 +-
.../TestHoodieSparkMergeOnReadTableCompaction.java | 48 ++++++++++++++--------
2 files changed, 32 insertions(+), 18 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index ffcd6eedb2e..8195e248e13 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -112,7 +112,7 @@ public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K
         && hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant().isPresent()) {
       return Option.fromJavaOptional(hoodieTable
           .getHoodieView()
-          .getLatestFileSlices(partitionPathFileIDPair.getLeft())
+          .getLatestMergedFileSlicesBeforeOrOn(partitionPathFileIDPair.getLeft(), instantTime)
           .filter(fileSlice -> fileSlice.getFileId().equals(partitionPathFileIDPair.getRight()))
           .findFirst());
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 73b9d7e8be2..9c8c29103bf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -62,6 +62,7 @@ import java.util.stream.Stream;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 @Tag("functional")
 public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFunctionalTestHarness {
@@ -78,8 +79,12 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
   }
   private static Stream<Arguments> writePayloadTest() {
-    // Payload class
-    return Stream.of(new Object[] {DefaultHoodieRecordPayload.class.getName(), PartialUpdateAvroPayload.class.getName()}).map(Arguments::of);
+    // Payload class and index combinations
+    return Stream.of(
+        Arguments.of(DefaultHoodieRecordPayload.class.getName(), HoodieIndex.IndexType.BUCKET),
+        Arguments.of(DefaultHoodieRecordPayload.class.getName(), HoodieIndex.IndexType.GLOBAL_SIMPLE),
+        Arguments.of(PartialUpdateAvroPayload.class.getName(), HoodieIndex.IndexType.BUCKET),
+        Arguments.of(PartialUpdateAvroPayload.class.getName(), HoodieIndex.IndexType.GLOBAL_SIMPLE));
   }
   private HoodieTestDataGenerator dataGen;
@@ -100,8 +105,11 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
   @ParameterizedTest
   @MethodSource("writePayloadTest")
-  public void testWriteDuringCompaction(String payloadClass) throws IOException {
+  public void testWriteDuringCompaction(String payloadClass, HoodieIndex.IndexType indexType) throws IOException {
     Properties props = getPropertiesForKeyGen(true);
+    HoodieLayoutConfig layoutConfig = indexType == HoodieIndex.IndexType.BUCKET
+        ? HoodieLayoutConfig.newBuilder().withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()
+        : HoodieLayoutConfig.newBuilder().build();
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
         .withPath(basePath())
@@ -110,13 +118,13 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
         .withAutoCommit(false)
         .withWritePayLoad(payloadClass)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .compactionSmallFileSize(0)
+            .build())
         .withStorageConfig(HoodieStorageConfig.newBuilder()
             .parquetMaxFileSize(1024).build())
-        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
-            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
-            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .withLayoutConfig(layoutConfig)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(indexType).withBucketNum("1").build())
         .build();
     props.putAll(config.getProps());
@@ -124,18 +132,24 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
     client = getHoodieWriteClient(config);
     // write data and commit
-    writeData(client.createNewInstantTime(), 100, true);
-    // write data again, and in the case of bucket index, all records will go into log files (we use a small max_file_size)
-    writeData(client.createNewInstantTime(), 100, true);
+    String instant1 = client.createNewInstantTime();
+    writeData(instant1, dataGen.generateInserts(instant1, 100), true);
+    // write mix of new data and updates, and in the case of bucket index, all records will go into log files (we use a small max_file_size)
+    String instant2 = client.createNewInstantTime();
+    List<HoodieRecord> updates1 = dataGen.generateUpdates(instant2, 100);
+    List<HoodieRecord> newRecords1 = dataGen.generateInserts(instant2, 100);
+    writeData(instant2, Stream.concat(newRecords1.stream(), updates1.stream()).collect(Collectors.toList()), true);
     Assertions.assertEquals(200, readTableTotalRecordsNum());
     // schedule compaction
     String compactionTime = (String) client.scheduleCompaction(Option.empty()).get();
     // write data, and do not commit. those records should not visible to reader
-    String insertTime = client.createNewInstantTime();
-    List<WriteStatus> writeStatuses = writeData(insertTime, 100, false);
+    String instant3 = client.createNewInstantTime();
+    List<HoodieRecord> updates2 = dataGen.generateUpdates(instant3, 200);
+    List<HoodieRecord> newRecords2 = dataGen.generateInserts(instant3, 100);
+    List<WriteStatus> writeStatuses = writeData(instant3, Stream.concat(newRecords2.stream(), updates2.stream()).collect(Collectors.toList()), false);
     Assertions.assertEquals(200, readTableTotalRecordsNum());
     // commit the write. The records should be visible now even though the compaction does not complete.
-    client.commitStats(insertTime, writeStatuses.stream().map(WriteStatus::getStat)
+    client.commitStats(instant3, writeStatuses.stream().map(WriteStatus::getStat)
         .collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
     Assertions.assertEquals(300, readTableTotalRecordsNum());
     // after the compaction, total records should remain the same
@@ -198,13 +212,13 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
         Arrays.stream(dataGen.getPartitionPaths()).map(p -> Paths.get(basePath(), p).toString()).collect(Collectors.toList()),
         basePath()).size();
   }
-  private List<WriteStatus> writeData(String instant, int numRecords, boolean doCommit) {
+  private List<WriteStatus> writeData(String instant, List<HoodieRecord> hoodieRecords, boolean doCommit) {
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    JavaRDD records = jsc().parallelize(dataGen.generateInserts(instant, numRecords), 2);
+    JavaRDD<HoodieRecord> records = jsc().parallelize(hoodieRecords, 2);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     client.startCommitWithTime(instant);
     List<WriteStatus> writeStatuses = client.upsert(records, instant).collect();
-    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
+    assertNoWriteErrors(writeStatuses);
     if (doCommit) {
       List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
       boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType());