This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 07164406c44 [HUDI-6423] Incremental cleaning should consider inflight compaction instant (#9038)
07164406c44 is described below

commit 07164406c44b4092eee810710a242d092c97bd58
Author: zhuanshenbsj1 <34104400+zhuanshenb...@users.noreply.github.com>
AuthorDate: Wed Jul 5 11:05:57 2023 +0800

    [HUDI-6423] Incremental cleaning should consider inflight compaction instant (#9038)
    
    * CleanPlanner#getEarliestCommitToRetain should consider pending compaction instants. If a pending compaction is missed under incremental cleaning mode, some files may never be cleaned once the cleaner moves on to a different partition:
    
    --------  par1 ----          | ----- par2 ->
    dc.1 compaction.2 dc.3 | dc.4
    
    Assume we have 3 delta commits and 1 pending compaction on the timeline. If the `EarliestCommitToRetain` was recorded as dc.3, then when dc.4 (or a subsequent instant) triggers cleaning, the cleaner only checks the timeline starting from dc.3, so compaction.2 is skipped forever if no subsequent mutations are made to partition par1.
    
    ---------
    
    Co-authored-by: Danny Chan <yuzhao....@gmail.com>
---
 .../action/clean/CleanPlanActionExecutor.java      |   1 +
 .../hudi/table/action/clean/CleanPlanner.java      |   2 +-
 .../java/org/apache/hudi/table/TestCleaner.java    | 183 ++++++++++++++++-----
 .../table/timeline/HoodieDefaultTimeline.java      |   7 +
 4 files changed, 148 insertions(+), 45 deletions(-)
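
For context on the change below: getEarliestCommitToRetain is now computed over a timeline that also includes compaction actions, so the retained boundary can never move past a pending compaction. What follows is a minimal, self-contained sketch of that rule, not the patch itself; the Instant record and helper names are illustrative stand-ins rather than Hudi's CleanerUtils/HoodieTimeline APIs, and the real policy handling is more involved.

// Simplified sketch (Java 17); illustrative types only, not Hudi APIs.
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class EarliestCommitToRetainSketch {

  // A timeline instant: timestamp, action type, and whether it has completed.
  record Instant(String timestamp, String action, boolean completed) {}

  /**
   * Start from the N-th latest completed commit (KEEP_LATEST_COMMITS style), then
   * pull the boundary back if a pending compaction is earlier, so file groups
   * under compaction are never skipped by incremental cleaning.
   */
  static Optional<Instant> earliestCommitToRetain(List<Instant> timeline, int commitsRetained) {
    List<Instant> completed = timeline.stream().filter(Instant::completed).toList();
    if (completed.size() <= commitsRetained) {
      return Optional.empty();
    }
    Instant candidate = completed.get(completed.size() - commitsRetained);
    Optional<Instant> firstPendingCompaction = timeline.stream()
        .filter(i -> !i.completed() && i.action().equals("compaction"))
        .min(Comparator.comparing(Instant::timestamp));
    if (firstPendingCompaction.isPresent()
        && firstPendingCompaction.get().timestamp().compareTo(candidate.timestamp()) < 0) {
      return firstPendingCompaction;
    }
    return Optional.of(candidate);
  }

  public static void main(String[] args) {
    // The scenario from the commit message: dc.1, pending compaction.2, dc.3, dc.4.
    List<Instant> timeline = List.of(
        new Instant("001", "deltacommit", true),
        new Instant("002", "compaction", false),
        new Instant("003", "deltacommit", true),
        new Instant("004", "deltacommit", true));
    // Retaining 1 commit: ignoring the pending compaction would yield 004;
    // including it pulls the boundary back to 002.
    System.out.println(earliestCommitToRetain(timeline, 1));
  }
}

On that sample timeline the sketch prints the 002 compaction instant instead of 004, mirroring the new TestCleaner assertion that the earliest commit to retain must not fall after the first pending compaction.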

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index ba7c71b1356..b494df42b49 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -111,6 +111,7 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
        LOG.info("Nothing to clean here. It is already clean");
        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
      }
+      LOG.info("Earliest commit to retain for clean : " + (earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null"));
      LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
      int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
      LOG.info("Using cleanerParallelism: " + cleanerParallelism);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index be949fedb37..80aa7b31624 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -509,7 +509,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
    */
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     return CleanerUtils.getEarliestCommitToRetain(
-        hoodieTable.getMetaClient().getActiveTimeline().getCommitsTimeline(),
+        hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
         config.getCleanerPolicy(),
         config.getCleanerCommitsRetained(),
         Instant.now(),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index d1e77613691..17a12dcc7ff 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -260,6 +261,97 @@ public class TestCleaner extends HoodieCleanerTestBase {
     }
   }
 
+  /**
+   * Tests that the earliest commit to retain is earlier than the first pending compaction in incremental cleaning scenarios.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testEarliestInstantToRetainForPendingCompaction() throws IOException {
+    HoodieWriteConfig writeConfig = getConfigBuilder().withPath(basePath)
+            .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+                    .withEnableBackupForRemoteFileSystemView(false)
+                    .build())
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
+                    .withAutoClean(false)
+                    .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+                    .retainCommits(1)
+                    .build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                    .withInlineCompaction(false)
+                    .withMaxNumDeltaCommitsBeforeCompaction(1)
+                    .compactionSmallFileSize(1024 * 1024 * 1024)
+                    .build())
+            .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+                    .withAutoArchive(false)
+                    .archiveCommitsWith(2,3)
+                    .build())
+            .withEmbeddedTimelineServerEnabled(false).build();
+
+    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+
+      final String partition1 = "2023/06/01";
+      final String partition2 = "2023/06/02";
+      String instantTime = "";
+      String earliestInstantToRetain = "";
+
+      for (int idx = 0; idx < 3; ++idx) {
+        instantTime = HoodieActiveTimeline.createNewInstantTime();
+        if (idx == 2) {
+          earliestInstantToRetain = instantTime;
+        }
+        List<HoodieRecord> records = dataGen.generateInsertsForPartition(instantTime, 1, partition1);
+        client.startCommitWithTime(instantTime);
+        client.insert(jsc.parallelize(records, 1), instantTime).collect();
+      }
+
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      HoodieTable table = HoodieSparkTable.create(writeConfig, context);
+      Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
+      assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition1).size(), 1);
+      assertEquals(earliestInstantToRetain, cleanPlan.get().getEarliestInstantToRetain().getTimestamp(),
+              "clean until " + earliestInstantToRetain);
+      table.getMetaClient().reloadActiveTimeline();
+      table.clean(context, instantTime);
+
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInsertsForPartition(instantTime, 1, partition1);
+      client.startCommitWithTime(instantTime);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      client.insert(recordsRDD, instantTime).collect();
+
+
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      earliestInstantToRetain = instantTime;
+      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(instantTime, records);
+      JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+      SparkRDDReadClient readClient = new SparkRDDReadClient(context, writeConfig);
+      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
+      client.startCommitWithTime(instantTime);
+      client.upsertPreppedRecords(updatedTaggedRecordsRDD, instantTime).collect();
+
+      table.getMetaClient().reloadActiveTimeline();
+      // pending compaction
+      String compactionInstantTime = client.scheduleCompaction(Option.empty()).get().toString();
+
+      for (int idx = 0; idx < 3; ++idx) {
+        instantTime = HoodieActiveTimeline.createNewInstantTime();
+        records = dataGen.generateInsertsForPartition(instantTime, 1, partition2);
+        client.startCommitWithTime(instantTime);
+        client.insert(jsc.parallelize(records, 1), instantTime).collect();
+      }
+
+      // earliest commit to retain should be earlier than first pending compaction in incremental cleaning scenarios.
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
+      assertEquals(earliestInstantToRetain, cleanPlan.get().getEarliestInstantToRetain().getTimestamp());
+    }
+  }
+
   /**
   * Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false.
    */
@@ -777,16 +869,17 @@ public class TestCleaner extends HoodieCleanerTestBase {
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build())
         .build();
+
     // Deletions:
-    // . FileId Base Logs Total Retained Commits
-    // FileId7 5 10 15 009, 011
-    // FileId6 5 10 15 009
-    // FileId5 3 6 9 005
-    // FileId4 2 4 6 003
-    // FileId3 1 2 3 001
-    // FileId2 0 0 0 000
-    // FileId1 0 0 0 000
-    testPendingCompactions(config, 48, 18, false);
+    // . FileId   Base  Logs  Total Retained_Commits  Under_Compaction
+    //   FileId7  1     2     3     001,003           false
+    //   FileId6  1     2     3     001,003           false
+    //   FileId5  1     2     3     001,003           true
+    //   FileId4  1     2     3     001,003           true
+    //   FileId3  1     2     3     001               true
+    //   FileId2  0     0     0     000               true
+    //   FileId1  0     0     0     000               false
+    testPendingCompactions(config, 15, 9, false);
   }
 
   /**
@@ -801,15 +894,16 @@ public class TestCleaner extends HoodieCleanerTestBase {
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
             .build();
+
     // Deletions:
-    // . FileId Base Logs Total Retained Commits
-    // FileId7 5 10 15 009, 011
-    // FileId6 4 8 12 007, 009
-    // FileId5 2 4 6 003 005
-    // FileId4 1 2 3 001, 003
-    // FileId3 0 0 0 000, 001
-    // FileId2 0 0 0 000
-    // FileId1 0 0 0 000
+    // . FileId   Base  Logs  Total Retained_Commits  Under_Compaction
+    //   FileId7  5     10    15    009,013           false
+    //   FileId6  4     8     12    007,009           false
+    //   FileId5  2     4     6     003,005           true
+    //   FileId4  1     2     3     001,003           true
+    //   FileId3  0     0     0     000,001           true
+    //   FileId2  0     0     0     000               true
+    //   FileId1  0     0     0     000               false
     testPendingCompactions(config, 36, 9, retryFailure);
   }
 
@@ -1005,23 +1099,24 @@ public class TestCleaner extends HoodieCleanerTestBase {
    HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
 
     final String partition = "2016/03/15";
+    String timePrefix = "00000000000";
    Map<String, String> expFileIdToPendingCompaction = new HashMap<String, String>() {
       {
-        put("fileId2", "004");
-        put("fileId3", "006");
-        put("fileId4", "008");
-        put("fileId5", "010");
+        put("fileId2", timePrefix + "004");
+        put("fileId3", timePrefix + "006");
+        put("fileId4", timePrefix + "008");
+        put("fileId5", timePrefix + "010");
       }
     };
    Map<String, String> fileIdToLatestInstantBeforeCompaction = new HashMap<String, String>() {
       {
-        put("fileId1", "000");
-        put("fileId2", "000");
-        put("fileId3", "001");
-        put("fileId4", "003");
-        put("fileId5", "005");
-        put("fileId6", "009");
-        put("fileId7", "011");
+        put("fileId1", timePrefix + "000");
+        put("fileId2", timePrefix + "000");
+        put("fileId3", timePrefix + "001");
+        put("fileId4", timePrefix + "003");
+        put("fileId5", timePrefix + "005");
+        put("fileId6", timePrefix + "009");
+        put("fileId7", timePrefix + "013");
       }
     };
 
@@ -1047,60 +1142,60 @@ public class TestCleaner extends HoodieCleanerTestBase {
     Map<String, List<String>> part1ToFileId = new HashMap<>();
    part1ToFileId.put(partition, Arrays.asList(file1P1, file2P1, file3P1, file4P1, file5P1, file6P1, file7P1));
     // all 7 fileIds
-    commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "000", part1ToFileId, testTable, metadataWriter, true, true);
     part1ToFileId = new HashMap<>();
    part1ToFileId.put(partition, Arrays.asList(file3P1, file4P1, file5P1, file6P1, file7P1));
     // fileIds 3 to 7
-    commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "001", part1ToFileId, testTable, metadataWriter, true, true);
     part1ToFileId = new HashMap<>();
    part1ToFileId.put(partition, Arrays.asList(file4P1, file5P1, file6P1, file7P1));
     // fileIds 4 to 7
-    commitWithMdt("003", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "003", part1ToFileId, testTable, metadataWriter, true, true);
 
     // add compaction
-    testTable.addRequestedCompaction("004", new FileSlice(partition, "000", file2P1));
+    testTable.addRequestedCompaction(timePrefix + "004", new FileSlice(partition, timePrefix + "000", file2P1));
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file2P1));
-    commitWithMdt("005", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "005", part1ToFileId, testTable, metadataWriter, false, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file5P1, file6P1, file7P1));
-    commitWithMdt("0055", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0055", part1ToFileId, testTable, metadataWriter, true, true);
 
-    testTable.addRequestedCompaction("006", new FileSlice(partition, "001", file3P1));
+    testTable.addRequestedCompaction(timePrefix + "006", new FileSlice(partition, timePrefix + "001", file3P1));
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file3P1));
-    commitWithMdt("007", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "007", part1ToFileId, testTable, metadataWriter, false, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
-    commitWithMdt("0075", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0075", part1ToFileId, testTable, metadataWriter, true, true);
 
-    testTable.addRequestedCompaction("008", new FileSlice(partition, "003", file4P1));
+    testTable.addRequestedCompaction(timePrefix + "008", new FileSlice(partition, timePrefix + "003", file4P1));
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file4P1));
-    commitWithMdt("009", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "009", part1ToFileId, testTable, metadataWriter, false, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
-    commitWithMdt("0095", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "0095", part1ToFileId, testTable, metadataWriter, true, true);
 
-    testTable.addRequestedCompaction("010", new FileSlice(partition, "005", file5P1));
+    testTable.addRequestedCompaction(timePrefix + "010", new FileSlice(partition, timePrefix + "005", file5P1));
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file5P1));
-    commitWithMdt("011", part1ToFileId, testTable, metadataWriter, false, true);
+    commitWithMdt(timePrefix + "011", part1ToFileId, testTable, metadataWriter, false, true);
 
     part1ToFileId = new HashMap<>();
     part1ToFileId.put(partition, Arrays.asList(file7P1));
-    commitWithMdt("013", part1ToFileId, testTable, metadataWriter, true, true);
+    commitWithMdt(timePrefix + "013", part1ToFileId, testTable, metadataWriter, true, true);
 
     // Clean now
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, retryFailure);
+    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, 14, true);
 
     // Test for safety
    final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 8c4a5cb377e..6182bc4d4eb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -267,6 +267,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
    return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
  }
   }
 
+  /**
+   * Get all instants (commits, delta commits, replace, compaction) that produce new data or merge files, in the active timeline.
+   */
+  public HoodieTimeline getCommitsAndCompactionTimeline() {
+    return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, COMPACTION_ACTION));
+  }
+
   /**
   * Get all instants (commits, delta commits, compaction, clean, savepoint, rollback, replace commits, index) that result in actions,
    * in the active timeline.
