This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 07164406c44 [HUDI-6423] Incremental cleaning should consider inflight compaction instant (#9038)
07164406c44 is described below

commit 07164406c44b4092eee810710a242d092c97bd58
Author: zhuanshenbsj1 <34104400+zhuanshenb...@users.noreply.github.com>
AuthorDate: Wed Jul 5 11:05:57 2023 +0800

    [HUDI-6423] Incremental cleaning should consider inflight compaction instant (#9038)

    * The CleanPlanner#getEarliestCommitToRetain should consider pending compaction instants.
      If a pending compaction is missed under incremental cleaning mode, some files may never be
      cleaned once the cleaner moves on to a different partition:

        -------- par1 ---- | ----- par2 ->
        dc.1  compaction.2  dc.3 | dc.4

      Assume we have 3 delta commits and 1 pending compaction instant on the timeline. If the
      `EarliestCommitToRetain` is recorded as dc.3, then when dc.4 (or a subsequent instant)
      triggers cleaning, the cleaner only checks the timeline starting from dc.3, so compaction.2
      is skipped forever if no subsequent mutations are made to partition par1.

    ---------

    Co-authored-by: Danny Chan <yuzhao....@gmail.com>
---
 .../action/clean/CleanPlanActionExecutor.java      |   1 +
 .../hudi/table/action/clean/CleanPlanner.java      |   2 +-
 .../java/org/apache/hudi/table/TestCleaner.java    | 183 ++++++++++++++++-----
 .../table/timeline/HoodieDefaultTimeline.java      |   7 +
 4 files changed, 148 insertions(+), 45 deletions(-)
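To make the failure mode above concrete, here is a minimal standalone sketch of the idea behind the fix. The TimelineInstant record and earliestCommitToRetain helper are invented for illustration and are not the Hudi classes: the candidate derived from the retention policy is capped by the earliest compaction that is still pending, so the cleaner can never plan past a file slice that a compaction has yet to rewrite.

import java.util.List;
import java.util.Optional;

public class EarliestCommitToRetainSketch {

  // Minimal stand-in for a timeline instant: a timestamp plus whether it is a pending compaction.
  record TimelineInstant(String timestamp, boolean pendingCompaction) { }

  // Keep the last `commitsRetained` instants, but never retain past the earliest pending compaction.
  static Optional<String> earliestCommitToRetain(List<TimelineInstant> timeline, int commitsRetained) {
    if (timeline.isEmpty()) {
      return Optional.empty();
    }
    // Candidate chosen purely by the retention policy (KEEP_LATEST_COMMITS style).
    String byPolicy = timeline.get(Math.max(0, timeline.size() - commitsRetained)).timestamp();

    // Candidate chosen by the earliest compaction that is still pending on the timeline.
    Optional<String> byPendingCompaction = timeline.stream()
        .filter(TimelineInstant::pendingCompaction)
        .map(TimelineInstant::timestamp)
        .findFirst();

    // Retain from whichever candidate is earlier (instant times sort lexicographically).
    return Optional.of(byPendingCompaction
        .filter(c -> c.compareTo(byPolicy) < 0)
        .orElse(byPolicy));
  }

  public static void main(String[] args) {
    // The timeline from the commit message: dc.1, pending compaction.2, dc.3, dc.4.
    List<TimelineInstant> timeline = List.of(
        new TimelineInstant("001", false),
        new TimelineInstant("002", true),
        new TimelineInstant("003", false),
        new TimelineInstant("004", false));
    // A commits-only view would answer "003" and the slices under compaction "002" would
    // never be revisited; including the pending compaction caps the answer at "002".
    System.out.println(earliestCommitToRetain(timeline, 2)); // prints Optional[002]
  }
}

In the patch itself this effect is obtained simply by handing CleanerUtils.getEarliestCommitToRetain a timeline that also contains compaction instants, as the CleanPlanner.java hunk below shows.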
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index ba7c71b1356..b494df42b49 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -111,6 +111,7 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
       LOG.info("Nothing to clean here. It is already clean");
       return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
     }
+    LOG.info("Earliest commit to retain for clean : " + (earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null"));
     LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
     int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index be949fedb37..80aa7b31624 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -509,7 +509,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
    */
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     return CleanerUtils.getEarliestCommitToRetain(
-        hoodieTable.getMetaClient().getActiveTimeline().getCommitsTimeline(),
+        hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
         config.getCleanerPolicy(),
         config.getCleanerCommitsRetained(),
         Instant.now(),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index d1e77613691..17a12dcc7ff 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -260,6 +261,97 @@ public class TestCleaner extends HoodieCleanerTestBase {
     }
   }

+  /**
+   * Test that the earliest commit to retain is earlier than the first pending compaction in incremental cleaning scenarios.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testEarliestInstantToRetainForPendingCompaction() throws IOException {
+    HoodieWriteConfig writeConfig = getConfigBuilder().withPath(basePath)
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false)
+            .build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withAutoClean(false)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(1)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .compactionSmallFileSize(1024 * 1024 * 1024)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false)
+            .archiveCommitsWith(2, 3)
+            .build())
+        .withEmbeddedTimelineServerEnabled(false).build();
+
+    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+
+      final String partition1 = "2023/06/01";
+      final String partition2 = "2023/06/02";
+      String instantTime = "";
+      String earliestInstantToRetain = "";
+
+      for (int idx = 0; idx < 3; ++idx) {
+        instantTime = HoodieActiveTimeline.createNewInstantTime();
+        if (idx == 2) {
+          earliestInstantToRetain = instantTime;
+        }
+        List<HoodieRecord> records = dataGen.generateInsertsForPartition(instantTime, 1, partition1);
+        client.startCommitWithTime(instantTime);
+        client.insert(jsc.parallelize(records, 1), instantTime).collect();
+      }
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      HoodieTable table = HoodieSparkTable.create(writeConfig, context);
+      Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
+      assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition1).size(), 1);
+      assertEquals(earliestInstantToRetain, cleanPlan.get().getEarliestInstantToRetain().getTimestamp(),
+          "clean until " + earliestInstantToRetain);
+      table.getMetaClient().reloadActiveTimeline();
+      table.clean(context, instantTime);
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInsertsForPartition(instantTime, 1, partition1);
+      client.startCommitWithTime(instantTime);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      client.insert(recordsRDD, instantTime).collect();
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      earliestInstantToRetain = instantTime;
+      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(instantTime, records);
+      JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+      SparkRDDReadClient readClient = new SparkRDDReadClient(context, writeConfig);
+      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
+      client.startCommitWithTime(instantTime);
+      client.upsertPreppedRecords(updatedTaggedRecordsRDD, instantTime).collect();
+
+      table.getMetaClient().reloadActiveTimeline();
+      // pending compaction
+      String compactionInstantTime = client.scheduleCompaction(Option.empty()).get().toString();
+
+      for (int idx = 0; idx < 3; ++idx) {
+        instantTime = HoodieActiveTimeline.createNewInstantTime();
+        records = dataGen.generateInsertsForPartition(instantTime, 1, partition2);
+        client.startCommitWithTime(instantTime);
+        client.insert(jsc.parallelize(records, 1), instantTime).collect();
+      }
+
+      // earliest commit to retain should be earlier than first pending compaction in incremental cleaning scenarios
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
+      assertEquals(earliestInstantToRetain, cleanPlan.get().getEarliestInstantToRetain().getTimestamp());
+    }
+  }
+
   /**
    * Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false.
    */
@@ -777,16 +869,17 @@ public class TestCleaner extends HoodieCleanerTestBase {
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build())
         .build();
+
     // Deletions:
-    // . FileId   Base  Logs  Total  Retained Commits
-    //   FileId7   5    10    15     009, 011
-    //   FileId6   5    10    15     009
-    //   FileId5   3     6     9     005
-    //   FileId4   2     4     6     003
-    //   FileId3   1     2     3     001
-    //   FileId2   0     0     0     000
-    //   FileId1   0     0     0     000
-    testPendingCompactions(config, 48, 18, false);
+    // . FileId   Base  Logs  Total  Retained_Commits  Under_Compaction
+    //   FileId7   1     2     3     001,003           false
+    //   FileId6   1     2     3     001,003           false
+    //   FileId5   1     2     3     001,003           true
+    //   FileId4   1     2     3     001,003           true
+    //   FileId3   1     2     3     001               true
+    //   FileId2   0     0     0     000               true
+    //   FileId1   0     0     0     000               false
+    testPendingCompactions(config, 15, 9, false);
   }

   /**
@@ -801,15 +894,16 @@ public class TestCleaner extends HoodieCleanerTestBase {
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
         .build();
+
     // Deletions:
-    // . FileId   Base  Logs  Total  Retained Commits
-    //   FileId7   5    10    15     009, 011
-    //   FileId6   4     8    12     007, 009
-    //   FileId5   2     4     6     003, 005
-    //   FileId4   1     2     3     001, 003
-    //   FileId3   0     0     0     000, 001
-    //   FileId2   0     0     0     000
-    //   FileId1   0     0     0     000
+    // . FileId   Base  Logs  Total  Retained_Commits  Under_Compaction
+    //   FileId7   5    10    15     009,013           false
+    //   FileId6   4     8    12     007,009           false
+    //   FileId5   2     4     6     003,005           true
+    //   FileId4   1     2     3     001,003           true
+    //   FileId3   0     0     0     000,001           true
+    //   FileId2   0     0     0     000               true
+    //   FileId1   0     0     0     000               false
     testPendingCompactions(config, 36, 9, retryFailure);
   }

@@ -1005,23 +1099,24 @@ public class TestCleaner extends HoodieCleanerTestBase {
     HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);

     final String partition = "2016/03/15";
+    String timePrefix = "00000000000";
     Map<String, String> expFileIdToPendingCompaction = new HashMap<String, String>() {
       {
-        put("fileId2", "004");
-        put("fileId3", "006");
-        put("fileId4", "008");
-        put("fileId5", "010");
+        put("fileId2", timePrefix + "004");
+        put("fileId3", timePrefix + "006");
+        put("fileId4", timePrefix + "008");
+        put("fileId5", timePrefix + "010");
       }
     };
     Map<String, String> fileIdToLatestInstantBeforeCompaction = new HashMap<String, String>() {
       {
-        put("fileId1", "000");
-        put("fileId2", "000");
-        put("fileId3", "001");
-        put("fileId4", "003");
-        put("fileId5", "005");
-        put("fileId6", "009");
-        put("fileId7", "011");
+        put("fileId1", timePrefix + "000");
+        put("fileId2", timePrefix + "000");
+        put("fileId3", timePrefix + "001");
+        put("fileId4", timePrefix + "003");
+        put("fileId5", timePrefix + "005");
+        put("fileId6", timePrefix + "009");
+        put("fileId7", timePrefix + "013");
       }
     };

@@ -1047,60 +1142,60 @@ public class TestCleaner extends HoodieCleanerTestBase {
     Map<String, List<String>> part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file1P1, file2P1, file3P1, file4P1, file5P1, file6P1, file7P1)); // all 7 fileIds
-    commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "000", part1ToFileId, testTable, metadataWriter, true, true);

     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file3P1, file4P1, file5P1, file6P1, file7P1)); // fileIds 3 to 7
-    commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "001", part1ToFileId, testTable, metadataWriter, true, true);

     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file4P1, file5P1, file6P1, file7P1)); // fileIds 4 to 7
-    commitWithMdt("003", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "003", part1ToFileId, testTable, metadataWriter, true, true);

     // add compaction
-    testTable.addRequestedCompaction("004", new FileSlice(partition, "000", file2P1));
+    testTable.addRequestedCompaction(timePrefix + "004", new FileSlice(partition, timePrefix + "000", file2P1));
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file2P1));
-    commitWithMdt("005", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "005", part1ToFileId, testTable, metadataWriter, false, true);

     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file5P1, file6P1, file7P1));
-    commitWithMdt("0055", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0055", part1ToFileId, testTable, metadataWriter, true, true);

-    testTable.addRequestedCompaction("006", new FileSlice(partition, "001", file3P1));
+    testTable.addRequestedCompaction(timePrefix + "006", new FileSlice(partition, timePrefix + "001", file3P1));
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file3P1));
-    commitWithMdt("007", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "007", part1ToFileId, testTable, metadataWriter, false, true);

     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
-    commitWithMdt("0075", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0075", part1ToFileId, testTable, metadataWriter, true, true);

-    testTable.addRequestedCompaction("008", new FileSlice(partition, "003", file4P1));
+    testTable.addRequestedCompaction(timePrefix + "008", new FileSlice(partition, timePrefix + "003", file4P1));
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file4P1));
-    commitWithMdt("009", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "009", part1ToFileId, testTable, metadataWriter, false, true);

     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
-    commitWithMdt("0095", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0095", part1ToFileId, testTable, metadataWriter, true, true);

-    testTable.addRequestedCompaction("010", new FileSlice(partition, "005", file5P1));
+    testTable.addRequestedCompaction(timePrefix + "010", new FileSlice(partition, timePrefix + "005", file5P1));
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file5P1));
-    commitWithMdt("011", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "011", part1ToFileId, testTable, metadataWriter, false, true);

     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file7P1));
-    commitWithMdt("013", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "013", part1ToFileId, testTable, metadataWriter, true, true);

     // Clean now
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, retryFailure);
+    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, 14, true);

     // Test for safety
     final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 8c4a5cb377e..6182bc4d4eb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -267,6 +267,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
   }

+  /**
+   * Get all instants (commits, delta commits, replace commits, compactions) that produce new data or merge files, in the active timeline.
+   */
+  public HoodieTimeline getCommitsAndCompactionTimeline() {
+    return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, COMPACTION_ACTION));
+  }
+
   /**
    * Get all instants (commits, delta commits, compaction, clean, savepoint, rollback, replace commits, index) that result in actions,
    * in the active timeline.
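For readers less familiar with the timeline API, the self-contained sketch below shows what the new getCommitsAndCompactionTimeline() accessor changes relative to getCommitsTimeline(): it is the same action-set filter with COMPACTION_ACTION included, which is what lets the cleaner see the pending compaction at all. The ActionInstant record and filterByActions helper are made up for illustration; only the action names mirror Hudi's constants.

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Standalone illustration of the action-set filtering behind the new timeline accessor.
public class TimelineFilterSketch {

  record ActionInstant(String action, String timestamp) { }

  static final String COMMIT_ACTION = "commit";
  static final String DELTA_COMMIT_ACTION = "deltacommit";
  static final String REPLACE_COMMIT_ACTION = "replacecommit";
  static final String COMPACTION_ACTION = "compaction";

  // Keep only the instants whose action is in the given set, preserving timeline order.
  static List<ActionInstant> filterByActions(List<ActionInstant> timeline, Set<String> actions) {
    return timeline.stream()
        .filter(i -> actions.contains(i.action()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ActionInstant> active = List.of(
        new ActionInstant(DELTA_COMMIT_ACTION, "001"),
        new ActionInstant(COMPACTION_ACTION, "002"),   // pending compaction
        new ActionInstant(DELTA_COMMIT_ACTION, "003"),
        new ActionInstant(DELTA_COMMIT_ACTION, "004"));

    // Commits-only view: the compaction instant at 002 is invisible to the cleaner.
    System.out.println(filterByActions(active,
        Set.of(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION)));

    // Commits-and-compaction view: 002 shows up, so the earliest commit to retain
    // can never move past the pending compaction.
    System.out.println(filterByActions(active,
        Set.of(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, COMPACTION_ACTION)));
  }
}

Adding a separate accessor, rather than widening getCommitsTimeline() itself, keeps the semantics of existing callers unchanged while giving the clean planner the broader view it needs.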