This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7af080af0bb [HUDI-9330] Avoid storing the clean plan in inflight clean instant (#13192)
7af080af0bb is described below

commit 7af080af0bb8b5883cc9233bbb027b913e654815
Author: TheR1sing3un <chaoy...@apache.org>
AuthorDate: Sat May 24 15:08:00 2025 +0800

    [HUDI-9330] Avoid storing the clean plan in inflight clean instant (#13192)
---
 .../apache/hudi/table/action/clean/CleanActionExecutor.java    |  5 +++--
 .../hudi/table/action/clean/CleanPlanActionExecutor.java       |  2 +-
 .../hudi/table/action/clean/TestCleanPlanActionExecutor.java   | 10 +++++-----
 .../hudi/client/functional/TestExternalPathHandling.java       |  2 +-
 .../apache/hudi/table/functional/TestCleanActionExecutor.java  |  2 +-
 .../hudi/common/table/timeline/HoodieActiveTimeline.java       |  3 +--
 .../common/table/timeline/versioning/v1/ActiveTimelineV1.java  |  4 ++--
 .../common/table/timeline/versioning/v2/ActiveTimelineV2.java  |  4 ++--
 .../main/java/org/apache/hudi/common/util/CleanerUtils.java    |  8 ++++++++
 .../main/java/org/apache/hudi/common/util/ClusteringUtils.java |  3 +--
 .../hudi/common/table/timeline/TestHoodieActiveTimeline.java   |  2 +-
 .../java/org/apache/hudi/common/util/TestClusteringUtils.java  |  4 ++--
 12 files changed, 28 insertions(+), 21 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 1040da62745..2e6afe1fd45 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -209,7 +209,7 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
     try {
       final HoodieTimer timer = HoodieTimer.start();
       if (cleanInstant.isRequested()) {
-        inflightInstant = 
table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant, 
Option.of(cleanerPlan));
+        inflightInstant = 
table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant);
       } else {
         inflightInstant = cleanInstant;
       }
@@ -257,7 +257,8 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
       }
 
       for (HoodieInstant hoodieInstant : pendingCleanInstants) {
-        if (table.getCleanTimeline().isEmpty(hoodieInstant)) {
+        if 
(table.getCleanTimeline().isEmpty(CleanerUtils.getCleanRequestInstant(table.getMetaClient(),
 hoodieInstant))) {
+          // remove the empty instant
           table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
         } else {
           LOG.info("Finishing previously unfinished cleaner instant=" + 
hoodieInstant);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 0f84448e3d7..63de0dd7232 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -189,7 +189,7 @@ public class CleanPlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
       if (activeTimeline.isEmpty(cleanInstant)) {
         activeTimeline.deleteEmptyInstantIfExists(cleanInstant);
-        HoodieInstant cleanPlanInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, cleanInstant.getAction(), 
cleanInstant.requestedTime(), 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
+        HoodieInstant cleanPlanInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, cleanInstant.getAction(), 
cleanInstant.requestedTime(), 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
         try {
           // Deserialize plan.
           return Option.of(activeTimeline.readCleanerPlan(cleanPlanInstant));
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/clean/TestCleanPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/clean/TestCleanPlanActionExecutor.java
index 0e00e828fa0..6a4844a7932 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/clean/TestCleanPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/clean/TestCleanPlanActionExecutor.java
@@ -61,12 +61,12 @@ class TestCleanPlanActionExecutor {
 
     // signal that last clean commit is just an empty file
     HoodieInstant lastCompletedInstant = new 
HoodieInstant(HoodieInstant.State.COMPLETED, "clean", "001", 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
-    HoodieInstant lastInflightInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, "clean", "001", 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
+    HoodieInstant lastRequestInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, "clean", "001", 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
     mockEmptyLastCompletedClean(table, lastCompletedInstant, activeTimeline);
 
     HoodieCleanerPlan cleanerPlan;
     if (isEmptyPlan) {
-      
when(activeTimeline.readCleanerPlan(lastInflightInstant)).thenReturn(HoodieCleanerPlan.class.newInstance());
+      
when(activeTimeline.readCleanerPlan(lastRequestInstant)).thenReturn(HoodieCleanerPlan.class.newInstance());
       cleanerPlan = new HoodieCleanerPlan();
     } else {
       cleanerPlan = HoodieCleanerPlan.newBuilder()
@@ -75,7 +75,7 @@ class TestCleanPlanActionExecutor {
           .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
           .setVersion(TimelineLayoutVersion.CURR_VERSION)
           .build();
-      
when(activeTimeline.readCleanerPlan(lastInflightInstant)).thenReturn(cleanerPlan);
+      
when(activeTimeline.readCleanerPlan(lastRequestInstant)).thenReturn(cleanerPlan);
     }
 
     HoodieEngineContext engineContext = new HoodieLocalEngineContext(new 
HadoopStorageConfiguration(false));
@@ -96,10 +96,10 @@ class TestCleanPlanActionExecutor {
 
     // signal that last clean commit is just an empty file
     HoodieInstant lastCompletedInstant = new 
HoodieInstant(HoodieInstant.State.COMPLETED, "clean", "001", 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
-    HoodieInstant lastInflightInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, "clean", "001", 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
+    HoodieInstant lastRequestInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, "clean", "001", 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
     mockEmptyLastCompletedClean(table, lastCompletedInstant, activeTimeline);
 
-    when(activeTimeline.readCleanerPlan(lastInflightInstant)).thenThrow(new 
HoodieIOException("failed to read"));
+    when(activeTimeline.readCleanerPlan(lastRequestInstant)).thenThrow(new 
HoodieIOException("failed to read"));
 
     HoodieEngineContext engineContext = new HoodieLocalEngineContext(new 
HadoopStorageConfiguration(false));
     CleanPlanActionExecutor<?, ?, ?, ?> executor = new 
CleanPlanActionExecutor<>(engineContext, 
HoodieWriteConfig.newBuilder().withPath("file://tmp").build(), table, "002", 
Option.empty());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
index 6d21c183ff7..651ca99d4c1 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -160,7 +160,7 @@ public class TestExternalPathHandling extends 
HoodieClientTestBase {
         Collections.singletonMap(partitionPath1, Collections.singletonList(new 
HoodieCleanFileInfo(filePath1, false))));
     
metaClient.getActiveTimeline().saveToCleanRequested(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
 HoodieTimeline.CLEAN_ACTION, cleanTime), Option.of(cleanerPlan));
     HoodieInstant inflightClean = 
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(
-        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.CLEAN_ACTION, cleanTime), Option.empty());
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.CLEAN_ACTION, cleanTime));
     List<HoodieCleanStat> cleanStats = 
Collections.singletonList(createCleanStat(partitionPath1, 
Arrays.asList(filePath1), instantTime2, instantTime3));
     HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata(
         cleanTime,
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
index fc39c33a38c..e36a1d7cb65 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
@@ -147,7 +147,7 @@ public class TestCleanActionExecutor {
     HoodieTimeline inflightsAndRequestedTimeline = mock(HoodieTimeline.class);
     
when(cleanTimeline.filterInflightsAndRequested()).thenReturn(inflightsAndRequestedTimeline);
     
when(inflightsAndRequestedTimeline.getInstants()).thenReturn(Collections.singletonList(cleanInstant));
-    when(activeTimeline.transitionCleanRequestedToInflight(any(), 
any())).thenReturn(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
 HoodieTimeline.CLEAN_ACTION, "002"));
+    
when(activeTimeline.transitionCleanRequestedToInflight(any())).thenReturn(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
 HoodieTimeline.CLEAN_ACTION, "002"));
     when(mockHoodieTable.getMetadataWriter("002")).thenReturn(Option.empty());
 
     CleanActionExecutor cleanActionExecutor = new CleanActionExecutor(context, 
config, mockHoodieTable, "002");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 79fbeef2bfd..a947c5c69fa 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -237,10 +237,9 @@ public interface HoodieActiveTimeline extends 
HoodieTimeline {
    * Transition Clean State from requested to inflight.
    *
    * @param requestedInstant requested instant
-   * @param metadata metadata to write into the instant file
    * @return commit instant
    */
-  HoodieInstant transitionCleanRequestedToInflight(HoodieInstant 
requestedInstant, Option<HoodieCleanerPlan> metadata);
+  HoodieInstant transitionCleanRequestedToInflight(HoodieInstant 
requestedInstant);
 
   /**
    * Transition Rollback State from inflight to Committed.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java
index e4ad19a7f8a..71cfdd0b3cd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java
@@ -406,11 +406,11 @@ public class ActiveTimelineV1 extends BaseTimelineV1 
implements HoodieActiveTime
   }
 
   @Override
-  public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant 
requestedInstant, Option<HoodieCleanerPlan> metadata) {
+  public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant 
requestedInstant) {
     
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
     ValidationUtils.checkArgument(requestedInstant.isRequested());
     HoodieInstant inflight = 
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, CLEAN_ACTION, 
requestedInstant.requestedTime());
-    transitionState(requestedInstant, inflight, metadata);
+    transitionState(requestedInstant, inflight, Option.empty());
     return inflight;
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java
index 7d57646dfb5..8ed80c68108 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java
@@ -420,11 +420,11 @@ public class ActiveTimelineV2 extends BaseTimelineV2 
implements HoodieActiveTime
   }
 
   @Override
-  public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant 
requestedInstant, Option<HoodieCleanerPlan> metadata) {
+  public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant 
requestedInstant) {
     
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
     ValidationUtils.checkArgument(requestedInstant.isRequested());
     HoodieInstant inflight = 
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, CLEAN_ACTION, 
requestedInstant.requestedTime());
-    transitionPendingState(requestedInstant, inflight, metadata);
+    transitionPendingState(requestedInstant, inflight, Option.empty());
     return inflight;
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 9ded7dffd0f..0ff7fee37b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -168,10 +168,18 @@ public class CleanerUtils {
   public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, HoodieInstant cleanInstant)
       throws IOException {
     CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
+    cleanInstant = getCleanRequestInstant(metaClient, cleanInstant);
     HoodieCleanerPlan cleanerPlan = 
metaClient.getActiveTimeline().readCleanerPlan(cleanInstant);
     return cleanPlanMigrator.upgradeToLatest(cleanerPlan, 
cleanerPlan.getVersion());
   }
 
+  public static HoodieInstant getCleanRequestInstant(HoodieTableMetaClient 
metaClient, HoodieInstant cleanInstant) {
+    if (!cleanInstant.isRequested()) {
+      return 
metaClient.getInstantGenerator().getRequestedInstant(cleanInstant);
+    }
+    return cleanInstant;
+  }
+
   public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, InputStream in)
       throws IOException {
     CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index d0c2961c92e..c6212f5d1f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -400,7 +400,6 @@ public class ClusteringUtils {
    */
   public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(
       HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient, 
HoodieCleaningPolicy cleanerPolicy) throws IOException {
-    InstantGenerator factory = metaClient.getInstantGenerator();
     Option<HoodieInstant> oldestInstantToRetain = Option.empty();
     HoodieTimeline replaceOrClusterTimeline = 
activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION,
 HoodieTimeline.CLUSTERING_ACTION));
     if (!replaceOrClusterTimeline.empty()) {
@@ -410,7 +409,7 @@ public class ClusteringUtils {
         // The first clustering instant of which timestamp is greater than or 
equal to the earliest commit to retain of
         // the clean metadata.
         HoodieInstant cleanInstant = cleanInstantOpt.get();
-        HoodieCleanerPlan cleanerPlan = 
CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested() ? 
cleanInstant : factory.getCleanRequestedInstant(cleanInstant.requestedTime()));
+        HoodieCleanerPlan cleanerPlan = 
CleanerUtils.getCleanerPlan(metaClient, cleanInstant);
         Option<String> earliestInstantToRetain = 
Option.ofNullable(cleanerPlan.getEarliestInstantToRetain()).map(HoodieActionInstant::getTimestamp);
         String retainLowerBound;
         Option<String> earliestReplacedSavepointInClean = 
getEarliestReplacedSavepointInClean(activeTimeline, cleanerPolicy, cleanerPlan);
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 3ab73a13672..33085c40b62 100755
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -444,7 +444,7 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
     timeline.saveToCleanRequested(clean, Option.empty());
     timeline = timeline.reload();
     assertTrue(timeline.containsInstant(clean));
-    inflight = timeline.transitionCleanRequestedToInflight(clean, 
Option.empty());
+    inflight = timeline.transitionCleanRequestedToInflight(clean);
     timeline = timeline.reload();
     assertFalse(timeline.containsInstant(clean));
     assertTrue(timeline.containsInstant(inflight));
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index f822e363f1a..668a6c61f11 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -182,7 +182,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
         .setVersion(CleanPlanV2MigrationHandler.VERSION)
         .build();
     metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, 
Option.of(cleanerPlan1));
-    HoodieInstant inflightInstant4 = 
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4,
 Option.empty());
+    HoodieInstant inflightInstant4 = 
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4);
     HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 
1L, 1,
         completedInstant3.requestedTime(), "", Collections.emptyMap(), 0, 
Collections.emptyMap(), Collections.emptyMap());
     metaClient.getActiveTimeline().transitionCleanInflightToComplete(true, 
inflightInstant4, Option.of(cleanMetadata));
@@ -211,7 +211,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
         HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(), 
Collections.emptyMap(),
         CleanPlanV2MigrationHandler.VERSION, Collections.emptyMap(), 
Collections.emptyList(), Collections.EMPTY_MAP);
     metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant2, 
Option.of(cleanerPlan1));
-    HoodieInstant inflightInstant2 = 
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant2,
 Option.empty());
+    HoodieInstant inflightInstant2 = 
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant2);
     HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 
1L, 1,
         "", "", Collections.emptyMap(), 0, Collections.emptyMap(), 
Collections.emptyMap());
     metaClient.getActiveTimeline().transitionCleanInflightToComplete(true, 
inflightInstant2, Option.of(cleanMetadata));

Reply via email to