This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 631b13ad23d [improve][client] PIP-313 Support force unsubscribe using 
consumer api (#21687)
631b13ad23d is described below

commit 631b13ad23d7e48c6e82d38f97c23d129062cb7c
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Dec 18 21:23:18 2023 -0800

    [improve][client] PIP-313 Support force unsubscribe using consumer api 
(#21687)
    
    Co-authored-by: Jiwe Guo <techno...@apache.org>
---
 .../org/apache/pulsar/broker/service/Consumer.java |  4 +--
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../apache/pulsar/broker/service/Subscription.java |  2 ++
 .../nonpersistent/NonPersistentSubscription.java   | 17 ++++++++--
 .../service/persistent/PersistentSubscription.java | 20 ++++++++++--
 .../client/impl/BrokerClientIntegrationTest.java   | 37 ++++++++++++++++++++++
 .../org/apache/pulsar/client/api/Consumer.java     | 25 +++++++++++++++
 .../pulsar/client/api/PulsarClientException.java   |  4 +++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 14 ++++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  6 ++--
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 +--
 .../apache/pulsar/common/protocol/Commands.java    |  5 +--
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 13 files changed, 125 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 5ec76d07feb..83dcd8d6c16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -421,8 +421,8 @@ public class Consumer {
         }
     }
 
-    public void doUnsubscribe(final long requestId) {
-        subscription.doUnsubscribe(this).thenAccept(v -> {
+    public void doUnsubscribe(final long requestId, boolean force) {
+        subscription.doUnsubscribe(this, force).thenAccept(v -> {
             log.info("Unsubscribed successfully from {}", subscription);
             cnx.removedConsumer(this);
             cnx.getCommandSender().sendSuccessResponse(requestId);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2baa55b80e7..9f2b98aeb40 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1958,7 +1958,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         CompletableFuture<Consumer> consumerFuture = 
consumers.get(unsubscribe.getConsumerId());
 
         if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
-            
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId());
+            
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId(), 
unsubscribe.isForce());
         } else {
             commandSender.sendErrorResponse(unsubscribe.getRequestId(), 
ServerError.MetadataError,
                     "Consumer not found");
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 6805d197521..61107b7b0db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -75,6 +75,8 @@ public interface Subscription extends MessageExpirer {
 
     CompletableFuture<Void> doUnsubscribe(Consumer consumer);
 
+    CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean 
forcefully);
+
     CompletableFuture<Void> clearBacklog();
 
     CompletableFuture<Void> skipMessages(int numMessagesToSkip);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 28ea9f39ac8..92aba6221da 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -429,11 +429,24 @@ public class NonPersistentSubscription extends 
AbstractSubscription implements S
      */
     @Override
     public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
+        return doUnsubscribe(consumer, false);
+    }
+
+    /**
+     * Handle unsubscribe command from the client API. Check with the 
dispatcher whether this consumer can proceed with
+     * unsubscribe.
+     *
+     * @param consumer consumer object that is initiating the unsubscribe 
operation
+     * @param force unsubscribe forcefully by disconnecting consumers and 
closing subscription
+     * @return CompletableFuture indicating the completion of unsubscribe 
operation
+     */
+    @Override
+    public CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean 
force) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         try {
-            if (dispatcher.canUnsubscribe(consumer)) {
+            if (force || dispatcher.canUnsubscribe(consumer)) {
                 consumer.close();
-                return delete();
+                return delete(force);
             }
             future.completeExceptionally(
                     new ServerMetadataException("Unconnected or shared 
consumer attempting to unsubscribe"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 86e3558f550..dc79146110f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1074,11 +1074,27 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
      */
     @Override
     public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
+        return doUnsubscribe(consumer, false);
+    }
+
+    /**
+     * Handle unsubscribe command from the client API. Check with the 
dispatcher whether this consumer can proceed with
+     * unsubscribe.
+     *
+     * @param consumer consumer object that is initiating the unsubscribe 
operation
+     * @param force unsubscribe forcefully by disconnecting consumers and 
closing subscription
+     * @return CompletableFuture indicating the completion of unsubscribe 
operation
+     */
+    @Override
+    public CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean 
force) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         try {
-            if (dispatcher.canUnsubscribe(consumer)) {
+            if (force || dispatcher.canUnsubscribe(consumer)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] unsubscribing forcefully {}-{}", 
topicName, subName, consumer.consumerName());
+                }
                 consumer.close();
-                return delete();
+                return delete(force);
             }
             future.completeExceptionally(
                     new ServerMetadataException("Unconnected or shared 
consumer attempting to unsubscribe"));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 0395c59d583..c2715de986a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -1073,4 +1073,41 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         });
     }
 
+    @Test
+    public void testSharedConsumerUnsubscribe() throws Exception {
+        String topic = "persistent://my-property/my-ns/sharedUnsubscribe";
+        String sub = "my-subscriber-name";
+        @Cleanup
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(sub).subscribe();
+        @Cleanup
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(sub).subscribe();
+        try {
+            consumer1.unsubscribe();
+            fail("should have failed as consumer-2 is already connected");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        consumer1.unsubscribe(true);
+        try {
+            consumer2.unsubscribe(true);
+        } catch (PulsarClientException.NotConnectedException e) {
+            // Ok. consumer-2 is already disconnected with force unsubscription
+        }
+        assertFalse(consumer1.isConnected());
+        assertFalse(consumer2.isConnected());
+    }
+
+    @Test(dataProvider = "subType")
+    public void testUnsubscribeForce(SubscriptionType type) throws Exception {
+        String topic = "persistent://my-property/my-ns/sharedUnsubscribe";
+        String sub = "my-subscriber-name";
+        @Cleanup
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topic).subscriptionType(type)
+                .subscriptionName(sub).subscribe();
+        consumer1.unsubscribe(true);
+        assertFalse(consumer1.isConnected());
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index c67ad08c836..d24d674c018 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -73,6 +73,31 @@ public interface Consumer<T> extends Closeable, 
MessageAcknowledger {
      */
     CompletableFuture<Void> unsubscribeAsync();
 
+
+    /**
+     * Unsubscribe the consumer.
+     *
+     * <p>This call blocks until the consumer is unsubscribed.
+     *
+     * <p>Unsubscribing will cause the subscription to be deleted and all the
+     * data retained can potentially be deleted as well.
+     *
+     * <p>Unless {@code force} is true, the operation will fail when performed
+     * on a shared subscription where multiple consumers are currently connected.
+     *
+     * @param force forcefully unsubscribe by disconnecting connected 
consumers.
+     * @throws PulsarClientException if the operation fails
+     */
+    void unsubscribe(boolean force) throws PulsarClientException;
+
+    /**
+     * Asynchronously unsubscribe the consumer.
+     *
+     * @see Consumer#unsubscribe(boolean)
+     * @param force forcefully unsubscribe by disconnecting connected 
consumers.
+     * @return {@link CompletableFuture} to track the operation
+     */
+    CompletableFuture<Void> unsubscribeAsync(boolean force);
     /**
      * Receives a single message.
      *
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 9409eefe2e0..007308ec7ab 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -658,6 +658,10 @@ public class PulsarClientException extends IOException {
         public NotConnectedException(long sequenceId) {
             super("Not connected to broker", sequenceId);
         }
+
+        public NotConnectedException(String msg) {
+            super(msg);
+        }
     }
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6e27701fcea..4f29c0aa76c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -711,8 +711,13 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
 
     @Override
     public void unsubscribe() throws PulsarClientException {
+        unsubscribe(false);
+    }
+
+    @Override
+    public void unsubscribe(boolean force) throws PulsarClientException {
         try {
-            unsubscribeAsync().get();
+            unsubscribeAsync(force).get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw PulsarClientException.unwrap(e);
@@ -722,7 +727,12 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     }
 
     @Override
-    public abstract CompletableFuture<Void> unsubscribeAsync();
+    public CompletableFuture<Void> unsubscribeAsync() {
+        return unsubscribeAsync(false);
+    }
+
+    @Override
+    public abstract CompletableFuture<Void> unsubscribeAsync(boolean force);
 
     @Override
     public void close() throws PulsarClientException {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e7be0b2dbd4..b43cd79959c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -404,7 +404,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
-    public CompletableFuture<Void> unsubscribeAsync() {
+    public CompletableFuture<Void> unsubscribeAsync(boolean force) {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil
                     .failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
@@ -413,7 +413,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (isConnected()) {
             setState(State.Closing);
             long requestId = client.newRequestId();
-            ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId, 
requestId);
+            ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId, 
requestId, force);
             ClientCnx cnx = cnx();
             cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
                 closeConsumerTasks();
@@ -433,7 +433,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             });
         } else {
             unsubscribeFuture.completeExceptionally(
-                new PulsarClientException(
+                new PulsarClientException.NotConnectedException(
                     String.format("The client is not connected to the broker 
when unsubscribing the "
                             + "subscription %s of the topic %s", subscription, 
topicName.toString())));
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8a515a9f9b8..6ba3aaaaa46 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -559,7 +559,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     @Override
-    public CompletableFuture<Void> unsubscribeAsync() {
+    public CompletableFuture<Void> unsubscribeAsync(boolean force) {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil.failedFuture(
                     new PulsarClientException.AlreadyClosedException("Topics 
Consumer was already closed"));
@@ -568,7 +568,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futureList = consumers.values().stream()
-            .map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList());
+            .map(c -> c.unsubscribeAsync(force)).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
             .thenComposeAsync((r) -> {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e715173be52..34d47e2836b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -698,11 +698,12 @@ public class Commands {
         }
     }
 
-    public static ByteBuf newUnsubscribe(long consumerId, long requestId) {
+    public static ByteBuf newUnsubscribe(long consumerId, long requestId, 
boolean force) {
         BaseCommand cmd = localCmd(Type.UNSUBSCRIBE);
         cmd.setUnsubscribe()
                 .setConsumerId(consumerId)
-                .setRequestId(requestId);
+                .setRequestId(requestId)
+                .setForce(force);
         return serializeWithSize(cmd);
     }
 
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 819c6dfd594..387e4e3ff67 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -607,6 +607,7 @@ message CommandFlow {
 message CommandUnsubscribe {
     required uint64 consumer_id = 1;
     required uint64 request_id  = 2;
+    optional bool force         = 3 [default = false];
 }
 
 // Reset an existing consumer to a particular message id

Reply via email to