This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 7af080af0bb [HUDI-9330] Avoid storing the clean plan in inflight clean instant (#13192) 7af080af0bb is described below commit 7af080af0bb8b5883cc9233bbb027b913e654815 Author: TheR1sing3un <chaoy...@apache.org> AuthorDate: Sat May 24 15:08:00 2025 +0800 [HUDI-9330] Avoid storing the clean plan in inflight clean instant (#13192) --- .../apache/hudi/table/action/clean/CleanActionExecutor.java | 5 +++-- .../hudi/table/action/clean/CleanPlanActionExecutor.java | 2 +- .../hudi/table/action/clean/TestCleanPlanActionExecutor.java | 10 +++++----- .../hudi/client/functional/TestExternalPathHandling.java | 2 +- .../apache/hudi/table/functional/TestCleanActionExecutor.java | 2 +- .../hudi/common/table/timeline/HoodieActiveTimeline.java | 3 +-- .../common/table/timeline/versioning/v1/ActiveTimelineV1.java | 4 ++-- .../common/table/timeline/versioning/v2/ActiveTimelineV2.java | 4 ++-- .../main/java/org/apache/hudi/common/util/CleanerUtils.java | 8 ++++++++ .../main/java/org/apache/hudi/common/util/ClusteringUtils.java | 3 +-- .../hudi/common/table/timeline/TestHoodieActiveTimeline.java | 2 +- .../java/org/apache/hudi/common/util/TestClusteringUtils.java | 4 ++-- 12 files changed, 28 insertions(+), 21 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 1040da62745..2e6afe1fd45 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -209,7 +209,7 @@ public class CleanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, try { final HoodieTimer timer = HoodieTimer.start(); if (cleanInstant.isRequested()) { - inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant, Option.of(cleanerPlan)); + inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant); } else { inflightInstant = cleanInstant; } @@ -257,7 +257,8 @@ public class CleanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, } for (HoodieInstant hoodieInstant : pendingCleanInstants) { - if (table.getCleanTimeline().isEmpty(hoodieInstant)) { + if (table.getCleanTimeline().isEmpty(CleanerUtils.getCleanRequestInstant(table.getMetaClient(), hoodieInstant))) { + // remove the empty instant table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant); } else { LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 0f84448e3d7..63de0dd7232 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -189,7 +189,7 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); if (activeTimeline.isEmpty(cleanInstant)) { activeTimeline.deleteEmptyInstantIfExists(cleanInstant); - HoodieInstant cleanPlanInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, cleanInstant.getAction(), cleanInstant.requestedTime(), InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + HoodieInstant cleanPlanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, cleanInstant.getAction(), cleanInstant.requestedTime(), InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); try { // Deserialize plan. return Option.of(activeTimeline.readCleanerPlan(cleanPlanInstant)); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/clean/TestCleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/clean/TestCleanPlanActionExecutor.java index 0e00e828fa0..6a4844a7932 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/clean/TestCleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/clean/TestCleanPlanActionExecutor.java @@ -61,12 +61,12 @@ class TestCleanPlanActionExecutor { // signal that last clean commit is just an empty file HoodieInstant lastCompletedInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, "clean", "001", InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); - HoodieInstant lastInflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "clean", "001", InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + HoodieInstant lastRequestInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "clean", "001", InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); mockEmptyLastCompletedClean(table, lastCompletedInstant, activeTimeline); HoodieCleanerPlan cleanerPlan; if (isEmptyPlan) { - when(activeTimeline.readCleanerPlan(lastInflightInstant)).thenReturn(HoodieCleanerPlan.class.newInstance()); + when(activeTimeline.readCleanerPlan(lastRequestInstant)).thenReturn(HoodieCleanerPlan.class.newInstance()); cleanerPlan = new HoodieCleanerPlan(); } else { cleanerPlan = HoodieCleanerPlan.newBuilder() @@ -75,7 +75,7 @@ class TestCleanPlanActionExecutor { .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) .setVersion(TimelineLayoutVersion.CURR_VERSION) .build(); - when(activeTimeline.readCleanerPlan(lastInflightInstant)).thenReturn(cleanerPlan); + when(activeTimeline.readCleanerPlan(lastRequestInstant)).thenReturn(cleanerPlan); } HoodieEngineContext engineContext = new HoodieLocalEngineContext(new HadoopStorageConfiguration(false)); @@ -96,10 +96,10 @@ class TestCleanPlanActionExecutor { // signal that last clean commit is just an empty file HoodieInstant lastCompletedInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, "clean", "001", InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); - HoodieInstant lastInflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "clean", "001", InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + HoodieInstant lastRequestInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "clean", "001", InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); mockEmptyLastCompletedClean(table, lastCompletedInstant, activeTimeline); - when(activeTimeline.readCleanerPlan(lastInflightInstant)).thenThrow(new HoodieIOException("failed to read")); + when(activeTimeline.readCleanerPlan(lastRequestInstant)).thenThrow(new HoodieIOException("failed to read")); HoodieEngineContext engineContext = new HoodieLocalEngineContext(new HadoopStorageConfiguration(false)); CleanPlanActionExecutor<?, ?, ?, ?> executor = new CleanPlanActionExecutor<>(engineContext, HoodieWriteConfig.newBuilder().withPath("file://tmp").build(), table, "002", Option.empty()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java index 6d21c183ff7..651ca99d4c1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java @@ -160,7 +160,7 @@ public class TestExternalPathHandling extends HoodieClientTestBase { Collections.singletonMap(partitionPath1, Collections.singletonList(new HoodieCleanFileInfo(filePath1, false)))); metaClient.getActiveTimeline().saveToCleanRequested(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime), Option.of(cleanerPlan)); HoodieInstant inflightClean = metaClient.getActiveTimeline().transitionCleanRequestedToInflight( - INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime), Option.empty()); + INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime)); List<HoodieCleanStat> cleanStats = Collections.singletonList(createCleanStat(partitionPath1, Arrays.asList(filePath1), instantTime2, instantTime3)); HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata( cleanTime, diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java index fc39c33a38c..e36a1d7cb65 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java @@ -147,7 +147,7 @@ public class TestCleanActionExecutor { HoodieTimeline inflightsAndRequestedTimeline = mock(HoodieTimeline.class); when(cleanTimeline.filterInflightsAndRequested()).thenReturn(inflightsAndRequestedTimeline); when(inflightsAndRequestedTimeline.getInstants()).thenReturn(Collections.singletonList(cleanInstant)); - when(activeTimeline.transitionCleanRequestedToInflight(any(), any())).thenReturn(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, "002")); + when(activeTimeline.transitionCleanRequestedToInflight(any())).thenReturn(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, "002")); when(mockHoodieTable.getMetadataWriter("002")).thenReturn(Option.empty()); CleanActionExecutor cleanActionExecutor = new CleanActionExecutor(context, config, mockHoodieTable, "002"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 79fbeef2bfd..a947c5c69fa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -237,10 +237,9 @@ public interface HoodieActiveTimeline extends HoodieTimeline { * Transition Clean State from requested to inflight. * * @param requestedInstant requested instant - * @param metadata metadata to write into the instant file * @return commit instant */ - HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant, Option<HoodieCleanerPlan> metadata); + HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant); /** * Transition Rollback State from inflight to Committed. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java index e4ad19a7f8a..71cfdd0b3cd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java @@ -406,11 +406,11 @@ public class ActiveTimelineV1 extends BaseTimelineV1 implements HoodieActiveTime } @Override - public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant, Option<HoodieCleanerPlan> metadata) { + public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant) { ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); ValidationUtils.checkArgument(requestedInstant.isRequested()); HoodieInstant inflight = instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, CLEAN_ACTION, requestedInstant.requestedTime()); - transitionState(requestedInstant, inflight, metadata); + transitionState(requestedInstant, inflight, Option.empty()); return inflight; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java index 7d57646dfb5..8ed80c68108 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java @@ -420,11 +420,11 @@ public class ActiveTimelineV2 extends BaseTimelineV2 implements HoodieActiveTime } @Override - public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant, Option<HoodieCleanerPlan> metadata) { + public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant) { ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); ValidationUtils.checkArgument(requestedInstant.isRequested()); HoodieInstant inflight = instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, CLEAN_ACTION, requestedInstant.requestedTime()); - transitionPendingState(requestedInstant, inflight, metadata); + transitionPendingState(requestedInstant, inflight, Option.empty()); return inflight; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index 9ded7dffd0f..0ff7fee37b7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -168,10 +168,18 @@ public class CleanerUtils { public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant) throws IOException { CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient); + cleanInstant = getCleanRequestInstant(metaClient, cleanInstant); HoodieCleanerPlan cleanerPlan = metaClient.getActiveTimeline().readCleanerPlan(cleanInstant); return cleanPlanMigrator.upgradeToLatest(cleanerPlan, cleanerPlan.getVersion()); } + public static HoodieInstant getCleanRequestInstant(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant) { + if (!cleanInstant.isRequested()) { + return metaClient.getInstantGenerator().getRequestedInstant(cleanInstant); + } + return cleanInstant; + } + public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, InputStream in) throws IOException { CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index d0c2961c92e..c6212f5d1f3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -400,7 +400,6 @@ public class ClusteringUtils { */ public static Option<HoodieInstant> getEarliestInstantToRetainForClustering( HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient, HoodieCleaningPolicy cleanerPolicy) throws IOException { - InstantGenerator factory = metaClient.getInstantGenerator(); Option<HoodieInstant> oldestInstantToRetain = Option.empty(); HoodieTimeline replaceOrClusterTimeline = activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.CLUSTERING_ACTION)); if (!replaceOrClusterTimeline.empty()) { @@ -410,7 +409,7 @@ public class ClusteringUtils { // The first clustering instant of which timestamp is greater than or equal to the earliest commit to retain of // the clean metadata. HoodieInstant cleanInstant = cleanInstantOpt.get(); - HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested() ? cleanInstant : factory.getCleanRequestedInstant(cleanInstant.requestedTime())); + HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(metaClient, cleanInstant); Option<String> earliestInstantToRetain = Option.ofNullable(cleanerPlan.getEarliestInstantToRetain()).map(HoodieActionInstant::getTimestamp); String retainLowerBound; Option<String> earliestReplacedSavepointInClean = getEarliestReplacedSavepointInClean(activeTimeline, cleanerPolicy, cleanerPlan); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 3ab73a13672..33085c40b62 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -444,7 +444,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { timeline.saveToCleanRequested(clean, Option.empty()); timeline = timeline.reload(); assertTrue(timeline.containsInstant(clean)); - inflight = timeline.transitionCleanRequestedToInflight(clean, Option.empty()); + inflight = timeline.transitionCleanRequestedToInflight(clean); timeline = timeline.reload(); assertFalse(timeline.containsInstant(clean)); assertTrue(timeline.containsInstant(inflight)); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java index f822e363f1a..668a6c61f11 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java @@ -182,7 +182,7 @@ public class TestClusteringUtils extends HoodieCommonTestHarness { .setVersion(CleanPlanV2MigrationHandler.VERSION) .build(); metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, Option.of(cleanerPlan1)); - HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty()); + HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4); HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 1L, 1, completedInstant3.requestedTime(), "", Collections.emptyMap(), 0, Collections.emptyMap(), Collections.emptyMap()); metaClient.getActiveTimeline().transitionCleanInflightToComplete(true, inflightInstant4, Option.of(cleanMetadata)); @@ -211,7 +211,7 @@ public class TestClusteringUtils extends HoodieCommonTestHarness { HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(), Collections.emptyMap(), CleanPlanV2MigrationHandler.VERSION, Collections.emptyMap(), Collections.emptyList(), Collections.EMPTY_MAP); metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant2, Option.of(cleanerPlan1)); - HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant2, Option.empty()); + HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant2); HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 1L, 1, "", "", Collections.emptyMap(), 0, Collections.emptyMap(), Collections.emptyMap()); metaClient.getActiveTimeline().transitionCleanInflightToComplete(true, inflightInstant2, Option.of(cleanMetadata));