This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 29c134788 [spark] Add replace tag ddl syntax (#4457)
29c134788 is described below

commit 29c1347889e31c9a4789e8f4a3656ece55edac8c
Author: askwang <[email protected]>
AuthorDate: Tue Nov 5 19:35:59 2024 +0800

    [spark] Add replace tag ddl syntax (#4457)
---
 docs/content/spark/sql-ddl.md                      | 14 ++++-
 .../PaimonSqlExtensions.g4                         | 13 +++--
 ...mmand.scala => CreateOrReplaceTagCommand.scala} |  4 +-
 ...eTagExec.scala => CreateOrReplaceTagExec.scala} | 29 +++++++---
 .../paimon/spark/execution/PaimonStrategy.scala    |  8 +--
 .../PaimonSparkSqlExtensionsParser.scala           | 10 ++--
 .../extensions/PaimonSqlExtensionsAstBuilder.scala | 63 +++++++++++++---------
 .../paimon/spark/sql/PaimonTagDdlTestBase.scala    | 29 ++++++++++
 8 files changed, 125 insertions(+), 45 deletions(-)

diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 43b101730..3b4d1722b 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -211,8 +211,12 @@ CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' =
 ```
 
 ## Tag DDL
-### Create Tag
-Create a tag based on snapshot or retention.
+### Create or replace Tag
+Create or replace a tag using the following syntax options.
+- Create a tag with or without a snapshot id and a time retention.
+- Creating a tag that already exists does not fail when the `IF NOT EXISTS` syntax is used.
+- Update an existing tag using the `REPLACE TAG` or `CREATE OR REPLACE TAG` syntax.
+
 ```sql
 -- create a tag based on the latest snapshot and no retention.
 ALTER TABLE T CREATE TAG `TAG-1`;
@@ -228,6 +232,12 @@ ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1;
 
 -- create a tag based on snapshot-2 and retain it for 12 hour.
 ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS;
