This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4088bc762 [procedure] Support multiple tags and branches delete (#3684)
4088bc762 is described below

commit 4088bc762d8cd8fa3782097bd13095382f24e8e1
Author: xuzifu666 <[email protected]>
AuthorDate: Fri Jul 5 22:15:21 2024 +0800

    [procedure] Support multiple tags and branches delete (#3684)
---
 docs/content/flink/procedures.md                   |  4 +-
 docs/content/spark/procedures.md                   |  2 +-
 .../paimon/flink/action/DeleteBranchAction.java    | 11 ++--
 .../paimon/flink/action/DeleteTagAction.java       | 11 ++--
 .../flink/procedure/DeleteBranchProcedure.java     |  9 ++--
 .../paimon/flink/procedure/DeleteTagProcedure.java |  7 ++-
 .../paimon/flink/action/BranchActionITCase.java    | 21 ++++++++
 .../paimon/flink/action/TagActionITCase.java       | 63 ++++++++++++++++++++++
 .../spark/procedure/DeleteBranchProcedure.java     |  7 ++-
 .../CreateAndDeleteBranchProcedureTest.scala       | 25 ++++++++-
 .../CreateAndDeleteTagProcedureTest.scala          | 26 +++++++++
 11 files changed, 167 insertions(+), 19 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index fc9d48d67..e899e882c 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -127,7 +127,7 @@ All available procedures are listed below.
       <td>
          To delete a tag. Arguments:
             <li>identifier: the target table identifier. Cannot be empty.</li>
-            <li>tagName: name of the tag to be deleted.</li>
+            <li>tagName: name of the tag to be deleted. If you specify multiple 
tags, delimiter is ','.</li>
       </td>
       <td>
          CALL sys.delete_tag('default.T', 'my_tag')
@@ -331,7 +331,7 @@ All available procedures are listed below.
       <td>
          To delete a branch. Arguments:
             <li>identifier: the target table identifier. Cannot be empty.</li>
-            <li>branchName: name of the branch to be deleted.</li>
+            <li>branchName: name of the branch to be deleted. If you specify 
multiple branches, delimiter is ','.</li>
       </td>
       <td>
          CALL sys.delete_branch('default.T', 'branch1')
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 56a8abb61..c0e0131c4 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -169,7 +169,7 @@ This section introduce all available spark procedures about 
paimon.
       <td>
          To merge a branch to main branch. Arguments:
             <li>table: the target table identifier. Cannot be empty.</li>
-            <li>branch: name of the branch to be merged.</li>
+            <li>branch: name of the branch to be merged. If you specify 
multiple branches, delimiter is ','.</li>
       </td>
       <td>
           CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java
index b167f7b96..d5a48fcb4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java
@@ -23,20 +23,23 @@ import java.util.Map;
 /** Delete branch action for Flink. */
 public class DeleteBranchAction extends TableActionBase {
 
-    private final String branchName;
+    private final String branchNames;
 
     public DeleteBranchAction(
             String warehouse,
             String databaseName,
             String tableName,
             Map<String, String> catalogConfig,
-            String branchName) {
+            String branchNames) {
         super(warehouse, databaseName, tableName, catalogConfig);
-        this.branchName = branchName;
+        this.branchNames = branchNames;
     }
 
     @Override
     public void run() throws Exception {
-        table.deleteBranch(branchName);
+        String[] branches = branchNames.split(",");
+        for (String branch : branches) {
+            table.deleteBranch(branch);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
index 46c15bfe3..dc7d5dc9f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
@@ -23,20 +23,23 @@ import java.util.Map;
 /** Delete tag action for Flink. */
 public class DeleteTagAction extends TableActionBase {
 
-    private final String tagName;
+    private final String tagNameStr;
 
     public DeleteTagAction(
             String warehouse,
             String databaseName,
             String tableName,
             Map<String, String> catalogConfig,
-            String tagName) {
+            String tagNameStr) {
         super(warehouse, databaseName, tableName, catalogConfig);
-        this.tagName = tagName;
+        this.tagNameStr = tagNameStr;
     }
 
     @Override
     public void run() throws Exception {
-        table.deleteTag(tagName);
+        String[] tagNames = tagNameStr.split(",");
+        for (String tagName : tagNames) {
+            table.deleteTag(tagName);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
index c4d04b5a1..56f585ea3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
@@ -40,10 +40,13 @@ public class DeleteBranchProcedure extends ProcedureBase {
         return IDENTIFIER;
     }
 
-    public String[] call(ProcedureContext procedureContext, String tableId, 
String branchName)
+    public String[] call(ProcedureContext procedureContext, String tableId, 
String branchStr)
             throws Catalog.TableNotExistException {
-        Table table = catalog.getTable(Identifier.fromString(tableId));
-        table.deleteBranch(branchName);
+        String[] branchs = branchStr.split(",");
+        for (String branch : branchs) {
+            Table table = catalog.getTable(Identifier.fromString(tableId));
+            table.deleteBranch(branch);
+        }
 
         return new String[] {"Success"};
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
index 931448937..5f597ce48 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
@@ -35,10 +35,13 @@ public class DeleteTagProcedure extends ProcedureBase {
 
     public static final String IDENTIFIER = "delete_tag";
 
-    public String[] call(ProcedureContext procedureContext, String tableId, 
String tagName)
+    public String[] call(ProcedureContext procedureContext, String tableId, 
String tagNameStr)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
-        table.deleteTag(tagName);
+        String[] tagNames = tagNameStr.split(",");
+        for (String tagName : tagNames) {
+            table.deleteTag(tagName);
+        }
 
         return new String[] {"Success"};
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 21a35964e..a83e618cc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -159,6 +159,27 @@ class BranchActionITCase extends ActionITCaseBase {
                         database, tableName));
         
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();
 
+        // create branch1 and branch3
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 
'branch_name_with_snapshotId_1', 1)",
+                        database, tableName));
+        
assertThat(branchManager.branchExists("branch_name_with_snapshotId_1")).isTrue();
+
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 
'branch_name_with_snapshotId_3', 3)",
+                        database, tableName));
+        
assertThat(branchManager.branchExists("branch_name_with_snapshotId_3")).isTrue();
+
+        // delete branch1 and branch3 batch
+        callProcedure(
+                String.format(
+                        "CALL sys.delete_branch('%s.%s', 
'branch_name_with_snapshotId_1,branch_name_with_snapshotId_3')",
+                        database, tableName));
+        
assertThat(branchManager.branchExists("branch_name_with_snapshotId_1")).isFalse();
+        
assertThat(branchManager.branchExists("branch_name_with_snapshotId_3")).isFalse();
+
         createAction(
                         CreateBranchAction.class,
                         "create_branch",
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index 6ccbd6526..b873a1bb6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -111,6 +111,69 @@ public class TagActionITCase extends ActionITCaseBase {
                     String.format("CALL sys.delete_tag('%s.%s', 'tag2')", 
database, tableName));
         }
         assertThat(tagManager.tagExists("tag2")).isFalse();
+
+        // create tag1
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            createAction(
+                            CreateTagAction.class,
+                            "create_tag",
+                            "--warehouse",
+                            warehouse,
+                            "--database",
+                            database,
+                            "--table",
+                            tableName,
+                            "--tag_name",
+                            "tag1",
+                            "--snapshot",
+                            "1")
+                    .run();
+        } else {
+            callProcedure(
+                    String.format("CALL sys.create_tag('%s.%s', 'tag1', 1)", 
database, tableName));
+        }
+
+        // create tag3
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            createAction(
+                            CreateTagAction.class,
+                            "create_tag",
+                            "--warehouse",
+                            warehouse,
+                            "--database",
+                            database,
+                            "--table",
+                            tableName,
+                            "--tag_name",
+                            "tag3",
+                            "--snapshot",
+                            "3")
+                    .run();
+        } else {
+            callProcedure(
+                    String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", 
database, tableName));
+        }
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            createAction(
+                            DeleteTagAction.class,
+                            "delete_tag",
+                            "--warehouse",
+                            warehouse,
+                            "--database",
+                            database,
+                            "--table",
+                            tableName,
+                            "--tag_name",
+                            "tag1,tag3")
+                    .run();
+        } else {
+            callProcedure(
+                    String.format(
+                            "CALL sys.delete_tag('%s.%s', 'tag1,tag3')", 
database, tableName));
+        }
+        assertThat(tagManager.tagExists("tag1")).isFalse();
+        assertThat(tagManager.tagExists("tag3")).isFalse();
     }
 
     @Test
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
index 98d2a6ada..d5536250a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
@@ -60,12 +60,15 @@ public class DeleteBranchProcedure extends BaseProcedure {
     @Override
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-        String branch = args.getString(1);
+        String branchStr = args.getString(1);
+        String[] branches = branchStr.split(",");
 
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
-                    table.deleteBranch(branch);
+                    for (String branch : branches) {
+                        table.deleteBranch(branch);
+                    }
                     InternalRow outputRow = newInternalRow(true);
                     return new InternalRow[] {outputRow};
                 });
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
index 92f766a44..59a4b510f 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
@@ -68,7 +68,7 @@ class CreateAndDeleteBranchProcedureTest extends 
PaimonSparkTestBase with Stream
             stream.processAllAvailable()
             checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
 
