This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new bb8fc3e9f63 [HUDI-6929] Lazy loading dynamically for CompletionTimeQueryView (#9898) bb8fc3e9f63 is described below commit bb8fc3e9f632a1fc3647fda63d482849355df2b7 Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Mon Oct 23 11:39:25 2023 +0800 [HUDI-6929] Lazy loading dynamically for CompletionTimeQueryView (#9898) --- .../timeline/TestCompletionTimeQueryView.java | 1 + .../table/timeline/CompletionTimeQueryView.java | 67 ++++++++++------------ .../table/timeline/HoodieArchivedTimeline.java | 20 +++++-- .../hudi/common/table/timeline/HoodieTimeline.java | 8 +++ 4 files changed, 54 insertions(+), 42 deletions(-) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java index ae34f4d606f..9df49d4b9d0 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java @@ -82,6 +82,7 @@ public class TestCompletionTimeQueryView { for (int i = 1; i < 3; i++) { assertThat(view.getCompletionTime(String.format("%08d", i)).orElse(""), is(String.format("%08d", i + 1000))); } + assertThat("The cursor instant should be slided", view.getCursorInstant(), is(String.format("%08d", 1))); // query with inflight start time assertFalse(view.getCompletionTime(String.format("%08d", 11)).isPresent()); // query with non-exist start time diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java index 290f31ff344..1e2881809f3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java @@ -30,8 +30,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import static org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN; import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps; @@ -53,9 +51,12 @@ public class CompletionTimeQueryView implements AutoCloseable, Serializable { private final Map<String, String> startToCompletionInstantTimeMap; /** - * The start instant time to eagerly load from, by default load last N days of completed instants. + * The cursor instant time to eagerly load from, by default load last N days of completed instants. + * It is tuned dynamically with lazy loading occurs, assumes an initial cursor instant as t10, + * a completion query for t5 would trigger a lazy loading with this cursor instant been updated as t5. + * The sliding of the cursor instant economizes redundant loading from different queries. */ - private final String startInstant; + private volatile String cursorInstant; /** * The first write instant on the active timeline, used for query optimization. @@ -75,12 +76,12 @@ public class CompletionTimeQueryView implements AutoCloseable, Serializable { * The constructor. * * @param metaClient The table meta client. - * @param startInstant The earliest instant time to eagerly load from, by default load last N days of completed instants. + * @param cursorInstant The earliest instant time to eagerly load from, by default load last N days of completed instants. */ - public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String startInstant) { + public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String cursorInstant) { this.metaClient = metaClient; this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>(); - this.startInstant = minInstant(startInstant, metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse("")); + this.cursorInstant = minInstant(cursorInstant, metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse("")); // Note: use getWriteTimeline() to keep sync with the fs view visibleCommitsAndCompactionTimeline, see AbstractTableFileSystemView.refreshTimeline. this.firstNonSavepointCommit = metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse(""); load(); @@ -132,10 +133,10 @@ public class CompletionTimeQueryView implements AutoCloseable, Serializable { // ============================================================== // LEGACY CODE // ============================================================== - // Fixes the completion time to reflect the completion sequence correctly - // if the file slice base instant time is not in datetime format. For example, - // 1. many test cases just use integer string as the instant time. - // 2. MDT uses compaction instant time as [delta_instant] + "001". + // Fixes the completion time to reflect the completion sequence correctly. + // The file slice base instant time is not in datetime format in the following scenarios: + // 1. many test cases just use integer string as the instant time. + // 2. MDT uses compaction instant time with pattern [delta_instant] + "001". // CAUTION: this fix only works for OCC(Optimistic Concurrency Control). // for NB-CC(Non-blocking Concurrency Control), the file slicing may be incorrect. @@ -157,17 +158,23 @@ public class CompletionTimeQueryView implements AutoCloseable, Serializable { if (completionTime != null) { return Option.of(completionTime); } - if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN, this.startInstant)) { + if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN_OR_EQUALS, this.cursorInstant)) { // the instant is still pending return Option.empty(); } // the 'startTime' should be out of the eager loading range, switch to a lazy loading. // This operation is resource costly. - HoodieArchivedTimeline.loadInstants(metaClient, - new EqualsTimestampFilter(startTime), - HoodieArchivedTimeline.LoadMode.SLIM, - r -> true, - this::readCompletionTime); + synchronized (this) { + if (HoodieTimeline.compareTimestamps(startTime, LESSER_THAN, this.cursorInstant)) { + HoodieArchivedTimeline.loadInstants(metaClient, + new HoodieArchivedTimeline.ClosedOpenTimeRangeFilter(startTime, this.cursorInstant), + HoodieArchivedTimeline.LoadMode.SLIM, + r -> true, + this::readCompletionTime); + } + // refresh the start instant + this.cursorInstant = startTime; + } return Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime)); } @@ -183,7 +190,7 @@ public class CompletionTimeQueryView implements AutoCloseable, Serializable { .forEach(instant -> setCompletionTime(instant.getTimestamp(), instant.getCompletionTime())); // then load the archived instants. HoodieArchivedTimeline.loadInstants(metaClient, - new HoodieArchivedTimeline.StartTsFilter(this.startInstant), + new HoodieArchivedTimeline.StartTsFilter(this.cursorInstant), HoodieArchivedTimeline.LoadMode.SLIM, r -> true, this::readCompletionTime); @@ -206,28 +213,12 @@ public class CompletionTimeQueryView implements AutoCloseable, Serializable { return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 : instant2; } + public String getCursorInstant() { + return cursorInstant; + } + @Override public void close() throws Exception { this.startToCompletionInstantTimeMap.clear(); } - - // ------------------------------------------------------------------------- - // Inner class - // ------------------------------------------------------------------------- - - /** - * A time based filter with equality of specified timestamp. - */ - public static class EqualsTimestampFilter extends HoodieArchivedTimeline.TimeRangeFilter { - private final String ts; - - public EqualsTimestampFilter(String ts) { - super(ts, ts); // endTs is never used - this.ts = ts; - } - - public boolean isInRange(String instantTime) { - return HoodieTimeline.compareTimestamps(instantTime, EQUALS, ts); - } - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index bdd5750684e..cdffd4c0b3c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -288,8 +288,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { * A time based filter with range (startTs, endTs]. */ public static class TimeRangeFilter { - private final String startTs; - private final String endTs; + protected final String startTs; + protected final String endTs; public TimeRangeFilter(String startTs, String endTs) { this.startTs = startTs; @@ -301,15 +301,27 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { } } + /** + * A time based filter with range [startTs, endTs). + */ + public static class ClosedOpenTimeRangeFilter extends TimeRangeFilter { + + public ClosedOpenTimeRangeFilter(String startTs, String endTs) { + super(startTs, endTs); + } + + public boolean isInRange(String instantTime) { + return HoodieTimeline.isInClosedOpenRange(instantTime, this.startTs, this.endTs); + } + } + /** * A time based filter with range [startTs, +∞). */ public static class StartTsFilter extends TimeRangeFilter { - private final String startTs; public StartTsFilter(String startTs) { super(startTs, null); // endTs is never used - this.startTs = startTs; } public boolean isInRange(String instantTime) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 8644204ab82..82ec439bd25 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -423,6 +423,14 @@ public interface HoodieTimeline extends Serializable { && HoodieTimeline.compareTimestamps(timestamp, LESSER_THAN_OR_EQUALS, endTs); } + /** + * Return true if specified timestamp is in range [startTs, endTs). + */ + static boolean isInClosedOpenRange(String timestamp, String startTs, String endTs) { + return HoodieTimeline.compareTimestamps(timestamp, GREATER_THAN_OR_EQUALS, startTs) + && HoodieTimeline.compareTimestamps(timestamp, LESSER_THAN, endTs); + } + /** * Return true if specified timestamp is in range [startTs, endTs]. */