This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 685f4acbfbade9842589df2b4801ec0db21f71ae Author: Y Ethan Guo <[email protected]> AuthorDate: Tue May 19 16:31:43 2026 -0700 fix: Skip pre-compaction rollback metadata reads in getValidInstantTimestamps (#18544) --- .../hudi/metadata/HoodieTableMetadataUtil.java | 18 +++- .../hudi/metadata/TestHoodieTableMetadataUtil.java | 108 +++++++++++++++++++++ 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 2a272a3f2cc1..59e2d6dcd83f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -2100,9 +2100,25 @@ public class HoodieTableMetadataUtil { // For any rollbacks and restores, we cannot neglect the instants that they are rolling back. // The rollback instant should be more recent than the start of the timeline for it to have rolled back any // instant which we have a log block for. + // + // Only read rollback metadata for rollbacks newer than the latest MDT compaction. + // After compaction, rolled-back log blocks are already merged into base files, so pre-compaction + // rollback timestamps are no longer needed for log block filtering. This avoids sequential storage + // reads for old rollback instants that can cause long latency during metadata table reading. final String earliestInstantTime = validInstantTimestamps.isEmpty() ? 
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps); + final String latestMdtCompactionTime = metadataMetaClient.getActiveTimeline() + .getCommitTimeline() + .filterCompletedInstants() + .lastInstant() + .map(HoodieInstant::requestedTime) + .orElse(earliestInstantTime); + // Only read rollback metadata for rollbacks newer than the later of: + // (a) the earliest completed instant, and + // (b) the latest MDT compaction instant + final String rollbackFilterThreshold = compareTimestamps(latestMdtCompactionTime, + GREATER_THAN, earliestInstantTime) ? latestMdtCompactionTime : earliestInstantTime; datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream() - .filter(instant -> compareTimestamps(instant.requestedTime(), GREATER_THAN, earliestInstantTime)) + .filter(instant -> compareTimestamps(instant.requestedTime(), GREATER_THAN, rollbackFilterThreshold)) .forEach(instant -> validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline, dataMetaClient.getInstantGenerator()))); // add restore and rollback instants from MDT. 
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java index 74589886a35a..31d2d14e621d 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java @@ -19,6 +19,9 @@ package org.apache.hudi.metadata; +import org.apache.hudi.avro.model.HoodieInstantInfo; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieLocalEngineContext; @@ -28,6 +31,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; @@ -36,10 +40,13 @@ import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.FileCreateUtilsLegacy; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CollectionUtils; 
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -61,8 +68,10 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; @@ -869,4 +878,103 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness { ) ); } + + /** + * Tests getValidInstantTimestamps rollback handling: + * - Without MDT compaction, all rollback metadata is read (rolled-back commits appear in valid timestamps). + * - With MDT compaction, only post-compaction rollback metadata is read (pre-compaction rollbacks are skipped + * because those log blocks are already merged into base files). + */ + @Test + void testGetValidInstantTimestampsSkipsPreCompactionRollbacks() throws Exception { + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + String commit1 = "20260101010101000"; + String commit1CompletionTime = "20260101010101001"; + String commit2 = "20260201010101000"; + String commit2CompletionTime = "20260201010101001"; + String commit3 = "20260301010101000"; + String commit3CompletionTime = "20260301010101001"; + String commit4 = "20260501010101000"; + String commit4CompletionTime = "20260501010101001"; + String commit5 = "20260601010101000"; + String commit5CompletionTime = "20260601010101001"; + testTable.addCommit(commit1, Option.of(commit1CompletionTime), Option.empty()); + testTable.addCommit(commit2, Option.of(commit2CompletionTime), Option.empty()); + testTable.addCommit(commit3, Option.of(commit3CompletionTime), Option.empty()); + testTable.addCommit(commit4, Option.of(commit4CompletionTime), Option.empty()); + testTable.addCommit(commit5, Option.of(commit5CompletionTime), Option.empty()); + + // Rollbacks requested before the MDT compaction instant (20260401010101000) that is added later in this test + addCompletedRollback(testTable, "20260202010101000", commit2); + 
addCompletedRollback(testTable, "20260302010101000", commit3); + // Rollback requested after that MDT compaction instant (20260401010101000) + addCompletedRollback(testTable, "20260502010101000", commit4); + + // Delete rolled-back commit instants from the timeline to simulate real rollback behavior. + // In a real system, the commit instant file is removed when a rollback completes, so the + // only way these timestamps appear in validInstantTimestamps is via rollback metadata reading. + metaClient = HoodieTableMetaClient.reload(metaClient); + for (Pair<String, String> rolledBack : Arrays.asList( + Pair.of(commit2, commit2CompletionTime), + Pair.of(commit3, commit3CompletionTime), + Pair.of(commit4, commit4CompletionTime))) { + HoodieInstant completedCommit = metaClient.getInstantGenerator() + .createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, + rolledBack.getKey(), rolledBack.getValue()); + metaClient.getActiveTimeline().deleteInstantFileIfExists(completedCommit); + } + + // Create MDT metaClient with NO compaction initially (only delta commits) + HoodieTableMetaClient mdtMetaClient = createMdtMetaClient(); + HoodieTestTable mdtTestTable = HoodieTestTable.of(mdtMetaClient); + mdtTestTable.addDeltaCommit("20260101020101000"); + + metaClient = HoodieTableMetaClient.reload(metaClient); + mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient); + + // Without MDT compaction, all rollback metadata is read — rolled-back commits appear + Set<String> validTimestamps = HoodieTableMetadataUtil.getValidInstantTimestamps(metaClient, mdtMetaClient); + assertTrue(validTimestamps.contains(commit1), "commit1 should be in valid timestamps"); + assertTrue(validTimestamps.contains(commit2), "commit2 should be in valid timestamps (from rollback metadata read)"); + assertTrue(validTimestamps.contains(commit3), "commit3 should be in valid timestamps (from rollback metadata read)"); + assertTrue(validTimestamps.contains(commit4), "commit4 should be in valid timestamps (from rollback metadata 
read)"); + assertTrue(validTimestamps.contains(commit5), "commit5 should be in valid timestamps"); + + // Now add a compaction commit to MDT at a time between the rollback of commit3 (20260302010101000) and the rollback of commit4 (20260502010101000) + mdtTestTable.addCommit("20260401010101000"); + mdtTestTable.addDeltaCommit("20260501020101000"); + + metaClient = HoodieTableMetaClient.reload(metaClient); + mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient); + + // With MDT compaction, only post-compaction rollback (commit4) metadata is read; + // pre-compaction rollbacks for commit2 and commit3 are skipped + validTimestamps = HoodieTableMetadataUtil.getValidInstantTimestamps(metaClient, mdtMetaClient); + assertTrue(validTimestamps.contains(commit1), "commit1 should be in valid timestamps"); + assertTrue(validTimestamps.contains(commit5), "commit5 should be in valid timestamps"); + assertTrue(validTimestamps.contains(commit4), "commit4 should be in valid timestamps (from post-compaction rollback)"); + assertFalse(validTimestamps.contains(commit2), "commit2 should NOT be in valid timestamps (pre-compaction rollback skipped)"); + assertFalse(validTimestamps.contains(commit3), "commit3 should NOT be in valid timestamps (pre-compaction rollback skipped)"); + } + + /** Adds a rollback of COMMIT {@code rolledBackCommit} at {@code rollbackTime} via {@code addRollback} then {@code addRollbackCompleted}; the plan has no rollback requests and the metadata lists only "partition1" with no files. */ + private void addCompletedRollback(HoodieTestTable testTable, String rollbackTime, String rolledBackCommit) throws Exception { + Map<String, List<String>> emptyPartitionFiles = new HashMap<>(); + emptyPartitionFiles.put("partition1", Collections.emptyList()); + HoodieRollbackMetadata rollbackMeta = testTable.getRollbackMetadata(rolledBackCommit, emptyPartitionFiles, false); + HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); + rollbackPlan.setInstantToRollback(new HoodieInstantInfo(rolledBackCommit, HoodieTimeline.COMMIT_ACTION)); + rollbackPlan.setRollbackRequests(Collections.emptyList()); + testTable.addRollback(rollbackTime, rollbackMeta, rollbackPlan); + testTable.addRollbackCompleted(rollbackTime, rollbackMeta, false); + } + + /** Calls {@code HoodieTestUtils.init(..., MERGE_ON_READ)} on the metadata-table base path of {@code metaClient} and returns a meta client for that path built with {@code metaClient}'s storage conf. */ + private 
HoodieTableMetaClient createMdtMetaClient() throws IOException { + String mdtBasePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath().toString()); + HoodieTestUtils.init(mdtBasePath, HoodieTableType.MERGE_ON_READ); + return HoodieTableMetaClient.builder() + .setConf(metaClient.getStorageConf()) + .setBasePath(mdtBasePath) + .build(); + } }
