This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0138f41bd421 perf: Improve global index performance for commit time
ordering (#17797)
0138f41bd421 is described below
commit 0138f41bd42176dde9a906291a96ac73f919dc52
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue May 19 21:51:01 2026 -0700
perf: Improve global index performance for commit time ordering (#17797)
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 3 +-
.../TestGlobalIndexCommitTimeOrdering.java | 473 +++++++++++++++++++++
2 files changed, 475 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index b3086eb6a361..d9fe1068e421 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -590,7 +590,8 @@ public class HoodieIndexUtils {
boolean isCommitTimeOrdered = readerContext.getMergeMode() ==
RecordMergeMode.COMMIT_TIME_ORDERING;
// if the index is not updating the partition of the record, and the table
is COW, then we do not need to do merging at
// this phase since the writer path will merge when rewriting the files as
part of the upsert operation.
- boolean requiresMergingWithOlderRecordVersion = shouldUpdatePartitionPath
|| table.getMetaClient().getTableConfig().getTableType() ==
HoodieTableType.MERGE_ON_READ;
+ boolean requiresMergingWithOlderRecordVersion = shouldUpdatePartitionPath
+ || (!isCommitTimeOrdered &&
table.getMetaClient().getTableConfig().getTableType() ==
HoodieTableType.MERGE_ON_READ);
DeleteContext deleteContext = DeleteContext.fromRecordSchema(properties,
writerSchema);
// Pair of incoming record and the global location if meant for merged
lookup in later stage
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexCommitTimeOrdering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexCommitTimeOrdering.java
new file mode 100644
index 000000000000..2149549347d9
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexCommitTimeOrdering.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static
org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_STR;
+import static
org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getDeletesWithEmptyPayloadAndNewPartition;
+import static
org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getInserts;
+import static
org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getKeyGenProps;
+import static
org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getUpdates;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
+import static org.apache.hudi.index.HoodieIndex.IndexType.RECORD_INDEX;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for global index with COMMIT_TIME_ORDERING merge mode.
+ * This validates that the optimization to skip merging with older record
versions
+ * works correctly for COMMIT_TIME_ORDERING on MOR tables.
+ */
+public class TestGlobalIndexCommitTimeOrdering extends
SparkClientFunctionalTestHarness {
+
+ @Override
+ public SparkConf conf() {
+ return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
+ }
+
+ private static Stream<Arguments> getTableTypeAndIndexType() {
+ return Stream.of(
+ Arguments.of(COPY_ON_WRITE, RECORD_INDEX),
+ Arguments.of(MERGE_ON_READ, RECORD_INDEX),
+ Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE),
+ Arguments.of(MERGE_ON_READ, GLOBAL_SIMPLE)
+ );
+ }
+
+ /**
+ * Tests basic upserts with COMMIT_TIME_ORDERING.
+ * The newer commit should always win regardless of the ts field value.
+ */
+ @ParameterizedTest
+ @MethodSource("getTableTypeAndIndexType")
+ public void testBasicUpsertsWithCommitTimeOrdering(HoodieTableType
tableType, IndexType indexType) throws IOException {
+ final Class<?> payloadClass = OverwriteWithLatestAvroPayload.class;
+ HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType,
writeConfig.getProps());
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+ final int totalRecords = 4;
+ final String p1 = "p1";
+
+ // 1st batch: inserts with ts=100
+ String commitTime1 = getCommitTimeAtUTC(0);
+ List<HoodieRecord> insertsAtTs100 = getInserts(totalRecords, p1, 100,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<WriteStatus> writeStatusesList =
client.upsert(jsc().parallelize(insertsAtTs100, 2), commitTime1).collect();
+ client.commit(commitTime1, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 100);
+
+ // 2nd batch: updates with LOWER ts=50 (should still be visible due to
COMMIT_TIME_ORDERING)
+ String commitTime2 = getCommitTimeAtUTC(5);
+ List<HoodieRecord> updatesAtTs50 = getUpdates(insertsAtTs100, 50,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ writeStatusesList = client.upsert(jsc().parallelize(updatesAtTs50, 2),
commitTime2).collect();
+ client.commit(commitTime2, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ // With COMMIT_TIME_ORDERING, newer commit wins, so ts=50 records should
be visible
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 50);
+
+ // 3rd batch: updates with HIGHER ts=200 (should be visible)
+ String commitTime3 = getCommitTimeAtUTC(10);
+ List<HoodieRecord> updatesAtTs200 = getUpdates(insertsAtTs100, 200,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime3);
+ writeStatusesList = client.upsert(jsc().parallelize(updatesAtTs200, 2),
commitTime3).collect();
+ client.commit(commitTime3, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 200);
+
+ // 4th batch: updates with EQUAL ts=200 (should be visible as newer
commit wins)
+ String commitTime4 = getCommitTimeAtUTC(15);
+ List<HoodieRecord> updatesAtTs200Again = getUpdates(insertsAtTs100, 200,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime4);
+ writeStatusesList = client.upsert(jsc().parallelize(updatesAtTs200Again,
2), commitTime4).collect();
+ client.commit(commitTime4, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 200);
+ }
+ }
+
+ /**
+ * Tests partition path updates with COMMIT_TIME_ORDERING.
+ * Records should be moved to new partition regardless of ts field value.
+ */
+ @ParameterizedTest
+ @MethodSource("getTableTypeAndIndexType")
+ public void testPartitionUpdatesWithCommitTimeOrdering(HoodieTableType
tableType, IndexType indexType) throws IOException {
+ final Class<?> payloadClass = OverwriteWithLatestAvroPayload.class;
+ HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType,
writeConfig.getProps());
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+ final int totalRecords = 4;
+ final String p1 = "p1";
+ final String p2 = "p2";
+ final String p3 = "p3";
+
+ // 1st batch: inserts to p1 with ts=100
+ String commitTime1 = getCommitTimeAtUTC(0);
+ List<HoodieRecord> insertsAtTs100 = getInserts(totalRecords, p1, 100,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<WriteStatus> writeStatusesList =
client.upsert(jsc().parallelize(insertsAtTs100, 2), commitTime1).collect();
+ client.commit(commitTime1, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 100);
+
+ // 2nd batch: move all from p1 to p2 with LOWER ts=50 (should still move
due to COMMIT_TIME_ORDERING)
+ String commitTime2 = getCommitTimeAtUTC(5);
+ List<HoodieRecord> updatesToP2AtTs50 = getUpdates(insertsAtTs100, p2,
50, payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ writeStatusesList = client.upsert(jsc().parallelize(updatesToP2AtTs50,
2), commitTime2).collect();
+ client.commit(commitTime2, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ // Records should be in p2 with ts=50
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 50);
+
+ // 3rd batch: move all from p2 to p3 with ts=150
+ String commitTime3 = getCommitTimeAtUTC(10);
+ List<HoodieRecord> updatesToP3AtTs150 = getUpdates(updatesToP2AtTs50,
p3, 150, payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime3);
+ writeStatusesList = client.upsert(jsc().parallelize(updatesToP3AtTs150,
2), commitTime3).collect();
+ client.commit(commitTime3, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 150);
+
+ // 4th batch: move all back to p1 with ts=25 (lower than current 150,
but should still take effect)
+ String commitTime4 = getCommitTimeAtUTC(15);
+ List<HoodieRecord> updatesToP1AtTs25 = getUpdates(updatesToP3AtTs150,
p1, 25, payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime4);
+ writeStatusesList = client.upsert(jsc().parallelize(updatesToP1AtTs25,
2), commitTime4).collect();
+ client.commit(commitTime4, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 25);
+ }
+ }
+
+ /**
+ * Tests delete operations with COMMIT_TIME_ORDERING.
+ * Deletes should always succeed regardless of ts field value.
+ */
+ @ParameterizedTest
+ @MethodSource("getTableTypeAndIndexType")
+ public void testDeletesWithCommitTimeOrdering(HoodieTableType tableType,
IndexType indexType) throws IOException {
+ final Class<?> payloadClass = OverwriteWithLatestAvroPayload.class;
+ HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType,
writeConfig.getProps());
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+ final int totalRecords = 4;
+ final String p1 = "p1";
+
+ // 1st batch: inserts with ts=100
+ String commitTime1 = getCommitTimeAtUTC(0);
+ List<HoodieRecord> insertsAtTs100 = getInserts(totalRecords, p1, 100,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<WriteStatus> writeStatusesList =
client.upsert(jsc().parallelize(insertsAtTs100, 2), commitTime1).collect();
+ client.commit(commitTime1, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 100);
+
+ // 2nd batch: delete records 0, 1 using delete API
+ String commitTime2 = getCommitTimeAtUTC(5);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ List<WriteStatus> deleteStatuses = client.delete(
+ jsc().parallelize(insertsAtTs100.subList(0, 2).stream()
+ .map(HoodieRecord::getKey).collect(Collectors.toList()), 2),
+ commitTime2).collect();
+ client.commit(commitTime2, jsc().parallelize(deleteStatuses));
+ assertNoWriteErrors(deleteStatuses);
+ // Only records 2, 3 should remain
+ readTableAndValidate(metaClient, new int[] {2, 3}, p1, 100);
+
+ // 3rd batch: re-insert deleted records with ts=50 (lower than original,
should still insert)
+ String commitTime3 = getCommitTimeAtUTC(10);
+ List<HoodieRecord> reInsertsAtTs50 = getInserts(2, p1, 50, payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime3);
+ writeStatusesList = client.upsert(jsc().parallelize(reInsertsAtTs50, 2),
commitTime3).collect();
+ client.commit(commitTime3, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ // Records 0, 1 should be back with ts=50, records 2, 3 still have ts=100
+ Map<String, Long> expectedTsMap = new HashMap<>();
+ expectedTsMap.put("0", 50L);
+ expectedTsMap.put("1", 50L);
+ expectedTsMap.put("2", 100L);
+ expectedTsMap.put("3", 100L);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1,
expectedTsMap);
+ }
+ }
+
+ /**
+ * Tests deletes using empty payload with partition path different from
actual location.
+ * The global index should find the record and delete it.
+ */
+ @ParameterizedTest
+ @MethodSource("getTableTypeAndIndexType")
+ public void
testDeleteWithUnknownPartitionAndCommitTimeOrdering(HoodieTableType tableType,
IndexType indexType) throws IOException {
+ final Class<?> payloadClass = OverwriteWithLatestAvroPayload.class;
+ HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType,
writeConfig.getProps());
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+ final int totalRecords = 4;
+ final String p1 = "p1";
+
+ // 1st batch: inserts with ts=100
+ String commitTime1 = getCommitTimeAtUTC(0);
+ List<HoodieRecord> insertsAtTs100 = getInserts(totalRecords, p1, 100,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<WriteStatus> writeStatusesList =
client.upsert(jsc().parallelize(insertsAtTs100, 2), commitTime1).collect();
+ client.commit(commitTime1, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 100);
+
+ // 2nd batch: delete record 0 with unknown partition (global index
should find it)
+ String commitTime2 = getCommitTimeAtUTC(5);
+ List<HoodieRecord> deletesWithUnknownPartition =
getDeletesWithEmptyPayloadAndNewPartition(
+ insertsAtTs100.subList(0, 1), "unknown_partition");
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ writeStatusesList =
client.upsert(jsc().parallelize(deletesWithUnknownPartition, 1),
commitTime2).collect();
+ client.commit(commitTime2, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ // Record 0 should be deleted, records 1, 2, 3 should remain
+ readTableAndValidate(metaClient, new int[] {1, 2, 3}, p1, 100);
+ }
+ }
+
+ /**
+ * Tests mixed operations: inserts, updates with partition changes, and
deletes.
+ */
+ @ParameterizedTest
+ @MethodSource("getTableTypeAndIndexType")
+ public void testMixedOperationsWithCommitTimeOrdering(HoodieTableType
tableType, IndexType indexType) throws IOException {
+ final Class<?> payloadClass = OverwriteWithLatestAvroPayload.class;
+ HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType,
writeConfig.getProps());
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+ final int totalRecords = 8;
+ final String p1 = "p1";
+ final String p2 = "p2";
+
+ // 1st batch: insert 8 records to p1
+ String commitTime1 = getCommitTimeAtUTC(0);
+ List<HoodieRecord> inserts = getInserts(totalRecords, p1, 100,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<WriteStatus> writeStatusesList =
client.upsert(jsc().parallelize(inserts, 2), commitTime1).collect();
+ client.commit(commitTime1, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3, 4, 5, 6, 7}, p1,
100);
+
+ // 2nd batch: move records 0-3 to p2 with lower ts=50
+ String commitTime2 = getCommitTimeAtUTC(5);
+ List<HoodieRecord> updatesToP2 = getUpdates(inserts.subList(0, 4), p2,
50, payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ writeStatusesList = client.upsert(jsc().parallelize(updatesToP2, 2),
commitTime2).collect();
+ client.commit(commitTime2, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 100);
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 50);
+
+ // 3rd batch: delete records 0, 1 from p2
+ String commitTime3 = getCommitTimeAtUTC(10);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime3);
+ List<WriteStatus> deleteStatuses = client.delete(
+ jsc().parallelize(inserts.subList(0, 2).stream()
+ .map(HoodieRecord::getKey).collect(Collectors.toList()), 2),
+ commitTime3).collect();
+ client.commit(commitTime3, jsc().parallelize(deleteStatuses));
+ assertNoWriteErrors(deleteStatuses);
+ readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 100);
+ readTableAndValidate(metaClient, new int[] {2, 3}, p2, 50);
+
+ // 4th batch: update records 4, 5 in p1 with higher ts=200
+ String commitTime4 = getCommitTimeAtUTC(15);
+ List<HoodieRecord> updatesInP1 = getUpdates(inserts.subList(4, 6), 200,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime4);
+ writeStatusesList = client.upsert(jsc().parallelize(updatesInP1, 2),
commitTime4).collect();
+ client.commit(commitTime4, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+ Map<String, Long> expectedP1TsMap = new HashMap<>();
+ expectedP1TsMap.put("4", 200L);
+ expectedP1TsMap.put("5", 200L);
+ expectedP1TsMap.put("6", 100L);
+ expectedP1TsMap.put("7", 100L);
+ readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1,
expectedP1TsMap);
+ readTableAndValidate(metaClient, new int[] {2, 3}, p2, 50);
+ }
+ }
+
+ /**
+ * Tests that records are properly handled when compaction occurs on MOR
tables.
+ */
+ @Test
+ public void testCompactionWithCommitTimeOrderingMOR() throws IOException {
+ final Class<?> payloadClass = OverwriteWithLatestAvroPayload.class;
+ HoodieWriteConfig writeConfig =
getWriteConfigWithInlineCompaction(payloadClass, RECORD_INDEX);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(MERGE_ON_READ,
writeConfig.getProps());
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+ final int totalRecords = 4;
+ final String p1 = "p1";
+
+ // 1st batch: inserts
+ String commitTime1 = getCommitTimeAtUTC(0);
+ List<HoodieRecord> inserts = getInserts(totalRecords, p1, 100,
payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<WriteStatus> writeStatusesList =
client.upsert(jsc().parallelize(inserts, 2), commitTime1).collect();
+ client.commit(commitTime1, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+
+ // 2nd batch: updates with lower ts
+ String commitTime2 = getCommitTimeAtUTC(5);
+ List<HoodieRecord> updates1 = getUpdates(inserts, 50, payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ writeStatusesList = client.upsert(jsc().parallelize(updates1, 2),
commitTime2).collect();
+ client.commit(commitTime2, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+
+ // 3rd batch: more updates - this should trigger inline compaction
(configured for 2 delta commits)
+ String commitTime3 = getCommitTimeAtUTC(10);
+ List<HoodieRecord> updates2 = getUpdates(inserts, 75, payloadClass);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime3);
+ writeStatusesList = client.upsert(jsc().parallelize(updates2, 2),
commitTime3).collect();
+ client.commit(commitTime3, jsc().parallelize(writeStatusesList));
+ assertNoWriteErrors(writeStatusesList);
+
+ // Verify the latest values are visible (ts=75 from the 3rd commit)
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 75);
+ }
+ }
+
+ private void readTableAndValidate(HoodieTableMetaClient metaClient, int[]
expectedIds, String expectedPartition, long expectedTs) {
+ Map<String, Long> expectedTsMap = new HashMap<>();
+ Arrays.stream(expectedIds).forEach(entry ->
expectedTsMap.put(String.valueOf(entry), expectedTs));
+ readTableAndValidate(metaClient, expectedIds, expectedPartition,
expectedTsMap);
+ }
+
+ private void readTableAndValidate(HoodieTableMetaClient metaClient, int[]
expectedIds, String expectedPartition, Map<String, Long> expectedTsMap) {
+ metaClient.reloadTableConfig();
+ assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING,
metaClient.getTableConfig().getRecordMergeMode(),
+ "Table must be initialized with COMMIT_TIME_ORDERING for these tests
to exercise the optimization");
+ Dataset<Row> df = spark().read().format("hudi")
+ .load(metaClient.getBasePath().toString())
+ .sort("id")
+ .select("_hoodie_record_key", "_hoodie_partition_path", "id", "pt",
"ts")
+ .cache();
+ int expectedCount = expectedIds.length;
+ Row[] allRows = (Row[]) df.filter(String.format("pt = '%s'",
expectedPartition)).collect();
+ assertEquals(expectedCount, allRows.length, "Expected " + expectedCount +
" records in partition " + expectedPartition);
+ for (int i = 0; i < expectedCount; i++) {
+ int expectedId = expectedIds[i];
+ Row r = allRows[i];
+ assertEquals(String.valueOf(expectedId), r.getString(0));
+ assertEquals(expectedPartition, r.getString(1));
+ assertEquals(expectedId, r.getInt(2));
+ assertEquals(expectedPartition, r.getString(3));
+ assertEquals(expectedTsMap.get(String.valueOf(expectedId)),
r.getLong(4));
+ }
+ df.unpersist();
+ }
+
+ private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, IndexType
indexType) {
+ return getWriteConfig(payloadClass, indexType,
HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4).build());
+ }
+
+ private HoodieWriteConfig getWriteConfigWithInlineCompaction(Class<?>
payloadClass, IndexType indexType) {
+ return getWriteConfig(payloadClass, indexType,
HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(true)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build());
+ }
+
+ private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, IndexType
indexType, HoodieCompactionConfig compactionConfig) {
+ HoodieMetadataConfig.Builder metadataConfigBuilder =
HoodieMetadataConfig.newBuilder();
+ if (indexType == IndexType.RECORD_INDEX) {
+
metadataConfigBuilder.enable(true).withEnableGlobalRecordLevelIndex(true);
+ } else {
+ metadataConfigBuilder.enable(false);
+ }
+ return getConfigBuilder(false)
+ .withProperties(getCommitTimeOrderingProps(payloadClass))
+ .withParallelism(2, 2)
+ .withBulkInsertParallelism(2)
+ .withDeleteParallelism(2)
+ .withMetadataConfig(metadataConfigBuilder.build())
+ .withIndexConfig(HoodieIndexConfig.newBuilder()
+ .withIndexType(indexType)
+ .bloomIndexParallelism(2)
+ .withSimpleIndexParallelism(2)
+ .withGlobalSimpleIndexParallelism(2)
+ .withGlobalIndexReconcileParallelism(2)
+ .withGlobalBloomIndexUpdatePartitionPath(true)
+ .withGlobalSimpleIndexUpdatePartitionPath(true)
+ .withRecordIndexUpdatePartitionPath(true).build())
+ .withCompactionConfig(compactionConfig)
+ .withSchema(SCHEMA_STR)
+ .withRecordMergeMode(RecordMergeMode.COMMIT_TIME_ORDERING)
+ .build();
+ }
+
+ // Two things are needed to make HoodieTableMetaClient persist
COMMIT_TIME_ORDERING:
+ // (1) HoodieWriteConfig.withRecordMergeMode writes
hoodie.write.record.merge.mode, but the
+ // meta client only reads hoodie.record.merge.mode (no alt keys), so
set it explicitly.
+ // (2) HoodieWriteConfig.build() runs HoodiePayloadConfig defaulting,
which sets
+ // hoodie.compaction.payload.class -> DefaultHoodieRecordPayload. Then
+ // inferMergingConfigsForV9TableCreation sees
DefaultHoodieRecordPayload as a deprecated
+ // payload and silently re-infers the mode to EVENT_TIME_ORDERING. Pin
the payload class
+ // to OverwriteWithLatestAvroPayload (commit-time-compatible) so the
inference is stable.
+ // Also strip ORDERING_FIELDS, which is meaningless under commit-time
ordering.
+ private static Properties getCommitTimeOrderingProps(Class<?> payloadClass) {
+ Properties props = getKeyGenProps(payloadClass);
+ props.remove(HoodieTableConfig.ORDERING_FIELDS.key());
+ props.put(HoodieTableConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+ props.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
OverwriteWithLatestAvroPayload.class.getName());
+ return props;
+ }
+}
\ No newline at end of file