This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 17977e8297 [spark] Support create auto tag syntax (#5909)
17977e8297 is described below
commit 17977e829755973ecdd6b0a6990c4ccb7d48f217
Author: JackeyLee007 <[email protected]>
AuthorDate: Fri Jul 18 13:50:20 2025 +0800
[spark] Support create auto tag syntax (#5909)
---
docs/content/flink/procedures.md | 13 ++++
docs/content/spark/procedures.md | 10 +++
.../paimon/privilege/PrivilegedFileStoreTable.java | 6 ++
.../paimon/table/AbstractFileStoreTable.java | 5 ++
.../paimon/table/DelegatedFileStoreTable.java | 5 ++
.../org/apache/paimon/table/FileStoreTable.java | 2 +
.../TriggerTagAutomaticCreationProcedure.java | 53 +++++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
...TriggerTagAutomaticCreationProcedureITCase.java | 56 ++++++++++++++
.../org/apache/paimon/spark/SparkProcedures.java | 3 +
.../TriggerTagAutomaticCreationProcedure.java | 88 ++++++++++++++++++++++
.../TriggerTagAutomaticCreationProcedureTest.scala | 58 ++++++++++++++
12 files changed, 300 insertions(+)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 0e338b40b4..3757f0e661 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -269,6 +269,19 @@ All available procedures are listed below.
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06
11:00:00')
</td>
</tr>
+ <tr>
+ <td>trigger_tag_automatic_creation</td>
+ <td>
+ CALL [catalog.]sys.trigger_tag_automatic_creation('identifier')
+ </td>
+ <td>
+ Trigger automatic tag creation for a table. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ </td>
+ <td>
+ CALL sys.trigger_tag_automatic_creation(table => 'default.T')
+ </td>
+ </tr>
<tr>
<td>merge_into</td>
<td>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index dced60f9df..89858ad5a6 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -161,6 +161,16 @@ This section introduce all available spark procedures
about paimon.
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06
11:00:00')
</td>
</tr>
+ <tr>
+ <td>trigger_tag_automatic_creation</td>
+ <td>
+ Trigger automatic tag creation for a table. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ </td>
+ <td>
+ CALL sys.trigger_tag_automatic_creation(table => 'default.T')
+ </td>
+ </tr>
<tr>
<td>rollback</td>
<td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 0466f50555..5e513860bb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -149,6 +149,12 @@ public class PrivilegedFileStoreTable extends
DelegatedFileStoreTable {
wrapped.createTag(tagName, fromSnapshotId, timeRetained);
}
+ @Override
+ public void createAutoTag() {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createAutoTag();
+ }
+
@Override
public void deleteTag(String tagName) {
privilegeChecker.assertCanInsert(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 5dd13a37c0..5f80ddc3b1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -562,6 +562,11 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
return snapshot;
}
+ @Override
+ public void createAutoTag() {
+ store().newTagCreationManager(this).run();
+ }
+
@Override
public void createTag(String tagName, long fromSnapshotId) {
createTag(tagName, findSnapshot(fromSnapshotId),
coreOptions().tagDefaultTimeRetained());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index a0e33f620a..3065f5fa19 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -223,6 +223,11 @@ public abstract class DelegatedFileStoreTable implements
FileStoreTable {
wrapped.createTag(tagName, fromSnapshotId, timeRetained);
}
+ @Override
+ public void createAutoTag() {
+ wrapped.createAutoTag();
+ }
+
@Override
public void renameTag(String tagName, String targetTagName) {
wrapped.renameTag(tagName, targetTagName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index e7b8129e3c..1e92ef3b8a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -133,6 +133,8 @@ public interface FileStoreTable extends DataTable {
@Override
FileStoreTable switchToBranch(String branchName);
+ void createAutoTag();
+
/** Purge all files in this table. */
default void purgeFiles() throws Exception {
// clear branches
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java
new file mode 100644
index 0000000000..60160f49c3
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+/**
+ * A procedure to trigger tag automatic creation for a table. Usage:
+ *
+ * <pre><code>
+ * -- create an auto tag if automatic tag creation is enabled for the table.
+ * CALL sys.trigger_tag_automatic_creation(`table` => 'tableId')
+ * </code></pre>
+ */
+public class TriggerTagAutomaticCreationProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "trigger_tag_automatic_creation";
+
+ @ProcedureHint(argument = {@ArgumentHint(name = "table", type =
@DataTypeHint("STRING"))})
+ public @DataTypeHint("ROW<result STRING>") Row[] call(
+ ProcedureContext procedureContext, String tableId) throws
Exception {
+ ((FileStoreTable)
catalog.getTable(Identifier.fromString(tableId))).createAutoTag();
+ return new Row[] {Row.of("Success")};
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index fbc33ca521..56ac74c698 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -95,3 +95,4 @@ org.apache.paimon.flink.procedure.CreateFunctionProcedure
org.apache.paimon.flink.procedure.DropFunctionProcedure
org.apache.paimon.flink.procedure.AlterFunctionProcedure
org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
+org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java
new file mode 100644
index 0000000000..8605b67be8
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedureITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link TriggerTagAutomaticCreationProcedure}. */
+public class TriggerTagAutomaticCreationProcedureITCase extends
CatalogITCaseBase {
+
+ @Test
+ public void testTriggerTagAutomaticCreation() {
+ sql(
+ "CREATE TABLE T (id INT, name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED)"
+ + " WITH ('bucket'='1')");
+
+ sql("INSERT INTO T VALUES (1, 'a')");
+ assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+
+ assertThat(sql("select tag_name from
`T$tags`").stream().map(Row::toString))
+ .isNullOrEmpty();
+
+ sql(
+ "alter table T set ("
+ + "'tag.automatic-creation'='process-time',"
+ + "'tag.creation-period'='daily',"
+ + "'tag.creation-delay'='10 m',"
+ + "'tag.num-retained-max'='90')");
+
+ sql("CALL sys.trigger_tag_automatic_creation(`table` => 'default.T')");
+ assertThat(sql("select tag_name from
`T$tags`").stream().map(Row::toString))
+ .isNotNull()
+ .isNotEmpty();
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 6818b334e3..45341e9b94 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -49,6 +49,7 @@ import
org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;
import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure;
import org.apache.paimon.spark.procedure.RollbackToWatermarkProcedure;
+import org.apache.paimon.spark.procedure.TriggerTagAutomaticCreationProcedure;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -106,6 +107,8 @@ public class SparkProcedures {
procedureBuilders.put("create_function",
CreateFunctionProcedure::builder);
procedureBuilders.put("alter_function",
AlterFunctionProcedure::builder);
procedureBuilders.put("drop_function", DropFunctionProcedure::builder);
+ procedureBuilders.put(
+ "trigger_tag_automatic_creation",
TriggerTagAutomaticCreationProcedure::builder);
return procedureBuilders.build();
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java
new file mode 100644
index 0000000000..0bec16bff1
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedure.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** A procedure to trigger the tag automatic creation for a table. */
+public class TriggerTagAutomaticCreationProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {ProcedureParameter.required("table",
StringType)};
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ });
+
+ private TriggerTagAutomaticCreationProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+ return modifyPaimonTable(
+ tableIdent,
+ table -> {
+ try {
+ ((FileStoreTable) table).createAutoTag();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return new InternalRow[] {newInternalRow(true)};
+ });
+ }
+
+ public static ProcedureBuilder builder() {
+ return new Builder<TriggerTagAutomaticCreationProcedure>() {
+ @Override
+ public TriggerTagAutomaticCreationProcedure doBuild() {
+ return new
TriggerTagAutomaticCreationProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "TriggerTagAutomaticCreationProcedure";
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala
new file mode 100644
index 0000000000..422cf23060
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/TriggerTagAutomaticCreationProcedureTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+class TriggerTagAutomaticCreationProcedureTest extends PaimonSparkTestBase {
+
+ test("Paimon procedure: trigger tag automatic creation test") {
+ spark.sql("""CREATE TABLE T (id INT, name STRING)
+ |USING PAIMON
+ |TBLPROPERTIES (
+ |'primary-key'='id'
+ |)""".stripMargin)
+
+ spark.sql("insert into T values(1, 'a')")
+
+ val table = loadTable("T")
+ assertResult(1)(table.snapshotManager().snapshotCount())
+
+ assertResult(0)(spark.sql("show tags T").count())
+
+ spark.sql("""alter table T set tblproperties(
+ |'tag.automatic-creation'='process-time',
+ |'tag.creation-period'='daily',
+ |'tag.creation-delay'='10 m',
+ |'tag.num-retained-max'='90'
+ |)""".stripMargin)
+
+ spark.sql("CALL paimon.sys.trigger_tag_automatic_creation(table =>
'test.T')")
+ assertResult(1)(spark.sql("show tags T").count())
+ assertResult(
+ spark
+ .sql("select date_format(date_sub(current_date(), 1), 'yyyy-MM-dd')")
+ .head()
+
.getString(0))(loadTable("T").tagManager().tagObjects().get(0).getRight)
+ }
+
+}