This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f49afd862 [core] Reset consumer action supports delete a consumer with a given consumer ID. (#2106)
f49afd862 is described below

commit f49afd862192c65c3d80fe6638f54f4ed5a1f2d4
Author: Kerwin <[email protected]>
AuthorDate: Tue Oct 10 15:18:49 2023 +0800

    [core] Reset consumer action supports delete a consumer with a given consumer ID. (#2106)
---
 docs/content/how-to/querying-tables.md             |  6 ++++--
 .../java/org/apache/paimon/utils/StringUtils.java  |  6 +++---
 .../apache/paimon/consumer/ConsumerManager.java    |  4 ++++
 .../paimon/flink/action/ResetConsumerAction.java   | 16 +++++++++++----
 .../flink/action/ResetConsumerActionFactory.java   | 23 ++++++++++++----------
 .../flink/procedure/ResetConsumerProcedure.java    | 17 +++++++++++++++-
 .../flink/procedure/RollbackToProcedure.java       |  2 +-
 .../paimon/flink/action/ConsumerActionITCase.java  | 15 ++++++++++++--
 8 files changed, 66 insertions(+), 23 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index 5e2abd0ec..286103d12 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -303,7 +303,7 @@ NOTE: The consumer will prevent expiration of the snapshot. You can specify 'con
 lifetime of consumers.
 {{< /hint >}}
 
-You can reset a consumer with a given consumer ID and next snapshot ID.
+You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.
 
 {{< hint info >}}
 First, you need to stop the streaming task using this consumer ID, and then execute the reset consumer action job.
@@ -323,10 +323,12 @@ Run the following command:
     --database <database-name> \ 
     --table <table-name> \
     --consumer-id <consumer-id> \
-    --next-snapshot <next-snapshot-id> \
+    [--next-snapshot <next-snapshot-id>] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
 ```
 
+please don't specify --next-snapshot parameter if you want to delete the consumer.
+
 {{< /tab >}}
 
 {{< /tabs >}}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index 9bd5e3d9e..adc4ce553 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -93,7 +93,7 @@ public class StringUtils {
      * @return True, if the string is null or blank, false otherwise.
      */
     public static boolean isNullOrWhitespaceOnly(String str) {
-        if (str == null || str.length() == 0) {
+        if (str == null || str.isEmpty()) {
             return true;
         }
 
@@ -302,8 +302,8 @@ public class StringUtils {
         }
         final int replLength = searchString.length();
         int increase = replacement.length() - replLength;
-        increase = increase < 0 ? 0 : increase;
-        increase *= max < 0 ? 16 : max > 64 ? 64 : max;
+        increase = Math.max(increase, 0);
+        increase *= max < 0 ? 16 : Math.min(max, 64);
         final StringBuilder buf = new StringBuilder(text.length() + increase);
         while (end != INDEX_NOT_FOUND) {
             buf.append(text, start, end).append(replacement);
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index bf24dc753..b788c4a28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -68,6 +68,10 @@ public class ConsumerManager implements Serializable {
         }
     }
 
+    public void deleteConsumer(String consumerId) {
+        fileIO.deleteQuietly(consumerPath(consumerId));
+    }
+
     public OptionalLong minNextSnapshot() {
         try {
             return listOriginalVersionedFiles(fileIO, consumerDirectory(), CONSUMER_PREFIX)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
index 8a6507fbb..d4d13cc2e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
@@ -23,23 +23,27 @@ import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.table.FileStoreTable;
 
 import java.util.Map;
+import java.util.Objects;
 
 /** Reset consumer action for Flink. */
 public class ResetConsumerAction extends TableActionBase {
 
     private final String consumerId;
-    private final long nextSnapshotId;
+    private Long nextSnapshotId;
 
     protected ResetConsumerAction(
             String warehouse,
             String databaseName,
             String tableName,
             Map<String, String> catalogConfig,
-            String consumerId,
-            long nextSnapshotId) {
+            String consumerId) {
         super(warehouse, databaseName, tableName, catalogConfig);
         this.consumerId = consumerId;
+    }
+
+    public ResetConsumerAction withNextSnapshotIds(Long nextSnapshotId) {
         this.nextSnapshotId = nextSnapshotId;
+        return this;
     }
 
     @Override
@@ -47,6 +51,10 @@ public class ResetConsumerAction extends TableActionBase {
         FileStoreTable dataTable = (FileStoreTable) table;
         ConsumerManager consumerManager =
                 new ConsumerManager(dataTable.fileIO(), dataTable.location());
-        consumerManager.resetConsumer(consumerId, new Consumer(nextSnapshotId));
+        if (Objects.isNull(nextSnapshotId)) {
+            consumerManager.deleteConsumer(consumerId);
+        } else {
+            consumerManager.resetConsumer(consumerId, new Consumer(nextSnapshotId));
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
index 788cc519b..d8e04bf0e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
@@ -37,34 +37,37 @@ public class ResetConsumerActionFactory implements ActionFactory {
     @Override
     public Optional<Action> create(MultipleParameterTool params) {
         checkRequiredArgument(params, "consumer-id");
-        checkRequiredArgument(params, "next-snapshot");
 
         Tuple3<String, String, String> tablePath = getTablePath(params);
         Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
         String consumerId = params.get("consumer-id");
-        long nextSnapshotId = Long.parseLong(params.get("next-snapshot"));
 
         ResetConsumerAction action =
                 new ResetConsumerAction(
-                        tablePath.f0,
-                        tablePath.f1,
-                        tablePath.f2,
-                        catalogConfig,
-                        consumerId,
-                        nextSnapshotId);
+                        tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, consumerId);
+
+        if (params.has("next-snapshot")) {
+            action.withNextSnapshotIds(Long.parseLong(params.get("next-snapshot")));
+        }
+
         return Optional.of(action);
     }
 
     @Override
     public void printHelp() {
         System.out.println(
-                "Action \"reset-consumer\" reset a consumer from the given next snapshot.");
+                "Action \"reset-consumer\" reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.");
         System.out.println();
 
         System.out.println("Syntax:");
         System.out.println(
                 "  reset-consumer --warehouse <warehouse-path> --database <database-name> "
-                        + "--table <table-name> --consumer-id <consumer-id> --next-snapshot <next-snapshot-id>");
+                        + "--table <table-name> --consumer-id <consumer-id> [--next-snapshot <next-snapshot-id>]");
+
+        System.out.println();
+        System.out.println("Note:");
+        System.out.println(
+                "  please don't specify --next-snapshot parameter if you want to delete the consumer.");
         System.out.println();
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
index 70a409b0b..8b6b4ee5b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -27,10 +27,14 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.flink.table.procedure.ProcedureContext;
 
 /**
- * Drop partition procedure. Usage:
+ * Reset consumer procedure. Usage:
  *
  * <pre><code>
+ *  -- reset the new next snapshot id in the consumer
  *  CALL reset_consumer('tableId', 'consumerId', nextSnapshotId)
+ *
+ *  -- delete consumer
+ *  CALL reset_consumer('tableId', 'consumerId')
  * </code></pre>
  */
 public class ResetConsumerProcedure extends ProcedureBase {
@@ -55,4 +59,15 @@ public class ResetConsumerProcedure extends ProcedureBase {
 
         return new String[] {"Success"};
     }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, String consumerId)
+            throws Catalog.TableNotExistException {
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
+        ConsumerManager consumerManager =
+                new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
+        consumerManager.deleteConsumer(consumerId);
+
+        return new String[] {"Success"};
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
index b8d6dd38c..a59dd238f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -25,7 +25,7 @@ import org.apache.paimon.table.Table;
 import org.apache.flink.table.procedure.ProcedureContext;
 
 /**
- * Drop partition procedure. Usage:
+ * Rollback procedure. Usage:
  *
  * <pre><code>
  *  -- rollback to a snapshot
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 444d2f71e..c3aee72c5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -82,8 +82,8 @@ public class ConsumerActionITCase extends ActionITCaseBase {
 
         // reset consumer
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new ResetConsumerAction(
-                            warehouse, database, tableName, Collections.emptyMap(), "myid", 1)
+            new ResetConsumerAction(warehouse, database, tableName, Collections.emptyMap(), "myid")
+                    .withNextSnapshotIds(1L)
                     .run();
         } else {
             callProcedure(
@@ -92,5 +92,16 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         Optional<Consumer> consumer2 = consumerManager.consumer("myid");
         assertThat(consumer2).isPresent();
         assertThat(consumer2.get().nextSnapshot()).isEqualTo(1);
+
+        // delete consumer
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            new ResetConsumerAction(warehouse, database, tableName, Collections.emptyMap(), "myid")
+                    .run();
+        } else {
+            callProcedure(
+                    String.format("CALL reset_consumer('%s.%s', 'myid')", database, tableName));
+        }
+        Optional<Consumer> consumer3 = consumerManager.consumer("myid");
+        assertThat(consumer3).isNotPresent();
     }
 }

Reply via email to