This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 29c134788 [spark] Add replace tag ddl syntax (#4457)
29c134788 is described below
commit 29c1347889e31c9a4789e8f4a3656ece55edac8c
Author: askwang <[email protected]>
AuthorDate: Tue Nov 5 19:35:59 2024 +0800
[spark] Add replace tag ddl syntax (#4457)
---
docs/content/spark/sql-ddl.md | 14 ++++-
.../PaimonSqlExtensions.g4 | 13 +++--
...mmand.scala => CreateOrReplaceTagCommand.scala} | 4 +-
...eTagExec.scala => CreateOrReplaceTagExec.scala} | 29 +++++++---
.../paimon/spark/execution/PaimonStrategy.scala | 8 +--
.../PaimonSparkSqlExtensionsParser.scala | 10 ++--
.../extensions/PaimonSqlExtensionsAstBuilder.scala | 63 +++++++++++++---------
.../paimon/spark/sql/PaimonTagDdlTestBase.scala | 29 ++++++++++
8 files changed, 125 insertions(+), 45 deletions(-)
diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 43b101730..3b4d1722b 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -211,8 +211,12 @@ CREATE TABLE my_table_all_as PARTITIONED BY (dt)
TBLPROPERTIES ('primary-key' =
```
## Tag DDL
-### Create Tag
-Create a tag based on snapshot or retention.
+### Create or Replace Tag
+Create or replace a tag with the following syntax options.
+- Create a tag, optionally based on a specific snapshot id and with a time retention.
+- Creating a tag that already exists does not fail when `IF NOT EXISTS` is used.
+- Update an existing tag with `REPLACE TAG` or `CREATE OR REPLACE TAG`.
+
```sql
-- create a tag based on the latest snapshot and no retention.
ALTER TABLE T CREATE TAG `TAG-1`;
@@ -228,6 +232,12 @@ ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1;
-- create a tag based on snapshot-2 and retain it for 12 hour.
ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS;
+
+-- replace an existing tag with a new snapshot id and a new retention.
+ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24 HOURS;
+
+-- create or replace a tag: create the tag if it does not exist, replace it if it exists.
+ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS;
```
### Delete Tag
diff --git
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index e835b00cd..207d97321 100644
---
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -71,7 +71,7 @@ singleStatement
statement
: CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')'
#call
| SHOW TAGS multipartIdentifier
#showTags
- | ALTER TABLE multipartIdentifier CREATE TAG (IF NOT EXISTS)? identifier
tagOptions #createTag
+ | ALTER TABLE multipartIdentifier createReplaceTagClause
#createOrReplaceTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier
#deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier
#renameTag
;
@@ -81,6 +81,11 @@ callArgument
| identifier '=>' expression #namedArgument
;
+createReplaceTagClause
+ : CREATE TAG (IF NOT EXISTS)? identifier tagOptions
+ | (CREATE OR)? REPLACE TAG identifier tagOptions
+ ;
+
tagOptions
: (AS OF VERSION snapshotId)? (timeRetain)?
;
@@ -146,8 +151,8 @@ quotedIdentifier
;
nonReserved
- : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT |
OF | TABLE
- | RETAIN | VERSION | TAG
+ : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT |
OF | OR | TABLE
+ | REPLACE | RETAIN | VERSION | TAG
| TRUE | FALSE
| MAP
;
@@ -164,7 +169,9 @@ IF : 'IF';
MINUTES: 'MINUTES';
NOT: 'NOT';
OF: 'OF';
+OR: 'OR';
RENAME: 'RENAME';
+REPLACE: 'REPLACE';
RETAIN: 'RETAIN';
SHOW: 'SHOW';
TABLE: 'TABLE';
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala
similarity index 93%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala
index 226311663..0830fc9ed 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala
@@ -22,10 +22,12 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand
import org.apache.spark.sql.catalyst.expressions.Attribute
-case class CreateTagCommand(
+case class CreateOrReplaceTagCommand(
table: Seq[String],
tagName: String,
tagOptions: TagOptions,
+ create: Boolean,
+ replace: Boolean,
ifNotExists: Boolean)
extends PaimonLeafCommand {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
similarity index 72%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
index 57593c3a6..0506ed42f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala
@@ -27,11 +27,13 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
-case class CreateTagExec(
+case class CreateOrReplaceTagExec(
catalog: TableCatalog,
ident: Identifier,
tagName: String,
tagOptions: TagOptions,
+ create: Boolean,
+ replace: Boolean,
ifNotExists: Boolean)
extends PaimonLeafV2CommandExec {
@@ -42,14 +44,27 @@ case class CreateTagExec(
table.asInstanceOf[SparkTable].getTable match {
case paimonTable: FileStoreTable =>
val tagIsExists = paimonTable.tagManager().tagExists(tagName)
- if (tagIsExists && ifNotExists) {
- return Nil
- }
val timeRetained = tagOptions.timeRetained.orNull
- if (tagOptions.snapshotId.isEmpty) {
- paimonTable.createTag(tagName, timeRetained)
+ val snapshotId = tagOptions.snapshotId
+
+ if (create && replace && !tagIsExists) {
+ if (snapshotId.isEmpty) {
+ paimonTable.createTag(tagName, timeRetained)
+ } else {
+ paimonTable.createTag(tagName, snapshotId.get, timeRetained)
+ }
+ } else if (replace) {
+ paimonTable.replaceTag(tagName, snapshotId.get, timeRetained)
} else {
- paimonTable.createTag(tagName, tagOptions.snapshotId.get,
timeRetained)
+ if (tagIsExists && ifNotExists) {
+ return Nil
+ }
+
+ if (snapshotId.isEmpty) {
+ paimonTable.createTag(tagName, timeRetained)
+ } else {
+ paimonTable.createTag(tagName, snapshotId.get, timeRetained)
+ }
}
case t =>
throw new UnsupportedOperationException(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index d715ef2f5..0c3d3e6b6 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.execution
import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
-import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand,
DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
+import
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand,
DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
@@ -50,12 +50,14 @@ case class PaimonStrategy(spark: SparkSession)
case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) =>
ShowTagsExec(catalog, ident, t.output) :: Nil
- case CreateTagCommand(
+ case CreateOrReplaceTagCommand(
PaimonCatalogAndIdentifier(table, ident),
tagName,
tagOptions,
+ create,
+ replace,
ifNotExists) =>
- CreateTagExec(table, ident, tagName, tagOptions, ifNotExists) :: Nil
+ CreateOrReplaceTagExec(table, ident, tagName, tagOptions, create,
replace, ifNotExists) :: Nil
case DeleteTagCommand(PaimonCatalogAndIdentifier(catalog, ident), tagStr,
ifExists) =>
DeleteTagExec(catalog, ident, tagStr, ifExists) :: Nil
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
index f7e8a8506..78a7f80ea 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
@@ -105,10 +105,12 @@ class PaimonSparkSqlExtensionsParser(delegate:
ParserInterface)
}
private def isTagRefDdl(normalized: String): Boolean = {
- normalized.startsWith("show tags") || (normalized
- .startsWith("alter table") && (normalized.contains("create tag") ||
- normalized.contains("rename tag") ||
- normalized.contains("delete tag")))
+ normalized.startsWith("show tags") ||
+ (normalized.startsWith("alter table") &&
+ (normalized.contains("create tag") ||
+ normalized.contains("replace tag") ||
+ normalized.contains("rename tag") ||
+ normalized.contains("delete tag")))
}
protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser
=> T): T = {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index 06d57d597..b864894e7 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.catalyst.parser.extensions
import org.apache.paimon.spark.catalyst.plans.logical
-import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand,
DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument,
PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
+import
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand,
DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument,
PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
import org.apache.paimon.utils.TimeUtils
import org.antlr.v4.runtime._
@@ -98,31 +98,44 @@ class PaimonSqlExtensionsAstBuilder(delegate:
ParserInterface)
ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier))
}
- /** Create a CREATE TAG logical command. */
- override def visitCreateTag(ctx: CreateTagContext): CreateTagCommand =
withOrigin(ctx) {
- val tagName = ctx.identifier().getText
- val tagOptionsContext = Option(ctx.tagOptions())
- val snapshotId =
- tagOptionsContext.flatMap(tagOptions =>
Option(tagOptions.snapshotId())).map(_.getText.toLong)
- val timeRetainCtx = tagOptionsContext.flatMap(tagOptions =>
Option(tagOptions.timeRetain()))
- val timeRetained = if (timeRetainCtx.nonEmpty) {
- val (number, timeUnit) =
- timeRetainCtx.map(retain => (retain.number().getText.toLong,
retain.timeUnit().getText)).get
- Option(TimeUtils.parseDuration(number, timeUnit))
- } else {
- None
+ /** Create a CREATE OR REPLACE TAG logical command. */
+ override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext):
CreateOrReplaceTagCommand =
+ withOrigin(ctx) {
+ val createTagClause = ctx.createReplaceTagClause()
+
+ val tagName = createTagClause.identifier().getText
+ val tagOptionsContext = Option(createTagClause.tagOptions())
+ val snapshotId =
+ tagOptionsContext
+ .flatMap(tagOptions => Option(tagOptions.snapshotId()))
+ .map(_.getText.toLong)
+ val timeRetainCtx = tagOptionsContext.flatMap(tagOptions =>
Option(tagOptions.timeRetain()))
+ val timeRetained = if (timeRetainCtx.nonEmpty) {
+ val (number, timeUnit) =
+ timeRetainCtx
+ .map(retain => (retain.number().getText.toLong,
retain.timeUnit().getText))
+ .get
+ Option(TimeUtils.parseDuration(number, timeUnit))
+ } else {
+ None
+ }
+ val tagOptions = TagOptions(
+ snapshotId,
+ timeRetained
+ )
+
+ val create = createTagClause.CREATE() != null
+ val replace = createTagClause.REPLACE() != null
+ val ifNotExists = createTagClause.EXISTS() != null
+
+ CreateOrReplaceTagCommand(
+ typedVisit[Seq[String]](ctx.multipartIdentifier),
+ tagName,
+ tagOptions,
+ create,
+ replace,
+ ifNotExists)
}
- val tagOptions = TagOptions(
- snapshotId,
- timeRetained
- )
- val ifNotExists = ctx.EXISTS() != null
- CreateTagCommand(
- typedVisit[Seq[String]](ctx.multipartIdentifier),
- tagName,
- tagOptions,
- ifNotExists)
- }
/** Create a DELETE TAG logical command. */
override def visitDeleteTag(ctx: DeleteTagContext): DeleteTagCommand =
withOrigin(ctx) {
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
index 7c551f0c9..5ad687b4d 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
@@ -71,6 +71,35 @@ abstract class PaimonTagDdlTestBase extends
PaimonSparkTestBase {
Row("tag-1", 3, null))
}
+ test("Tag ddl: alter table t create or replace tag syntax") {
+ spark.sql("""CREATE TABLE T (id INT, name STRING)
+ |USING PAIMON
+ |TBLPROPERTIES ('primary-key'='id')""".stripMargin)
+
+ spark.sql("insert into T values(1, 'a')")
+ spark.sql("insert into T values(2, 'b')")
+ assertResult(2)(loadTable("T").snapshotManager().snapshotCount())
+
+ // test 'replace' syntax
+ spark.sql("alter table T create tag `tag-1` as of version 1")
+ spark.sql("alter table T replace tag `tag-1` as of version 2 RETAIN 1
HOURS")
+ checkAnswer(
+ spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
+ Row("tag-1", 2, "PT1H") :: Nil)
+
+ // test 'create or replace' syntax
+ // tag-2 not exist, create it
+ spark.sql("alter table T create or replace tag `tag-2` as of version 1")
+ checkAnswer(
+ spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where
tag_name = 'tag-2'"),
+ Row("tag-2", 1, null) :: Nil)
+ // tag-2 exists, replace it
+ spark.sql("alter table T create or replace tag `tag-2` as of version 2
RETAIN 1 HOURS")
+ checkAnswer(
+ spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where
tag_name = 'tag-2'"),
+ Row("tag-2", 2, "PT1H") :: Nil)
+ }
+
test("Tag ddl: alter table t delete tag syntax") {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON