This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bb8fc3e9f63 [HUDI-6929] Lazy loading dynamically for 
CompletionTimeQueryView (#9898)
bb8fc3e9f63 is described below

commit bb8fc3e9f632a1fc3647fda63d482849355df2b7
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Mon Oct 23 11:39:25 2023 +0800

    [HUDI-6929] Lazy loading dynamically for CompletionTimeQueryView (#9898)
---
 .../timeline/TestCompletionTimeQueryView.java      |  1 +
 .../table/timeline/CompletionTimeQueryView.java    | 67 ++++++++++------------
 .../table/timeline/HoodieArchivedTimeline.java     | 20 +++++--
 .../hudi/common/table/timeline/HoodieTimeline.java |  8 +++
 4 files changed, 54 insertions(+), 42 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
index ae34f4d606f..9df49d4b9d0 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
@@ -82,6 +82,7 @@ public class TestCompletionTimeQueryView {
       for (int i = 1; i < 3; i++) {
         assertThat(view.getCompletionTime(String.format("%08d", 
i)).orElse(""), is(String.format("%08d", i + 1000)));
       }
+      assertThat("The cursor instant should be slided", 
view.getCursorInstant(), is(String.format("%08d", 1)));
       // query with inflight start time
       assertFalse(view.getCompletionTime(String.format("%08d", 
11)).isPresent());
       // query with non-exist start time
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 290f31ff344..1e2881809f3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -30,8 +30,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
@@ -53,9 +51,12 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
   private final Map<String, String> startToCompletionInstantTimeMap;
 
   /**
-   * The start instant time to eagerly load from, by default load last N days 
of completed instants.
+   * The cursor instant time to eagerly load from; by default, the last N days of completed instants are loaded.
+   * It is adjusted dynamically whenever lazy loading occurs. For example, given an initial cursor instant t10,
+   * a completion time query for t5 triggers a lazy load, after which the cursor instant is updated to t5.
+   * Sliding the cursor this way avoids redundant loading across different queries.
    */
-  private final String startInstant;
+  private volatile String cursorInstant;
 
   /**
    * The first write instant on the active timeline, used for query 
optimization.
@@ -75,12 +76,12 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
    * The constructor.
    *
    * @param metaClient   The table meta client.
-   * @param startInstant The earliest instant time to eagerly load from, by 
default load last N days of completed instants.
+   * @param cursorInstant The earliest instant time to eagerly load from; by default, the last N days of completed instants are loaded.
    */
-  public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String 
startInstant) {
+  public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String 
cursorInstant) {
     this.metaClient = metaClient;
     this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
-    this.startInstant = minInstant(startInstant, 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
+    this.cursorInstant = minInstant(cursorInstant, 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
     // Note: use getWriteTimeline() to keep sync with the fs view 
visibleCommitsAndCompactionTimeline, see 
AbstractTableFileSystemView.refreshTimeline.
     this.firstNonSavepointCommit = 
metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse("");
     load();
@@ -132,10 +133,10 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
         // ==============================================================
         // LEGACY CODE
         // ==============================================================
-        // Fixes the completion time to reflect the completion sequence 
correctly
-        // if the file slice base instant time is not in datetime format. For 
example,
-        // 1. many test cases just use integer string as the instant time.
-        // 2. MDT uses compaction instant time as [delta_instant] + "001".
+        // Fixes the completion time so that it correctly reflects the completion sequence.
+        // The file slice base instant time is not in datetime format in the following scenarios:
+        //   1. many test cases just use integer strings as instant times;
+        //   2. the MDT uses compaction instant times with the pattern [delta_instant] + "001".
 
         // CAUTION: this fix only works for OCC(Optimistic Concurrency 
Control).
         // for NB-CC(Non-blocking Concurrency Control), the file slicing may 
be incorrect.
@@ -157,17 +158,23 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
     if (completionTime != null) {
       return Option.of(completionTime);
     }
-    if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN, 
this.startInstant)) {
+    if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN_OR_EQUALS, 
this.cursorInstant)) {
       // the instant is still pending
       return Option.empty();
     }
     // the 'startTime' should be out of the eager loading range, switch to a 
lazy loading.
     // This operation is resource costly.
-    HoodieArchivedTimeline.loadInstants(metaClient,
-        new EqualsTimestampFilter(startTime),
-        HoodieArchivedTimeline.LoadMode.SLIM,
-        r -> true,
-        this::readCompletionTime);
+    synchronized (this) {
+      if (HoodieTimeline.compareTimestamps(startTime, LESSER_THAN, 
this.cursorInstant)) {
+        HoodieArchivedTimeline.loadInstants(metaClient,
+            new HoodieArchivedTimeline.ClosedOpenTimeRangeFilter(startTime, 
this.cursorInstant),
+            HoodieArchivedTimeline.LoadMode.SLIM,
+            r -> true,
+            this::readCompletionTime);
+      }
+      // slide the cursor instant; NOTE(review): runs even if the load above was skipped, which can move the cursor forward
+      this.cursorInstant = startTime;
+    }
     return 
Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime));
   }
 
