This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 631b13ad23d [improve][client] PIP-313 Support force unsubscribe using consumer api (#21687) 631b13ad23d is described below commit 631b13ad23d7e48c6e82d38f97c23d129062cb7c Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Dec 18 21:23:18 2023 -0800 [improve][client] PIP-313 Support force unsubscribe using consumer api (#21687) Co-authored-by: Jiwe Guo <techno...@apache.org> --- .../org/apache/pulsar/broker/service/Consumer.java | 4 +-- .../apache/pulsar/broker/service/ServerCnx.java | 2 +- .../apache/pulsar/broker/service/Subscription.java | 2 ++ .../nonpersistent/NonPersistentSubscription.java | 17 ++++++++-- .../service/persistent/PersistentSubscription.java | 20 ++++++++++-- .../client/impl/BrokerClientIntegrationTest.java | 37 ++++++++++++++++++++++ .../org/apache/pulsar/client/api/Consumer.java | 25 +++++++++++++++ .../pulsar/client/api/PulsarClientException.java | 4 +++ .../apache/pulsar/client/impl/ConsumerBase.java | 14 ++++++-- .../apache/pulsar/client/impl/ConsumerImpl.java | 6 ++-- .../client/impl/MultiTopicsConsumerImpl.java | 4 +-- .../apache/pulsar/common/protocol/Commands.java | 5 +-- pulsar-common/src/main/proto/PulsarApi.proto | 1 + 13 files changed, 125 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 5ec76d07feb..83dcd8d6c16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -421,8 +421,8 @@ public class Consumer { } } - public void doUnsubscribe(final long requestId) { - subscription.doUnsubscribe(this).thenAccept(v -> { + public void doUnsubscribe(final long requestId, boolean force) { + subscription.doUnsubscribe(this, force).thenAccept(v -> { log.info("Unsubscribed successfully from {}", 
subscription); cnx.removedConsumer(this); cnx.getCommandSender().sendSuccessResponse(requestId); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2baa55b80e7..9f2b98aeb40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1958,7 +1958,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { CompletableFuture<Consumer> consumerFuture = consumers.get(unsubscribe.getConsumerId()); if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { - consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId()); + consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId(), unsubscribe.isForce()); } else { commandSender.sendErrorResponse(unsubscribe.getRequestId(), ServerError.MetadataError, "Consumer not found"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 6805d197521..61107b7b0db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -75,6 +75,8 @@ public interface Subscription extends MessageExpirer { CompletableFuture<Void> doUnsubscribe(Consumer consumer); + CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean forcefully); + CompletableFuture<Void> clearBacklog(); CompletableFuture<Void> skipMessages(int numMessagesToSkip); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 28ea9f39ac8..92aba6221da 100644 
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -429,11 +429,24 @@ public class NonPersistentSubscription extends AbstractSubscription implements S */ @Override public CompletableFuture<Void> doUnsubscribe(Consumer consumer) { + return doUnsubscribe(consumer, false); + } + + /** + * Handle unsubscribe command from the client API. Check with the dispatcher whether this consumer can proceed with + * unsubscribe. + * + * @param consumer consumer object that is initiating the unsubscribe operation + * @param force unsubscribe forcefully by disconnecting consumers and closing subscription + * @return CompletableFuture indicating the completion of unsubscribe operation + */ + @Override + public CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean force) { CompletableFuture<Void> future = new CompletableFuture<>(); try { - if (dispatcher.canUnsubscribe(consumer)) { + if (force || dispatcher.canUnsubscribe(consumer)) { consumer.close(); - return delete(); + return delete(force); } future.completeExceptionally( new ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 86e3558f550..dc79146110f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1074,11 +1074,27 @@ public class PersistentSubscription extends AbstractSubscription implements Subs */ @Override public CompletableFuture<Void> doUnsubscribe(Consumer consumer) { + return doUnsubscribe(consumer, false); + } + + /** + * 
Handle unsubscribe command from the client API. Check with the dispatcher whether this consumer can proceed with + * unsubscribe. + * + * @param consumer consumer object that is initiating the unsubscribe operation + * @param force unsubscribe forcefully by disconnecting consumers and closing subscription + * @return CompletableFuture indicating the completion of unsubscribe operation + */ + @Override + public CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean force) { CompletableFuture<Void> future = new CompletableFuture<>(); try { - if (dispatcher.canUnsubscribe(consumer)) { + if (force || dispatcher.canUnsubscribe(consumer)) { + if (log.isDebugEnabled()) { + log.debug("[{}] unsubscribing forcefully {}-{}", topicName, subName, consumer.consumerName()); + } consumer.close(); - return delete(); + return delete(force); } future.completeExceptionally( new ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 0395c59d583..c2715de986a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -1073,4 +1073,41 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { }); } + @Test + public void testSharedConsumerUnsubscribe() throws Exception { + String topic = "persistent://my-property/my-ns/sharedUnsubscribe"; + String sub = "my-subscriber-name"; + @Cleanup + Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared) + .subscriptionName(sub).subscribe(); + @Cleanup + Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared) + .subscriptionName(sub).subscribe(); + try 
{ + consumer1.unsubscribe(); + fail("should have failed as consumer-2 is already connected"); + } catch (Exception e) { + // Ok + } + + consumer1.unsubscribe(true); + try { + consumer2.unsubscribe(true); + } catch (PulsarClientException.NotConnectedException e) { + // Ok. consumer-2 is already disconnected with force unsubscription + } + assertFalse(consumer1.isConnected()); + assertFalse(consumer2.isConnected()); + } + + @Test(dataProvider = "subType") + public void testUnsubscribeForce(SubscriptionType type) throws Exception { + String topic = "persistent://my-property/my-ns/sharedUnsubscribe"; + String sub = "my-subscriber-name"; + @Cleanup + Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionType(type) + .subscriptionName(sub).subscribe(); + consumer1.unsubscribe(true); + assertFalse(consumer1.isConnected()); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index c67ad08c836..d24d674c018 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -73,6 +73,31 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger { */ CompletableFuture<Void> unsubscribeAsync(); + + /** + * Unsubscribe the consumer. + * + * <p>This call blocks until the consumer is unsubscribed. + * + * <p>Unsubscribing will cause the subscription to be deleted and all the + * data retained can potentially be deleted as well. + * + * <p>The operation will fail when performed on a shared subscription + * where multiple consumers are currently connected. + * + * @param force forcefully unsubscribe by disconnecting connected consumers. + * @throws PulsarClientException if the operation fails + */ + void unsubscribe(boolean force) throws PulsarClientException; + + /** + * Asynchronously unsubscribe the consumer. 
+ * + * @see Consumer#unsubscribe() + * @param force forcefully unsubscribe by disconnecting connected consumers. + * @return {@link CompletableFuture} to track the operation + */ + CompletableFuture<Void> unsubscribeAsync(boolean force); /** * Receives a single message. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 9409eefe2e0..007308ec7ab 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -658,6 +658,10 @@ public class PulsarClientException extends IOException { public NotConnectedException(long sequenceId) { super("Not connected to broker", sequenceId); } + + public NotConnectedException(String msg) { + super(msg); + } } /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 6e27701fcea..4f29c0aa76c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -711,8 +711,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T @Override public void unsubscribe() throws PulsarClientException { + unsubscribe(false); + } + + @Override + public void unsubscribe(boolean force) throws PulsarClientException { try { - unsubscribeAsync().get(); + unsubscribeAsync(force).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw PulsarClientException.unwrap(e); @@ -722,7 +727,12 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } @Override - public abstract CompletableFuture<Void> unsubscribeAsync(); + public CompletableFuture<Void> unsubscribeAsync() { + 
return unsubscribeAsync(false); + } + + @Override + public abstract CompletableFuture<Void> unsubscribeAsync(boolean force); @Override public void close() throws PulsarClientException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e7be0b2dbd4..b43cd79959c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -404,7 +404,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } @Override - public CompletableFuture<Void> unsubscribeAsync() { + public CompletableFuture<Void> unsubscribeAsync(boolean force) { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); @@ -413,7 +413,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (isConnected()) { setState(State.Closing); long requestId = client.newRequestId(); - ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId, requestId); + ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId, requestId, force); ClientCnx cnx = cnx(); cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> { closeConsumerTasks(); @@ -433,7 +433,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle }); } else { unsubscribeFuture.completeExceptionally( - new PulsarClientException( + new PulsarClientException.NotConnectedException( String.format("The client is not connected to the broker when unsubscribing the " + "subscription %s of the topic %s", subscription, topicName.toString()))); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java 
index 8a515a9f9b8..6ba3aaaaa46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -559,7 +559,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } @Override - public CompletableFuture<Void> unsubscribeAsync() { + public CompletableFuture<Void> unsubscribeAsync(boolean force) { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil.failedFuture( new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed")); @@ -568,7 +568,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>(); List<CompletableFuture<Void>> futureList = consumers.values().stream() - .map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList()); + .map(c -> c.unsubscribeAsync(force)).collect(Collectors.toList()); FutureUtil.waitForAll(futureList) .thenComposeAsync((r) -> { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index e715173be52..34d47e2836b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -698,11 +698,12 @@ public class Commands { } } - public static ByteBuf newUnsubscribe(long consumerId, long requestId) { + public static ByteBuf newUnsubscribe(long consumerId, long requestId, boolean force) { BaseCommand cmd = localCmd(Type.UNSUBSCRIBE); cmd.setUnsubscribe() .setConsumerId(consumerId) - .setRequestId(requestId); + .setRequestId(requestId) + .setForce(force); return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 819c6dfd594..387e4e3ff67 100644 --- 
a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -607,6 +607,7 @@ message CommandFlow { message CommandUnsubscribe { required uint64 consumer_id = 1; required uint64 request_id = 2; + optional bool force = 3 [default = false]; } // Reset an existing consumer to a particular message id