This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bf9ac789 [core] Add option to auto complete missing tags (#3648)
8bf9ac789 is described below

commit 8bf9ac789ad5bef95381d47838531257c604baf8
Author: luowanghaoyun <[email protected]>
AuthorDate: Fri Jul 5 20:47:16 2024 +0800

    [core] Add option to auto complete missing tags (#3648)
---
 .../shortcodes/generated/core_configuration.html   |  6 +++++
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 +++++++
 .../org/apache/paimon/tag/TagAutoCreation.java     |  7 +++++
 .../org/apache/paimon/tag/TagAutoManagerTest.java  | 31 ++++++++++++++++++++++
 4 files changed, 54 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index e66438fcd..6c0e3f47c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -720,6 +720,12 @@ If the data size allocated for the sorting task is uneven,which may lead to perf
             <td>Boolean</td>
             <td>Whether to read the changes from overwrite in streaming mode. Cannot be set to true when changelog producer is full-compaction or lookup because it will read duplicated changes.</td>
         </tr>
+        <tr>
+            <td><h5>tag.automatic-completion</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to automatically complete missing tags.</td>
+        </tr>
         <tr>
             <td><h5>tag.automatic-creation</h5></td>
             <td style="word-wrap: break-word;">none</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0f4a4afd0..ab2efdaa2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1069,6 +1069,12 @@ public class CoreOptions implements Serializable {
                             "The default maximum time retained for newly created tags. "
                                     + "It affects both auto-created tags and manually created (by procedure) tags.");
 
+    public static final ConfigOption<Boolean> TAG_AUTOMATIC_COMPLETION =
+            key("tag.automatic-completion")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to automatically complete missing tags.");
+
     public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
             key("snapshot.watermark-idle-timeout")
                     .durationType()
@@ -1825,6 +1831,10 @@ public class CoreOptions implements Serializable {
         return options.get(TAG_DEFAULT_TIME_RETAINED);
     }
 
+    public boolean tagAutomaticCompletion() {
+        return options.get(TAG_AUTOMATIC_COMPLETION);
+    }
+
     public Duration snapshotWatermarkIdleTimeout() {
         return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index af7947925..58241033f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -59,6 +59,7 @@ public class TagAutoCreation {
     @Nullable private final Duration defaultTimeRetained;
     private final List<TagCallback> callbacks;
     private final Duration idlenessTimeout;
+    private final boolean automaticCompletion;
 
     private LocalDateTime nextTag;
     private long nextSnapshot;
@@ -73,6 +74,7 @@ public class TagAutoCreation {
             @Nullable Integer numRetainedMax,
             @Nullable Duration defaultTimeRetained,
             Duration idlenessTimeout,
+            boolean automaticCompletion,
             List<TagCallback> callbacks) {
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
@@ -84,6 +86,7 @@ public class TagAutoCreation {
         this.defaultTimeRetained = defaultTimeRetained;
         this.callbacks = callbacks;
         this.idlenessTimeout = idlenessTimeout;
+        this.automaticCompletion = automaticCompletion;
 
         this.periodHandler.validateDelay(delay);
 
@@ -155,6 +158,9 @@ public class TagAutoCreation {
         if (nextTag == null
                 || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) {
             LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
+            if (automaticCompletion && nextTag != null) {
+                thisTag = nextTag;
+            }
             String tagName = periodHandler.timeToTag(thisTag);
             if (!tagManager.tagExists(tagName)) {
                 tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks);
@@ -221,6 +227,7 @@ public class TagAutoCreation {
                 options.tagNumRetainedMax(),
                 options.tagDefaultTimeRetained(),
                 options.snapshotWatermarkIdleTimeout(),
+                options.tagAutomaticCompletion(),
                 callbacks);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index 84678048a..1bebbe5fb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -40,6 +40,7 @@ import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_COMPLETION;
 import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_CREATION;
 import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY;
 import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD;
@@ -425,6 +426,36 @@ public class TagAutoManagerTest extends PrimaryKeyTableTestBase {
         commit.close();
     }
 
+    @Test
+    public void testAutoCompleteTags() throws Exception {
+        Options options = new Options();
+        options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+        options.set(TAG_NUM_RETAINED_MAX, 3);
+        options.set(TAG_AUTOMATIC_COMPLETION, true);
+        FileStoreTable table = this.table.copy(options.toMap());
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.store().newTagManager();
+
+        // test normal creation
+        commit.commit(new ManifestCommittable(0, utcMills("2024-06-26T16:12:00")));
+        assertThat(tagManager.allTagNames()).containsOnly("2024-06-26 15");
+
+        // task stop 2h...
+
+        // task restart after 18:00
+        // first commit, add tag 2024-06-26 16
+        commit.commit(new ManifestCommittable(1, utcMills("2024-06-26T18:05:00")));
+        assertThat(tagManager.allTagNames()).containsOnly("2024-06-26 15", "2024-06-26 16");
+
+        // second commit, add tag 2024-06-26 17
+        commit.commit(new ManifestCommittable(2, utcMills("2024-06-26T18:10:00")));
+        assertThat(tagManager.allTagNames())
+                .containsOnly("2024-06-26 15", "2024-06-26 16", "2024-06-26 17");
+
+        commit.close();
+    }
+
     private long localZoneMills(String timestamp) {
         return LocalDateTime.parse(timestamp)
                 .atZone(ZoneId.systemDefault())

Reply via email to