This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8bf9ac789 [core] Add option to auto complete missing tags (#3648)
8bf9ac789 is described below
commit 8bf9ac789ad5bef95381d47838531257c604baf8
Author: luowanghaoyun <[email protected]>
AuthorDate: Fri Jul 5 20:47:16 2024 +0800
[core] Add option to auto complete missing tags (#3648)
---
.../shortcodes/generated/core_configuration.html | 6 +++++
.../main/java/org/apache/paimon/CoreOptions.java | 10 +++++++
.../org/apache/paimon/tag/TagAutoCreation.java | 7 +++++
.../org/apache/paimon/tag/TagAutoManagerTest.java | 31 ++++++++++++++++++++++
4 files changed, 54 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index e66438fcd..6c0e3f47c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -720,6 +720,12 @@ If the data size allocated for the sorting task is uneven,which may lead to perf
<td>Boolean</td>
<td>Whether to read the changes from overwrite in streaming mode.
Cannot be set to true when changelog producer is full-compaction or lookup
because it will read duplicated changes.</td>
</tr>
+ <tr>
+ <td><h5>tag.automatic-completion</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to automatically complete missing tags.</td>
+ </tr>
<tr>
<td><h5>tag.automatic-creation</h5></td>
<td style="word-wrap: break-word;">none</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0f4a4afd0..ab2efdaa2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1069,6 +1069,12 @@ public class CoreOptions implements Serializable {
"The default maximum time retained for newly
created tags. "
+ "It affects both auto-created tags and
manually created (by procedure) tags.");
+ public static final ConfigOption<Boolean> TAG_AUTOMATIC_COMPLETION =
+ key("tag.automatic-completion")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to automatically complete
missing tags.");
+
public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT
=
key("snapshot.watermark-idle-timeout")
.durationType()
@@ -1825,6 +1831,10 @@ public class CoreOptions implements Serializable {
return options.get(TAG_DEFAULT_TIME_RETAINED);
}
+ public boolean tagAutomaticCompletion() {
+ return options.get(TAG_AUTOMATIC_COMPLETION);
+ }
+
public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index af7947925..58241033f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -59,6 +59,7 @@ public class TagAutoCreation {
@Nullable private final Duration defaultTimeRetained;
private final List<TagCallback> callbacks;
private final Duration idlenessTimeout;
+ private final boolean automaticCompletion;
private LocalDateTime nextTag;
private long nextSnapshot;
@@ -73,6 +74,7 @@ public class TagAutoCreation {
@Nullable Integer numRetainedMax,
@Nullable Duration defaultTimeRetained,
Duration idlenessTimeout,
+ boolean automaticCompletion,
List<TagCallback> callbacks) {
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
@@ -84,6 +86,7 @@ public class TagAutoCreation {
this.defaultTimeRetained = defaultTimeRetained;
this.callbacks = callbacks;
this.idlenessTimeout = idlenessTimeout;
+ this.automaticCompletion = automaticCompletion;
this.periodHandler.validateDelay(delay);
@@ -155,6 +158,9 @@ public class TagAutoCreation {
if (nextTag == null
|| isAfterOrEqual(time.minus(delay),
periodHandler.nextTagTime(nextTag))) {
LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
+ if (automaticCompletion && nextTag != null) {
+ thisTag = nextTag;
+ }
String tagName = periodHandler.timeToTag(thisTag);
if (!tagManager.tagExists(tagName)) {
tagManager.createTag(snapshot, tagName, defaultTimeRetained,
callbacks);
@@ -221,6 +227,7 @@ public class TagAutoCreation {
options.tagNumRetainedMax(),
options.tagDefaultTimeRetained(),
options.snapshotWatermarkIdleTimeout(),
+ options.tagAutomaticCompletion(),
callbacks);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index 84678048a..1bebbe5fb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -40,6 +40,7 @@ import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.SNAPSHOT_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_COMPLETION;
import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_CREATION;
import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY;
import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD;
@@ -425,6 +426,36 @@ public class TagAutoManagerTest extends PrimaryKeyTableTestBase {
commit.close();
}
+ @Test
+ public void testAutoCompleteTags() throws Exception {
+ Options options = new Options();
+ options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
+ options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
+ options.set(TAG_NUM_RETAINED_MAX, 3);
+ options.set(TAG_AUTOMATIC_COMPLETION, true);
+ FileStoreTable table = this.table.copy(options.toMap());
+ TableCommitImpl commit =
table.newCommit(commitUser).ignoreEmptyCommit(false);
+ TagManager tagManager = table.store().newTagManager();
+
+ // test normal creation
+ commit.commit(new ManifestCommittable(0,
utcMills("2024-06-26T16:12:00")));
+ assertThat(tagManager.allTagNames()).containsOnly("2024-06-26 15");
+
+ // task stop 2h...
+
+ // task restart after 18:00
+ // first commit, add tag 2024-06-26 16
+ commit.commit(new ManifestCommittable(1,
utcMills("2024-06-26T18:05:00")));
+ assertThat(tagManager.allTagNames()).containsOnly("2024-06-26 15",
"2024-06-26 16");
+
+ // second commit, add tag 2024-06-26 17
+ commit.commit(new ManifestCommittable(2,
utcMills("2024-06-26T18:10:00")));
+ assertThat(tagManager.allTagNames())
+ .containsOnly("2024-06-26 15", "2024-06-26 16", "2024-06-26
17");
+
+ commit.close();
+ }
+
private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())