+
+-- replace an existing tag with a new snapshot id and a new retention.
+ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24 HOURS;
+
+-- create or replace a tag: create the tag if it does not exist, replace it if it does.
+ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS;
 ```
 
 ### Delete Tag
diff --git 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index e835b00cd..207d97321 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++ 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -71,7 +71,7 @@ singleStatement
 statement
     : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')'     
             #call
     | SHOW TAGS multipartIdentifier                                            
             #showTags
-    | ALTER TABLE multipartIdentifier CREATE TAG (IF NOT EXISTS)? identifier 
tagOptions     #createTag
+    | ALTER TABLE multipartIdentifier createReplaceTagClause                   
             #createOrReplaceTag
     | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier       
             #deleteTag
     | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier      
             #renameTag
   ;
@@ -81,6 +81,11 @@ callArgument
     | identifier '=>' expression    #namedArgument
     ;
 
+createReplaceTagClause
+    : CREATE TAG (IF NOT EXISTS)? identifier tagOptions
+    | (CREATE OR)? REPLACE TAG identifier tagOptions
+    ;
+
 tagOptions
   : (AS OF VERSION snapshotId)? (timeRetain)?
   ;
@@ -146,8 +151,8 @@ quotedIdentifier
     ;
 
 nonReserved
-    : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | 
OF | TABLE
-    | RETAIN | VERSION | TAG
+    : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | 
OF | OR | TABLE
+    | REPLACE | RETAIN | VERSION | TAG
     | TRUE | FALSE
     | MAP
     ;
@@ -164,7 +169,9 @@ IF : 'IF';
 MINUTES: 'MINUTES';
 NOT: 'NOT';
 OF: 'OF';
+OR: 'OR';
 RENAME: 'RENAME';
+REPLACE: 'REPLACE';
 RETAIN: 'RETAIN';
 SHOW: 'SHOW';
 TABLE: 'TABLE';
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala
similarity index 93%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala
index 226311663..0830fc9ed 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala
@@ -22,10 +22,12 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 
-case class CreateTagCommand(
+case class CreateOrReplaceTagCommand(
     table: Seq[String],
     tagName: String,
     tagOptions: TagOptions,
+    create: Boolean,
+    replace: Boolean,
     ifNotExists: Boolean)
   extends PaimonLeafCommand {
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
similarity index 72%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
index 57593c3a6..0506ed42f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
@@ -27,11 +27,13 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 
-case class CreateTagExec(
+case class CreateOrReplaceTagExec(
     catalog: TableCatalog,
     ident: Identifier,
     tagName: String,
     tagOptions: TagOptions,
+    create: Boolean,
+    replace: Boolean,
     ifNotExists: Boolean)
   extends PaimonLeafV2CommandExec {
 
@@ -42,14 +44,27 @@ case class CreateTagExec(
     table.asInstanceOf[SparkTable].getTable match {
       case paimonTable: FileStoreTable =>
         val tagIsExists = paimonTable.tagManager().tagExists(tagName)
-        if (tagIsExists && ifNotExists) {
-          return Nil
-        }
         val timeRetained = tagOptions.timeRetained.orNull
-        if (tagOptions.snapshotId.isEmpty) {
-          paimonTable.createTag(tagName, timeRetained)
+        val snapshotId = tagOptions.snapshotId
+
+        if (create && replace && !tagIsExists) {
+          if (snapshotId.isEmpty) {
+            paimonTable.createTag(tagName, timeRetained)
+          } else {
+            paimonTable.createTag(tagName, snapshotId.get, timeRetained)
+          }
+        } else if (replace) {
+          paimonTable.replaceTag(tagName, snapshotId.get, timeRetained)
         } else {
-          paimonTable.createTag(tagName, tagOptions.snapshotId.get, 
timeRetained)
+          if (tagIsExists && ifNotExists) {
+            return Nil
+          }
+
+          if (snapshotId.isEmpty) {
+            paimonTable.createTag(tagName, timeRetained)
+          } else {
+            paimonTable.createTag(tagName, snapshotId.get, timeRetained)
+          }
         }
       case t =>
         throw new UnsupportedOperationException(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index d715ef2f5..0c3d3e6b6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.execution
 
 import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
-import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, 
DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
+import 
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, 
DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -50,12 +50,14 @@ case class PaimonStrategy(spark: SparkSession)
     case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) =>
       ShowTagsExec(catalog, ident, t.output) :: Nil
 
-    case CreateTagCommand(
+    case CreateOrReplaceTagCommand(
           PaimonCatalogAndIdentifier(table, ident),
           tagName,
           tagOptions,
+          create,
+          replace,
           ifNotExists) =>
-      CreateTagExec(table, ident, tagName, tagOptions, ifNotExists) :: Nil
+      CreateOrReplaceTagExec(table, ident, tagName, tagOptions, create, 
replace, ifNotExists) :: Nil
 
     case DeleteTagCommand(PaimonCatalogAndIdentifier(catalog, ident), tagStr, 
ifExists) =>
       DeleteTagExec(catalog, ident, tagStr, ifExists) :: Nil
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
index f7e8a8506..78a7f80ea 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
@@ -105,10 +105,12 @@ class PaimonSparkSqlExtensionsParser(delegate: 
ParserInterface)
   }
 
   private def isTagRefDdl(normalized: String): Boolean = {
-    normalized.startsWith("show tags") || (normalized
-      .startsWith("alter table") && (normalized.contains("create tag") ||
-      normalized.contains("rename tag") ||
-      normalized.contains("delete tag")))
+    normalized.startsWith("show tags") ||
+    (normalized.startsWith("alter table") &&
+      (normalized.contains("create tag") ||
+        normalized.contains("replace tag") ||
+        normalized.contains("rename tag") ||
+        normalized.contains("delete tag")))
   }
 
   protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser 
=> T): T = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index 06d57d597..b864894e7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.catalyst.parser.extensions
 
 import org.apache.paimon.spark.catalyst.plans.logical
-import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, 
DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, 
PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
+import 
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, 
DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, 
PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
 import org.apache.paimon.utils.TimeUtils
 
 import org.antlr.v4.runtime._
@@ -98,31 +98,44 @@ class PaimonSqlExtensionsAstBuilder(delegate: 
ParserInterface)
     ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier))
   }
 
-  /** Create a CREATE TAG logical command. */
-  override def visitCreateTag(ctx: CreateTagContext): CreateTagCommand = 
withOrigin(ctx) {
-    val tagName = ctx.identifier().getText
-    val tagOptionsContext = Option(ctx.tagOptions())
-    val snapshotId =
-      tagOptionsContext.flatMap(tagOptions => 
Option(tagOptions.snapshotId())).map(_.getText.toLong)
-    val timeRetainCtx = tagOptionsContext.flatMap(tagOptions => 
Option(tagOptions.timeRetain()))
-    val timeRetained = if (timeRetainCtx.nonEmpty) {
-      val (number, timeUnit) =
-        timeRetainCtx.map(retain => (retain.number().getText.toLong, 
retain.timeUnit().getText)).get
-      Option(TimeUtils.parseDuration(number, timeUnit))
-    } else {
-      None
+  /** Create a CREATE OR REPLACE TAG logical command. */
+  override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): 
CreateOrReplaceTagCommand =
+    withOrigin(ctx) {
+      val createTagClause = ctx.createReplaceTagClause()
+
+      val tagName = createTagClause.identifier().getText
+      val tagOptionsContext = Option(createTagClause.tagOptions())
+      val snapshotId =
+        tagOptionsContext
+          .flatMap(tagOptions => Option(tagOptions.snapshotId()))
+          .map(_.getText.toLong)
+      val timeRetainCtx = tagOptionsContext.flatMap(tagOptions => 
Option(tagOptions.timeRetain()))
+      val timeRetained = if (timeRetainCtx.nonEmpty) {
+        val (number, timeUnit) =
+          timeRetainCtx
+            .map(retain => (retain.number().getText.toLong, 
retain.timeUnit().getText))
+            .get
+        Option(TimeUtils.parseDuration(number, timeUnit))
+      } else {
+        None
+      }
+      val tagOptions = TagOptions(
+        snapshotId,
+        timeRetained
+      )
+
+      val create = createTagClause.CREATE() != null
+      val replace = createTagClause.REPLACE() != null
+      val ifNotExists = createTagClause.EXISTS() != null
+
+      CreateOrReplaceTagCommand(
+        typedVisit[Seq[String]](ctx.multipartIdentifier),
+        tagName,
+        tagOptions,
+        create,
+        replace,
+        ifNotExists)
     }
-    val tagOptions = TagOptions(
-      snapshotId,
-      timeRetained
-    )
-    val ifNotExists = ctx.EXISTS() != null
-    CreateTagCommand(
-      typedVisit[Seq[String]](ctx.multipartIdentifier),
-      tagName,
-      tagOptions,
-      ifNotExists)
-  }
 
   /** Create a DELETE TAG logical command. */
   override def visitDeleteTag(ctx: DeleteTagContext): DeleteTagCommand = 
withOrigin(ctx) {
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
index 7c551f0c9..5ad687b4d 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
@@ -71,6 +71,35 @@ abstract class PaimonTagDdlTestBase extends 
PaimonSparkTestBase {
       Row("tag-1", 3, null))
   }
 
+  test("Tag ddl: alter table t create or replace tag syntax") {
+    spark.sql("""CREATE TABLE T (id INT, name STRING)
+                |USING PAIMON
+                |TBLPROPERTIES ('primary-key'='id')""".stripMargin)
+
+    spark.sql("insert into T values(1, 'a')")
+    spark.sql("insert into T values(2, 'b')")
+    assertResult(2)(loadTable("T").snapshotManager().snapshotCount())
+
+    // test 'replace' syntax
+    spark.sql("alter table T create tag `tag-1` as of version 1")
+    spark.sql("alter table T replace tag `tag-1` as of version 2 RETAIN 1 
HOURS")
+    checkAnswer(
+      spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
+      Row("tag-1", 2, "PT1H") :: Nil)
+
+    // test 'create or replace' syntax
+    // tag-2 not exist, create it
+    spark.sql("alter table T create or replace tag `tag-2` as of version 1")
+    checkAnswer(
+      spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where 
tag_name = 'tag-2'"),
+      Row("tag-2", 1, null) :: Nil)
+    // tag-2 exists, replace it
+    spark.sql("alter table T create or replace tag `tag-2` as of version 2 
RETAIN 1 HOURS")
+    checkAnswer(
+      spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where 
tag_name = 'tag-2'"),
+      Row("tag-2", 2, "PT1H") :: Nil)
+  }
+
   test("Tag ddl: alter table t delete tag syntax") {
     spark.sql("""CREATE TABLE T (id INT, name STRING)
                 |USING PAIMON

Reply via email to