This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 44ab6f32bff [HUDI-6538] Refactor methods in TimelineDiffHelper class (#10938)
44ab6f32bff is described below

commit 44ab6f32bffbab8cd250bd0430d9591360f118e7
Author: wombatu-kun <wombatu...@gmail.com>
AuthorDate: Mon Apr 1 12:47:27 2024 +0700

    [HUDI-6538] Refactor methods in TimelineDiffHelper class (#10938)
---
 .../common/table/timeline/TimelineDiffHelper.java  | 66 +++++++---------------
 1 file changed, 21 insertions(+), 45 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
index aa7e2a30754..a98b71aa571 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
@@ -37,8 +37,11 @@ public class TimelineDiffHelper {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TimelineDiffHelper.class);
 
+  private TimelineDiffHelper() {
+  }
+
   public static TimelineDiffResult 
getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
-      HoodieTimeline newTimeline) {
+                                                                    
HoodieTimeline newTimeline) {
 
     HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants();
     HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants();
@@ -57,14 +60,14 @@ public class TimelineDiffHelper {
       List<HoodieInstant> newInstants = new ArrayList<>();
 
       // Check If any pending compaction is lost. If so, do not allow 
incremental timeline sync
-      List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = 
getPendingCompactionTransitions(oldT, newT);
+      List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = 
getPendingActionTransitions(oldT.filterPendingCompactionTimeline(),
+          newT, HoodieTimeline.COMMIT_ACTION, 
HoodieTimeline.COMPACTION_ACTION);
       List<HoodieInstant> lostPendingCompactions = compactionInstants.stream()
           .filter(instantPair -> instantPair.getValue() == 
null).map(Pair::getKey).collect(Collectors.toList());
       if (!lostPendingCompactions.isEmpty()) {
         // If a compaction is unscheduled, fall back to complete refresh of fs 
view since some log files could have been
         // moved. Its unsafe to incrementally sync in that case.
-        LOG.warn("Some pending compactions are no longer in new timeline 
(unscheduled ?). They are :"
-            + lostPendingCompactions);
+        LOG.warn("Some pending compactions are no longer in new timeline 
(unscheduled ?). They are: {}", lostPendingCompactions);
         return TimelineDiffResult.UNSAFE_SYNC_RESULT;
       }
       List<HoodieInstant> finishedCompactionInstants = 
compactionInstants.stream()
@@ -74,7 +77,8 @@ public class TimelineDiffHelper {
 
       newTimeline.getInstantsAsStream().filter(instant -> 
!oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
-      List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = 
getPendingLogCompactionTransitions(oldTimeline, newTimeline);
+      List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = 
getPendingActionTransitions(oldTimeline.filterPendingLogCompactionTimeline(),
+          newTimeline, HoodieTimeline.DELTA_COMMIT_ACTION, 
HoodieTimeline.LOG_COMPACTION_ACTION);
       List<HoodieInstant> finishedOrRemovedLogCompactionInstants = 
logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || 
instantPair.getValue().isCompleted()))
@@ -87,52 +91,24 @@ public class TimelineDiffHelper {
     }
   }
 
-  /**
-   * Getting pending log compaction transitions.
-   */
-  private static List<Pair<HoodieInstant, HoodieInstant>> 
getPendingLogCompactionTransitions(HoodieTimeline oldTimeline,
-                                                                               
           HoodieTimeline newTimeline) {
-    Set<HoodieInstant> newTimelineInstants = 
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
-
-    return 
oldTimeline.filterPendingLogCompactionTimeline().getInstantsAsStream().map(instant
 -> {
-      if (newTimelineInstants.contains(instant)) {
-        return Pair.of(instant, instant);
-      } else {
-        HoodieInstant logCompacted =
-            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp());
-        if (newTimelineInstants.contains(logCompacted)) {
-          return Pair.of(instant, logCompacted);
-        }
-        HoodieInstant inflightLogCompacted =
-            new HoodieInstant(State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, instant.getTimestamp());
-        if (newTimelineInstants.contains(inflightLogCompacted)) {
-          return Pair.of(instant, inflightLogCompacted);
-        }
-        return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
-      }
-    }).collect(Collectors.toList());
-  }
-
-  /**
-   * Getting pending compaction transitions.
-   */
-  private static List<Pair<HoodieInstant, HoodieInstant>> 
getPendingCompactionTransitions(HoodieTimeline oldTimeline,
-      HoodieTimeline newTimeline) {
+  private static List<Pair<HoodieInstant, HoodieInstant>> 
getPendingActionTransitions(HoodieTimeline pendingActionTimelineFromOld,
+                                                                               
              HoodieTimeline newTimeline,
+                                                                               
              String completedAction, String pendingAction) {
     Set<HoodieInstant> newTimelineInstants = 
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
 
-    return 
oldTimeline.filterPendingCompactionTimeline().getInstantsAsStream().map(instant 
-> {
+    return pendingActionTimelineFromOld.getInstantsAsStream().map(instant -> {
       if (newTimelineInstants.contains(instant)) {
         return Pair.of(instant, instant);
       } else {
-        HoodieInstant compacted =
-            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
instant.getTimestamp());
-        if (newTimelineInstants.contains(compacted)) {
-          return Pair.of(instant, compacted);
+        HoodieInstant completedInstant =
+            new HoodieInstant(State.COMPLETED, completedAction, 
instant.getTimestamp());
+        if (newTimelineInstants.contains(completedInstant)) {
+          return Pair.of(instant, completedInstant);
         }
-        HoodieInstant inflightCompacted =
-            new HoodieInstant(State.INFLIGHT, 
HoodieTimeline.COMPACTION_ACTION, instant.getTimestamp());
-        if (newTimelineInstants.contains(inflightCompacted)) {
-          return Pair.of(instant, inflightCompacted);
+        HoodieInstant inflightInstant =
+            new HoodieInstant(State.INFLIGHT, pendingAction, 
instant.getTimestamp());
+        if (newTimelineInstants.contains(inflightInstant)) {
+          return Pair.of(instant, inflightInstant);
         }
         return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
       }

Reply via email to