This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f46484668 [core] Support enable and disable tag time expiration (#6884)
6f46484668 is described below

commit 6f464846680b8b4ae815b6a27dcefb18f46eff18
Author: yuzelin <[email protected]>
AuthorDate: Thu Dec 25 10:39:22 2025 +0800

    [core] Support enable and disable tag time expiration (#6884)
---
 docs/layouts/shortcodes/generated/core_configuration.html |  6 ++++++
 .../src/main/java/org/apache/paimon/CoreOptions.java      | 10 ++++++++++
 .../main/java/org/apache/paimon/tag/TagAutoManager.java   | 15 +++++++++++----
 .../paimon/flink/procedure/ExpireTagsProcedure.java       |  6 ++++++
 .../procedure/ProcedurePositionalArgumentsITCase.java     |  9 +++++++--
 .../paimon/flink/procedure/ExpireTagsProcedure.java       |  6 ++++++
 .../apache/paimon/flink/action/ExpireTagsActionTest.java  |  6 +++++-
 .../paimon/flink/procedure/ExpireTagsProcedureITCase.java |  5 ++++-
 .../paimon/spark/procedure/ExpireTagsProcedure.java       |  6 ++++++
 .../paimon/spark/procedure/ExpireTagsProcedureTest.scala  |  2 ++
 10 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 027d798c06..fc7a0c3839 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1362,6 +1362,12 @@ If the data size allocated for the sorting task is uneven,which may lead to perf
             <td><p>Enum</p></td>
             <td>The date format for tag periods.<br /><br />Possible values:<ul><li>"with_dashes": Dates and hours with dashes, e.g., 'yyyy-MM-dd HH'</li><li>"without_dashes": Dates and hours without dashes, e.g., 'yyyyMMdd HH'</li><li>"without_dashes_and_spaces": Dates and hours without dashes and spaces, e.g., 'yyyyMMddHH'</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>tag.time-expire-enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enable tag expiration by retained time.</td>
+        </tr>
         <tr>
             <td><h5>target-file-size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d5bd5789e8..20aa5182d6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1635,6 +1635,12 @@ public class CoreOptions implements Serializable {
                             "The default maximum time retained for newly created tags. "
                                     + "It affects both auto-created tags and manually created (by procedure) tags.");
 
+    public static final ConfigOption<Boolean> TAG_TIME_EXPIRE_ENABLED =
+            key("tag.time-expire-enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to enable tag expiration by retained time.");
+
     public static final ConfigOption<Boolean> TAG_AUTOMATIC_COMPLETION =
             key("tag.automatic-completion")
                     .booleanType()
@@ -3053,6 +3059,10 @@ public class CoreOptions implements Serializable {
         return options.get(TAG_DEFAULT_TIME_RETAINED);
     }
 
+    public boolean tagTimeExpireEnabled() {
+        return options.get(TAG_TIME_EXPIRE_ENABLED);
+    }
+
     public boolean tagAutomaticCompletion() {
         return options.get(TAG_AUTOMATIC_COMPLETION);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
index 817c20af46..31ca385d1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java
@@ -24,15 +24,18 @@ import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /** A manager to create and expire tags. */
 public class TagAutoManager {
 
-    private final TagAutoCreation tagAutoCreation;
-    private final TagTimeExpire tagTimeExpire;
+    @Nullable private final TagAutoCreation tagAutoCreation;
+    @Nullable private final TagTimeExpire tagTimeExpire;
 
-    private TagAutoManager(TagAutoCreation tagAutoCreation, TagTimeExpire tagTimeExpire) {
+    private TagAutoManager(
+            @Nullable TagAutoCreation tagAutoCreation, @Nullable TagTimeExpire tagTimeExpire) {
         this.tagAutoCreation = tagAutoCreation;
         this.tagTimeExpire = tagTimeExpire;
     }
@@ -60,13 +63,17 @@ public class TagAutoManager {
                         ? null
                         : TagAutoCreation.create(
                                 options, snapshotManager, tagManager, tagDeletion, callbacks),
-                TagTimeExpire.create(snapshotManager, tagManager, tagDeletion, callbacks));
+                options.tagTimeExpireEnabled()
+                        ? TagTimeExpire.create(snapshotManager, tagManager, tagDeletion, callbacks)
+                        : null);
     }
 
+    @Nullable
     public TagAutoCreation getTagAutoCreation() {
         return tagAutoCreation;
     }
 
+    @Nullable
     public TagTimeExpire getTagTimeExpire() {
         return tagTimeExpire;
     }
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
index d2b649bd0f..037c4bb71d 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.tag.TagTimeExpire;
@@ -26,6 +27,7 @@ import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.flink.table.procedure.ProcedureContext;
 
 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -42,6 +44,10 @@ public class ExpireTagsProcedure extends ProcedureBase {
     public String[] call(ProcedureContext procedureContext, String tableId, String olderThanStr)
             throws Catalog.TableNotExistException {
         FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
+        fileStoreTable =
+                fileStoreTable.copy(
+                        Collections.singletonMap(
+                                CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(), "true"));
         TagTimeExpire tagTimeExpire =
                 fileStoreTable.store().newTagAutoManager(fileStoreTable).getTagTimeExpire();
         if (olderThanStr != null) {
diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 2f32c326a7..5dac8cd200 100644
--- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -499,14 +500,18 @@ public class ProcedurePositionalArgumentsITCase extends CatalogITCaseBase {
 
     @Test
     public void testExpireTags() throws Exception {
+        boolean tagTimeExpireEnabled = ThreadLocalRandom.current().nextBoolean();
         sql(
                 "CREATE TABLE T ("
                         + " k STRING,"
                         + " dt STRING,"
                         + " PRIMARY KEY (k, dt) NOT ENFORCED"
                         + ") PARTITIONED BY (dt) WITH ("
-                        + " 'bucket' = '1'"
-                        + ")");
+                        + " 'bucket' = '1',"
+                        + " 'tag.time-expire-enabled' = '%s'"
+                        + ")",
+                tagTimeExpireEnabled);
+
         FileStoreTable table = paimonTable("T");
         for (int i = 1; i <= 3; i++) {
             sql("INSERT INTO T VALUES ('" + i + "', '" + i + "')");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
index 3fdb36a18b..2c03cf31d1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.tag.TagTimeExpire;
@@ -32,6 +33,7 @@ import org.apache.flink.types.Row;
 import javax.annotation.Nullable;
 
 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -52,6 +54,10 @@ public class ExpireTagsProcedure extends ProcedureBase {
             ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr)
             throws Catalog.TableNotExistException {
         FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
+        fileStoreTable =
+                fileStoreTable.copy(
+                        Collections.singletonMap(
+                                CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(), "true"));
         TagTimeExpire tagTimeExpire =
                 fileStoreTable.store().newTagAutoManager(fileStoreTable).getTagTimeExpire();
         if (olderThanStr != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
index 73a6b10f48..b07cf50816 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.nio.file.Path;
 import java.time.LocalDateTime;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
@@ -47,10 +48,13 @@ public class ExpireTagsActionTest extends ActionITCaseBase {
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void testExpireTags(boolean startFlinkJob) throws Exception {
+        boolean tagTimeExpireEnabled = ThreadLocalRandom.current().nextBoolean();
         bEnv.executeSql(
                 "CREATE TABLE T (id STRING, name STRING,"
                         + " PRIMARY KEY (id) NOT ENFORCED)"
-                        + " WITH ('bucket'='1', 'write-only'='true')");
+                        + " WITH ('bucket'='1', 'write-only'='true', 'tag.time-expire-enabled' = '"
+                        + tagTimeExpireEnabled
+                        + "')");
 
         expireTags(startFlinkJob);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
index 90e5bc6702..e44769e648 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.time.LocalDateTime;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -37,10 +38,12 @@ public class ExpireTagsProcedureITCase extends CatalogITCaseBase {
 
     @Test
     public void testExpireTagsByTagCreateTimeAndTagTimeRetained() throws Exception {
+        boolean tagTimeExpireEnabled = ThreadLocalRandom.current().nextBoolean();
         sql(
                 "CREATE TABLE T (id STRING, name STRING,"
                         + " PRIMARY KEY (id) NOT ENFORCED)"
-                        + " WITH ('bucket'='1', 'write-only'='true')");
+                        + " WITH ('bucket'='1', 'write-only'='true', 'tag.time-expire-enabled'='%s')",
+                tagTimeExpireEnabled);
 
         FileStoreTable table = paimonTable("T");
         SnapshotManager snapshotManager = table.snapshotManager();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
index 17c56b7077..f8e685cf2e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.tag.TagTimeExpire;
 import org.apache.paimon.utils.DateTimeUtils;
@@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -73,6 +75,10 @@ public class ExpireTagsProcedure extends BaseProcedure {
                 tableIdent,
                 table -> {
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
+                    fileStoreTable =
+                            fileStoreTable.copy(
+                                    Collections.singletonMap(
+                                            CoreOptions.TAG_TIME_EXPIRE_ENABLED.key(), "true"));
                     TagTimeExpire tagTimeExpire =
                             fileStoreTable
                                     .store()
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
index 1ac9709c87..d1e1d3f52d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala
@@ -28,9 +28,11 @@ import org.assertj.core.api.Assertions.assertThat
 class ExpireTagsProcedureTest extends PaimonSparkTestBase {
 
   test("Paimon procedure: expire tags that reached its timeRetained") {
+    val tagTimeExpireEnabled = scala.util.Random.nextBoolean()
     spark.sql(s"""
                  |CREATE TABLE T (id STRING, name STRING)
                  |USING PAIMON
+                 |TBLPROPERTIES ('tag.time-expire-enabled' = '$tagTimeExpireEnabled')
                  |""".stripMargin)
 
     val table = loadTable("T")

Reply via email to