This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 44ab6f32bff [HUDI-6538] Refactor methods in TimelineDiffHelper class (#10938) 44ab6f32bff is described below commit 44ab6f32bffbab8cd250bd0430d9591360f118e7 Author: wombatu-kun <wombatu...@gmail.com> AuthorDate: Mon Apr 1 12:47:27 2024 +0700 [HUDI-6538] Refactor methods in TimelineDiffHelper class (#10938) --- .../common/table/timeline/TimelineDiffHelper.java | 66 +++++++--------------- 1 file changed, 21 insertions(+), 45 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java index aa7e2a30754..a98b71aa571 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java @@ -37,8 +37,11 @@ public class TimelineDiffHelper { private static final Logger LOG = LoggerFactory.getLogger(TimelineDiffHelper.class); + private TimelineDiffHelper() { + } + public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline, - HoodieTimeline newTimeline) { + HoodieTimeline newTimeline) { HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants(); HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants(); @@ -57,14 +60,14 @@ public class TimelineDiffHelper { List<HoodieInstant> newInstants = new ArrayList<>(); // Check If any pending compaction is lost. If so, do not allow incremental timeline sync - List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingCompactionTransitions(oldT, newT); + List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingActionTransitions(oldT.filterPendingCompactionTimeline(), + newT, HoodieTimeline.COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION); List<HoodieInstant> lostPendingCompactions = compactionInstants.stream() .filter(instantPair -> instantPair.getValue() == null).map(Pair::getKey).collect(Collectors.toList()); if (!lostPendingCompactions.isEmpty()) { // If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been // moved. Its unsafe to incrementally sync in that case. - LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :" - + lostPendingCompactions); + LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are: {}", lostPendingCompactions); return TimelineDiffResult.UNSAFE_SYNC_RESULT; } List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream() @@ -74,7 +77,8 @@ public class TimelineDiffHelper { newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add); - List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline); + List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingActionTransitions(oldTimeline.filterPendingLogCompactionTimeline(), + newTimeline, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION); List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream() .filter(instantPair -> !instantPair.getKey().isCompleted() && (instantPair.getValue() == null || instantPair.getValue().isCompleted())) @@ -87,52 +91,24 @@ public class TimelineDiffHelper { } } - /** - * Getting pending log compaction transitions. - */ - private static List<Pair<HoodieInstant, HoodieInstant>> getPendingLogCompactionTransitions(HoodieTimeline oldTimeline, - HoodieTimeline newTimeline) { - Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet()); - - return oldTimeline.filterPendingLogCompactionTimeline().getInstantsAsStream().map(instant -> { - if (newTimelineInstants.contains(instant)) { - return Pair.of(instant, instant); - } else { - HoodieInstant logCompacted = - new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp()); - if (newTimelineInstants.contains(logCompacted)) { - return Pair.of(instant, logCompacted); - } - HoodieInstant inflightLogCompacted = - new HoodieInstant(State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, instant.getTimestamp()); - if (newTimelineInstants.contains(inflightLogCompacted)) { - return Pair.of(instant, inflightLogCompacted); - } - return Pair.<HoodieInstant, HoodieInstant>of(instant, null); - } - }).collect(Collectors.toList()); - } - - /** - * Getting pending compaction transitions. - */ - private static List<Pair<HoodieInstant, HoodieInstant>> getPendingCompactionTransitions(HoodieTimeline oldTimeline, - HoodieTimeline newTimeline) { + private static List<Pair<HoodieInstant, HoodieInstant>> getPendingActionTransitions(HoodieTimeline pendingActionTimelineFromOld, + HoodieTimeline newTimeline, + String completedAction, String pendingAction) { Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet()); - return oldTimeline.filterPendingCompactionTimeline().getInstantsAsStream().map(instant -> { + return pendingActionTimelineFromOld.getInstantsAsStream().map(instant -> { if (newTimelineInstants.contains(instant)) { return Pair.of(instant, instant); } else { - HoodieInstant compacted = - new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, instant.getTimestamp()); - if (newTimelineInstants.contains(compacted)) { - return Pair.of(instant, compacted); + HoodieInstant completedInstant = + new HoodieInstant(State.COMPLETED, completedAction, instant.getTimestamp()); + if (newTimelineInstants.contains(completedInstant)) { + return Pair.of(instant, completedInstant); } - HoodieInstant inflightCompacted = - new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, instant.getTimestamp()); - if (newTimelineInstants.contains(inflightCompacted)) { - return Pair.of(instant, inflightCompacted); + HoodieInstant inflightInstant = + new HoodieInstant(State.INFLIGHT, pendingAction, instant.getTimestamp()); + if (newTimelineInstants.contains(inflightInstant)) { + return Pair.of(instant, inflightInstant); } return Pair.<HoodieInstant, HoodieInstant>of(instant, null); }