-            // create tag
+            // create tags
             checkAnswer(
               spark.sql(
                 "CALL paimon.sys.create_tag(table => 'test.T', tag => 
'test_tag', snapshot => 2)"),
@@ -106,6 +106,29 @@ class CreateAndDeleteBranchProcedureTest extends 
PaimonSparkTestBase with Stream
                 "CALL paimon.sys.delete_branch(table => 'test.T', branch => 
'test_branch')"),
               Row(true) :: Nil)
             assert(!branchManager.branchExists("test_branch"))
+
+            // create branch with snapshot2
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.create_branch(table => 'test.T', branch => 
'snapshot_branch_2', snapshot => 2)"),
+              Row(true) :: Nil)
+            assert(branchManager.branchExists("snapshot_branch_2"))
+
+            // create branch with snapshot3
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.create_branch(table => 'test.T', branch => 
'snapshot_branch_3', snapshot => 3)"),
+              Row(true) :: Nil)
+            assert(branchManager.branchExists("snapshot_branch_3"))
+
+            // delete branch:snapshot_branch_2 and snapshot_branch_3
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.delete_branch(table => 'test.T', branch => 
'snapshot_branch_2,snapshot_branch_3')"),
+              Row(true) :: Nil)
+            assert(!branchManager.branchExists("snapshot_branch_2"))
+            assert(!branchManager.branchExists("snapshot_branch_3"))
+
           } finally {
             stream.stop()
           }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 4351abd3f..296680919 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -92,6 +92,32 @@ class CreateAndDeleteTagProcedureTest extends 
PaimonSparkTestBase with StreamTes
                 "CALL paimon.sys.delete_tag(table => 'test.T', tag => 
'test_latestSnapshot_tag')"),
               Row(true) :: Nil)
             checkAnswer(spark.sql("SELECT tag_name FROM 
paimon.test.`T$tags`"), Nil)
+
+            // create test_tag_1 and test_tag_2
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.create_tag(" +
+                  "table => 'test.T', tag => 'test_tag_1', snapshot => 1)"),
+              Row(true) :: Nil)
+
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.create_tag(" +
+                  "table => 'test.T', tag => 'test_tag_2', snapshot => 2)"),
+              Row(true) :: Nil)
+
+            checkAnswer(
+              spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+              Row("test_tag_1") :: Row("test_tag_2") :: Nil)
+
+            // delete test_tag_1 and test_tag_2
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.delete_tag(table => 'test.T', tag => 
'test_tag_1,test_tag_2')"),
+              Row(true) :: Nil)
+
+            checkAnswer(spark.sql("SELECT tag_name FROM 
paimon.test.`T$tags`"), Nil)
+
           } finally {
             stream.stop()
           }

Reply via email to