@@ -183,7 +190,7 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
         .forEach(instant -> setCompletionTime(instant.getTimestamp(), 
instant.getCompletionTime()));
     // then load the archived instants.
     HoodieArchivedTimeline.loadInstants(metaClient,
-        new HoodieArchivedTimeline.StartTsFilter(this.startInstant),
+        new HoodieArchivedTimeline.StartTsFilter(this.cursorInstant),
         HoodieArchivedTimeline.LoadMode.SLIM,
         r -> true,
         this::readCompletionTime);
@@ -206,28 +213,12 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
     return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 : 
instant2;
   }
 
+  public String getCursorInstant() {
+    return cursorInstant;
+  }
+
   @Override
   public void close() throws Exception {
     this.startToCompletionInstantTimeMap.clear();
   }
-
-  // -------------------------------------------------------------------------
-  //  Inner class
-  // -------------------------------------------------------------------------
-
-  /**
-   * A time based filter with equality of specified timestamp.
-   */
-  public static class EqualsTimestampFilter extends 
HoodieArchivedTimeline.TimeRangeFilter {
-    private final String ts;
-
-    public EqualsTimestampFilter(String ts) {
-      super(ts, ts); // endTs is never used
-      this.ts = ts;
-    }
-
-    public boolean isInRange(String instantTime) {
-      return HoodieTimeline.compareTimestamps(instantTime, EQUALS, ts);
-    }
-  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index bdd5750684e..cdffd4c0b3c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -288,8 +288,8 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
    * A time based filter with range (startTs, endTs].
    */
   public static class TimeRangeFilter {
-    private final String startTs;
-    private final String endTs;
+    protected final String startTs;
+    protected final String endTs;
 
     public TimeRangeFilter(String startTs, String endTs) {
       this.startTs = startTs;
@@ -301,15 +301,27 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
     }
   }
 
+  /**
+   * A time based filter with range [startTs, endTs).
+   */
+  public static class ClosedOpenTimeRangeFilter extends TimeRangeFilter {
+
+    public ClosedOpenTimeRangeFilter(String startTs, String endTs) {
+      super(startTs, endTs);
+    }
+
+    public boolean isInRange(String instantTime) {
+      return HoodieTimeline.isInClosedOpenRange(instantTime, this.startTs, 
this.endTs);
+    }
+  }
+
   /**
    * A time based filter with range [startTs, +&#8734).
    */
   public static class StartTsFilter extends TimeRangeFilter {
-    private final String startTs;
 
     public StartTsFilter(String startTs) {
       super(startTs, null); // endTs is never used
-      this.startTs = startTs;
     }
 
     public boolean isInRange(String instantTime) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 8644204ab82..82ec439bd25 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -423,6 +423,14 @@ public interface HoodieTimeline extends Serializable {
             && HoodieTimeline.compareTimestamps(timestamp, 
LESSER_THAN_OR_EQUALS, endTs);
   }
 
+  /**
+   * Return true if specified timestamp is in range [startTs, endTs).
+   */
+  static boolean isInClosedOpenRange(String timestamp, String startTs, String 
endTs) {
+    return HoodieTimeline.compareTimestamps(timestamp, GREATER_THAN_OR_EQUALS, 
startTs)
+        && HoodieTimeline.compareTimestamps(timestamp, LESSER_THAN, endTs);
+  }
+
   /**
    * Return true if specified timestamp is in range [startTs, endTs].
    */

Reply via email to