This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4088bc762 [procedure] Support multiple tags and branches delete (#3684)
4088bc762 is described below
commit 4088bc762d8cd8fa3782097bd13095382f24e8e1
Author: xuzifu666 <[email protected]>
AuthorDate: Fri Jul 5 22:15:21 2024 +0800
[procedure] Support multiple tags and branches delete (#3684)
---
docs/content/flink/procedures.md | 4 +-
docs/content/spark/procedures.md | 2 +-
.../paimon/flink/action/DeleteBranchAction.java | 11 ++--
.../paimon/flink/action/DeleteTagAction.java | 11 ++--
.../flink/procedure/DeleteBranchProcedure.java | 9 ++--
.../paimon/flink/procedure/DeleteTagProcedure.java | 7 ++-
.../paimon/flink/action/BranchActionITCase.java | 21 ++++++++
.../paimon/flink/action/TagActionITCase.java | 63 ++++++++++++++++++++++
.../spark/procedure/DeleteBranchProcedure.java | 7 ++-
.../CreateAndDeleteBranchProcedureTest.scala | 25 ++++++++-
.../CreateAndDeleteTagProcedureTest.scala | 26 +++++++++
11 files changed, 167 insertions(+), 19 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index fc9d48d67..e899e882c 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -127,7 +127,7 @@ All available procedures are listed below.
<td>
To delete a tag. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
- <li>tagName: name of the tag to be deleted.</li>
+ <li>tagName: name of the tag to be deleted. If you specify multiple
tags, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_tag('default.T', 'my_tag')
@@ -331,7 +331,7 @@ All available procedures are listed below.
<td>
To delete a branch. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
- <li>branchName: name of the branch to be deleted.</li>
+ <li>branchName: name of the branch to be deleted. If you specify
multiple branches, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_branch('default.T', 'branch1')
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 56a8abb61..c0e0131c4 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -169,7 +169,7 @@ This section introduce all available spark procedures about
paimon.
<td>
To merge a branch to main branch. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
- <li>branch: name of the branch to be merged.</li>
+ <li>branch: name of the branch to be merged. If you specify
multiple branches, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java
index b167f7b96..d5a48fcb4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java
@@ -23,20 +23,23 @@ import java.util.Map;
/** Delete branch action for Flink. */
public class DeleteBranchAction extends TableActionBase {
- private final String branchName;
+ private final String branchNames;
public DeleteBranchAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
- String branchName) {
+ String branchNames) {
super(warehouse, databaseName, tableName, catalogConfig);
- this.branchName = branchName;
+ this.branchNames = branchNames;
}
@Override
public void run() throws Exception {
- table.deleteBranch(branchName);
+ String[] branches = branchNames.split(",");
+ for (String branch : branches) {
+ table.deleteBranch(branch);
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
index 46c15bfe3..dc7d5dc9f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
@@ -23,20 +23,23 @@ import java.util.Map;
/** Delete tag action for Flink. */
public class DeleteTagAction extends TableActionBase {
- private final String tagName;
+ private final String tagNameStr;
public DeleteTagAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
- String tagName) {
+ String tagNameStr) {
super(warehouse, databaseName, tableName, catalogConfig);
- this.tagName = tagName;
+ this.tagNameStr = tagNameStr;
}
@Override
public void run() throws Exception {
- table.deleteTag(tagName);
+ String[] tagNames = tagNameStr.split(",");
+ for (String tagName : tagNames) {
+ table.deleteTag(tagName);
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
index c4d04b5a1..56f585ea3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
@@ -40,10 +40,13 @@ public class DeleteBranchProcedure extends ProcedureBase {
return IDENTIFIER;
}
- public String[] call(ProcedureContext procedureContext, String tableId,
String branchName)
+ public String[] call(ProcedureContext procedureContext, String tableId,
String branchStr)
throws Catalog.TableNotExistException {
- Table table = catalog.getTable(Identifier.fromString(tableId));
- table.deleteBranch(branchName);
+ String[] branchs = branchStr.split(",");
+ for (String branch : branchs) {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ table.deleteBranch(branch);
+ }
return new String[] {"Success"};
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
index 931448937..5f597ce48 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
@@ -35,10 +35,13 @@ public class DeleteTagProcedure extends ProcedureBase {
public static final String IDENTIFIER = "delete_tag";
- public String[] call(ProcedureContext procedureContext, String tableId,
String tagName)
+ public String[] call(ProcedureContext procedureContext, String tableId,
String tagNameStr)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
- table.deleteTag(tagName);
+ String[] tagNames = tagNameStr.split(",");
+ for (String tagName : tagNames) {
+ table.deleteTag(tagName);
+ }
return new String[] {"Success"};
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 21a35964e..a83e618cc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -159,6 +159,27 @@ class BranchActionITCase extends ActionITCaseBase {
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();
+ // create branch1 and branch3
+ callProcedure(
+ String.format(
+ "CALL sys.create_branch('%s.%s',
'branch_name_with_snapshotId_1', 1)",
+ database, tableName));
+
assertThat(branchManager.branchExists("branch_name_with_snapshotId_1")).isTrue();
+
+ callProcedure(
+ String.format(
+ "CALL sys.create_branch('%s.%s',
'branch_name_with_snapshotId_3', 3)",
+ database, tableName));
+
assertThat(branchManager.branchExists("branch_name_with_snapshotId_3")).isTrue();
+
+ // delete branch1 and branch3 batch
+ callProcedure(
+ String.format(
+ "CALL sys.delete_branch('%s.%s',
'branch_name_with_snapshotId_1,branch_name_with_snapshotId_3')",
+ database, tableName));
+
assertThat(branchManager.branchExists("branch_name_with_snapshotId_1")).isFalse();
+
assertThat(branchManager.branchExists("branch_name_with_snapshotId_3")).isFalse();
+
createAction(
CreateBranchAction.class,
"create_branch",
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index 6ccbd6526..b873a1bb6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -111,6 +111,69 @@ public class TagActionITCase extends ActionITCaseBase {
String.format("CALL sys.delete_tag('%s.%s', 'tag2')",
database, tableName));
}
assertThat(tagManager.tagExists("tag2")).isFalse();
+
+ // create tag1
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ createAction(
+ CreateTagAction.class,
+ "create_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--tag_name",
+ "tag1",
+ "--snapshot",
+ "1")
+ .run();
+ } else {
+ callProcedure(
+ String.format("CALL sys.create_tag('%s.%s', 'tag1', 1)",
database, tableName));
+ }
+
+ // create tag3
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ createAction(
+ CreateTagAction.class,
+ "create_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--tag_name",
+ "tag3",
+ "--snapshot",
+ "3")
+ .run();
+ } else {
+ callProcedure(
+ String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)",
database, tableName));
+ }
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ createAction(
+ DeleteTagAction.class,
+ "delete_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--tag_name",
+ "tag1,tag3")
+ .run();
+ } else {
+ callProcedure(
+ String.format(
+ "CALL sys.delete_tag('%s.%s', 'tag1,tag3')",
database, tableName));
+ }
+ assertThat(tagManager.tagExists("tag1")).isFalse();
+ assertThat(tagManager.tagExists("tag3")).isFalse();
}
@Test
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
index 98d2a6ada..d5536250a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
@@ -60,12 +60,15 @@ public class DeleteBranchProcedure extends BaseProcedure {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
- String branch = args.getString(1);
+ String branchStr = args.getString(1);
+ String[] branches = branchStr.split(",");
return modifyPaimonTable(
tableIdent,
table -> {
- table.deleteBranch(branch);
+ for (String branch : branches) {
+ table.deleteBranch(branch);
+ }
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
index 92f766a44..59a4b510f 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
@@ -68,7 +68,7 @@ class CreateAndDeleteBranchProcedureTest extends
PaimonSparkTestBase with Stream
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
- // create tag
+ // create tags
checkAnswer(
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag =>
'test_tag', snapshot => 2)"),
@@ -106,6 +106,29 @@ class CreateAndDeleteBranchProcedureTest extends
PaimonSparkTestBase with Stream
"CALL paimon.sys.delete_branch(table => 'test.T', branch =>
'test_branch')"),
Row(true) :: Nil)
assert(!branchManager.branchExists("test_branch"))
+
+ // create branch with snapshot2
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.create_branch(table => 'test.T', branch =>
'snapshot_branch_2', snapshot => 2)"),
+ Row(true) :: Nil)
+ assert(branchManager.branchExists("snapshot_branch_2"))
+
+ // create branch with snapshot3
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.create_branch(table => 'test.T', branch =>
'snapshot_branch_3', snapshot => 3)"),
+ Row(true) :: Nil)
+ assert(branchManager.branchExists("snapshot_branch_3"))
+
+ // delete branch:snapshot_branch_2 and snapshot_branch_3
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.delete_branch(table => 'test.T', branch =>
'snapshot_branch_2,snapshot_branch_3')"),
+ Row(true) :: Nil)
+ assert(!branchManager.branchExists("snapshot_branch_2"))
+ assert(!branchManager.branchExists("snapshot_branch_3"))
+
} finally {
stream.stop()
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 4351abd3f..296680919 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -92,6 +92,32 @@ class CreateAndDeleteTagProcedureTest extends
PaimonSparkTestBase with StreamTes
"CALL paimon.sys.delete_tag(table => 'test.T', tag =>
'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM
paimon.test.`T$tags`"), Nil)
+
+ // create test_tag_1 and test_tag_2
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.create_tag(" +
+ "table => 'test.T', tag => 'test_tag_1', snapshot => 1)"),
+ Row(true) :: Nil)
+
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.create_tag(" +
+ "table => 'test.T', tag => 'test_tag_2', snapshot => 2)"),
+ Row(true) :: Nil)
+
+ checkAnswer(
+ spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+ Row("test_tag_1") :: Row("test_tag_2") :: Nil)
+
+ // delete test_tag_1 and test_tag_2
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.delete_tag(table => 'test.T', tag =>
'test_tag_1,test_tag_2')"),
+ Row(true) :: Nil)
+
+ checkAnswer(spark.sql("SELECT tag_name FROM
paimon.test.`T$tags`"), Nil)
+
} finally {
stream.stop()
}