This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 17977e8297 [spark] Support create auto tag syntax (#5909)
17977e8297 is described below

commit 17977e829755973ecdd6b0a6990c4ccb7d48f217
Author: JackeyLee007 <[email protected]>
AuthorDate: Fri Jul 18 13:50:20 2025 +0800

    [spark] Support create auto tag syntax (#5909)
---
 docs/content/flink/procedures.md                   | 13 ++++
 docs/content/spark/procedures.md                   | 10 +++
 .../paimon/privilege/PrivilegedFileStoreTable.java |  6 ++
 .../paimon/table/AbstractFileStoreTable.java       |  5 ++
 .../paimon/table/DelegatedFileStoreTable.java      |  5 ++
 .../org/apache/paimon/table/FileStoreTable.java    |  2 +
 .../TriggerTagAutomaticCreationProcedure.java      | 53 +++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  1 +
 ...TriggerTagAutomaticCreationProcedureITCase.java | 56 ++++++++++++++
 .../org/apache/paimon/spark/SparkProcedures.java   |  3 +
 .../TriggerTagAutomaticCreationProcedure.java      | 88 ++++++++++++++++++++++
 .../TriggerTagAutomaticCreationProcedureTest.scala | 58 ++++++++++++++
 12 files changed, 300 insertions(+)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 0e338b40b4..3757f0e661 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -269,6 +269,19 @@ All available procedures are listed below.
          CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
       </td>
    </tr>
+   <tr>
+      <td>trigger_tag_automatic_creation</td>
+      <td>
+         CALL [catalog.]sys.trigger_tag_automatic_creation('identifier')
+      </td>
+      <td>
+         Trigger the tag automatic creation. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+      </td>
+      <td>
+         CALL sys.trigger_tag_automatic_creation(table => 'default.T')
+      </td>
+   </tr>
    <tr>
       <td>merge_into</td>
       <td>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index dced60f9df..89858ad5a6 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -161,6 +161,16 @@ This section introduce all available spark procedures about paimon.
          CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
       </td>
     </tr>
+    <tr>
+      <td>trigger_tag_automatic_creation</td>
+      <td>
+         Trigger the tag automatic creation. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+      </td>
+      <td>
+         CALL sys.trigger_tag_automatic_creation(table => 'default.T')
+      </td>
+    </tr>
     <tr>
       <td>rollback</td>
       <td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 0466f50555..5e513860bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -149,6 +149,12 @@ public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
         wrapped.createTag(tagName, fromSnapshotId, timeRetained);
     }
 
+    @Override
+    public void createAutoTag() {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createAutoTag();
+    }
+
     @Override
     public void deleteTag(String tagName) {
         privilegeChecker.assertCanInsert(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 5dd13a37c0..5f80ddc3b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -562,6 +562,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
         return snapshot;
     }
 
+    @Override
+    public void createAutoTag() {
+        store().newTagCreationManager(this).run();
+    }
+
     @Override
     public void createTag(String tagName, long fromSnapshotId) {
         createTag(tagName, findSnapshot(fromSnapshotId), coreOptions().tagDefaultTimeRetained());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index a0e33f620a..3065f5fa19 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -223,6 +223,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
         wrapped.createTag(tagName, fromSnapshotId, timeRetained);
     }
 
+    @Override
+    public void createAutoTag() {
+        wrapped.createAutoTag();
+    }
+
     @Override
     public void renameTag(String tagName, String targetTagName) {
         wrapped.renameTag(tagName, targetTagName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index e7b8129e3c..1e92ef3b8a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -133,6 +133,8 @@ public interface FileStoreTable extends DataTable {
     @Override
     FileStoreTable switchToBranch(String branchName);
 
+    void createAutoTag();
+
     /** Purge all files in this table. */
     default void purgeFiles() throws Exception {
         // clear branches
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java
new file mode 100644
index 0000000000..60160f49c3
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+/**
+ * A procedure to trigger tag automatic creation for a table. Usage:
+ *
+ * <pre><code>
+ *  -- create an auto tag if this is a tag automatic creation table.
+ *  CALL sys.trigger_tag_automatic_creation(`table` => 'tableId')
+ * </code></pre>
+ */
+public class TriggerTagAutomaticCreationProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "trigger_tag_automatic_creation";
+
+    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
+    public @DataTypeHint("ROW<result STRING>") Row[] call(
+            ProcedureContext procedureContext, String tableId) throws Exception {
+        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).createAutoTag();
+        return new Row[] {Row.of("Success")};
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index fbc33ca521..56ac74c698 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -95,3 +95,4 @@ org.apache.paimon.flink.procedure.CreateFunctionProcedure
 org.apache.paimon.flink.procedure.DropFunctionProcedure
 org.apache.paimon.flink.procedure.AlterFunctionProcedure
 org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
+org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java
new file mode 100644
index 0000000000..8605b67be8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link TriggerTagAutomaticCreationProcedure}. */
+public class TriggerTagAutomaticCreationProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testTriggerTagAutomaticCreation() {
+        sql(
+                "CREATE TABLE T (id INT, name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED)"
+                        + " WITH ('bucket'='1')");
+
+        sql("INSERT INTO T VALUES (1, 'a')");
+        assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+
+        assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString))
+                .isNullOrEmpty();
+
+        sql(
+                "alter table T set ("
+                        + "'tag.automatic-creation'='process-time',"
+                        + "'tag.creation-period'='daily',"
+                        + "'tag.creation-delay'='10 m',"
+                        + "'tag.num-retained-max'='90')");
+
+        sql("CALL sys.trigger_tag_automatic_creation(`table` => 'default.T')");
+        assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString))
+                .isNotNull()
+                .isNotEmpty();
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 6818b334e3..45341e9b94 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -49,6 +49,7 @@ import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
 import org.apache.paimon.spark.procedure.RollbackProcedure;
 import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure;
 import org.apache.paimon.spark.procedure.RollbackToWatermarkProcedure;
+import org.apache.paimon.spark.procedure.TriggerTagAutomaticCreationProcedure;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
@@ -106,6 +107,8 @@ public class SparkProcedures {
         procedureBuilders.put("create_function", CreateFunctionProcedure::builder);
         procedureBuilders.put("alter_function", AlterFunctionProcedure::builder);
         procedureBuilders.put("drop_function", DropFunctionProcedure::builder);
+        procedureBuilders.put(
+                "trigger_tag_automatic_creation", TriggerTagAutomaticCreationProcedure::builder);
         return procedureBuilders.build();
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java
new file mode 100644
index 0000000000..0bec16bff1
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** A procedure to trigger the tag automatic creation for a table. */
+public class TriggerTagAutomaticCreationProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    private TriggerTagAutomaticCreationProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    try {
+                        ((FileStoreTable) table).createAutoTag();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    return new InternalRow[] {newInternalRow(true)};
+                });
+    }
+
+    public static ProcedureBuilder builder() {
+        return new Builder<TriggerTagAutomaticCreationProcedure>() {
+            @Override
+            public TriggerTagAutomaticCreationProcedure doBuild() {
+                return new TriggerTagAutomaticCreationProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "TriggerTagAutomaticCreationProcedure";
+    }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala
new file mode 100644
index 0000000000..422cf23060
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+class TriggerTagAutomaticCreationProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon procedure: trigger tag automatic creation test") {
+    spark.sql("""CREATE TABLE T (id INT, name STRING)
+                |USING PAIMON
+                |TBLPROPERTIES (
+                |'primary-key'='id'
+                |)""".stripMargin)
+
+    spark.sql("insert into T values(1, 'a')")
+
+    val table = loadTable("T")
+    assertResult(1)(table.snapshotManager().snapshotCount())
+
+    assertResult(0)(spark.sql("show tags T").count())
+
+    spark.sql("""alter table T set tblproperties(
+                |'tag.automatic-creation'='process-time',
+                |'tag.creation-period'='daily',
+                |'tag.creation-delay'='10 m',
+                |'tag.num-retained-max'='90'
+                |)""".stripMargin)
+
+    spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table => 'test.T')")
+    assertResult(1)(spark.sql("show tags T").count())
+    assertResult(
+      spark
+        .sql("select date_format(date_sub(current_date(), 1), 'yyyy-MM-dd')")
+        .head()
+        .getString(0))(loadTable("T").tagManager().tagObjects().get(0).getRight)
+  }
+
+}

Reply via email to