This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 6580d14  [Issue 6043] Support force deleting subscription
6580d14 is described below

commit 6580d146b0bb85f37ef73956f2dac3229e874aeb
Author: Fangbin Sun <[email protected]>
AuthorDate: Wed Mar 25 09:42:19 2020 +0800

    [Issue 6043] Support force deleting subscription
    
    Signed-off-by: xiaolong.ran <[email protected]>
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 91 +++++++++++++++++++++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 10 ++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  7 +-
 .../apache/pulsar/broker/service/Subscription.java |  2 +
 .../nonpersistent/NonPersistentSubscription.java   | 43 +++++++++-
 .../service/persistent/PersistentSubscription.java | 43 +++++++++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 50 ++++++++++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  2 +-
 .../org/apache/pulsar/client/admin/Topics.java     | 43 ++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 20 +++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  4 +-
 .../pulsar/admin/cli/CmdPersistentTopics.java      |  6 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  6 +-
 site2/docs/reference-pulsar-admin.md               |  1 +
 14 files changed, 315 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0322820..273901b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1045,6 +1045,14 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
+    protected void internalDeleteSubscription(AsyncResponse asyncResponse, 
String subName, boolean authoritative, boolean force) {
+        if (force) {
+            internalDeleteSubscriptionForcefully(asyncResponse, subName, 
authoritative);
+        } else {
+            internalDeleteSubscription(asyncResponse, subName, authoritative);
+        }
+    }
+
     protected void internalDeleteSubscription(AsyncResponse asyncResponse, 
String subName, boolean authoritative) {
         if (topicName.isGlobal()) {
             try {
@@ -1067,7 +1075,7 @@ public class PersistentTopicsBase extends AdminResource {
                         TopicName topicNamePartition = 
topicName.getPartition(i);
                         try {
                             futures.add(pulsar().getAdminClient().topics()
-                                    
.deleteSubscriptionAsync(topicNamePartition.toString(), subName));
+                                    
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
                         } catch (Exception e) {
                             log.error("[{}] Failed to delete subscription {} 
{}", clientAppId(), topicNamePartition, subName,
                                     e);
@@ -1133,6 +1141,87 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected void internalDeleteSubscriptionForcefully(AsyncResponse 
asyncResponse, String subName, boolean authoritative) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to delete subscription forcefully {} 
from topic {}", clientAppId(), subName, topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, 
subName, authoritative);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics()
+                                
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to delete subscription 
forcefully {} {}", clientAppId(), topicNamePartition, subName,
+                                e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                                return null;
+                            } else {
+                                log.error("[{}] Failed to delete subscription 
forcefully {} {}", clientAppId(), topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, 
subName, authoritative);
+                }
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to delete subscription forcefully {} 
from topic {}", clientAppId(), subName, topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse 
asyncResponse, String subName, boolean authoritative) {
+        try {
+            validateAdminAccessForSubscriber(subName, authoritative);
+            Topic topic = getTopicReference(topicName);
+            Subscription sub = topic.getSubscription(subName);
+            if (sub == null) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
+                return;
+            }
+            sub.deleteForcefully().get();
+            log.info("[{}][{}] Deleted subscription forcefully {}", 
clientAppId(), topicName, subName);
+            asyncResponse.resume(Response.noContent().build());
+        } catch (Exception e) {
+            log.error("[{}] Failed to delete subscription forcefully {} from 
topic {}", clientAppId(), subName, topicName, e);
+            if (e instanceof WebApplicationException) {
+                asyncResponse.resume(e);
+            } else {
+                log.error("[{}] Failed to delete subscription forcefully {} 
{}", clientAppId(), topicName, subName, e);
+                asyncResponse.resume(new RestException(e));
+            }
+        }
+    }
+
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {
         if (topicName.isGlobal()) {
             try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 027e7bb..9c2cde1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -343,17 +343,21 @@ public class PersistentTopics extends 
PersistentTopicsBase {
 
     @DELETE
     @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
-    @ApiOperation(hidden = true, value = "Delete a subscription.", notes = 
"There should not be any active consumers on the subscription.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+    @ApiOperation(hidden = true, value = "Delete a subscription.", notes = 
"The subscription cannot be deleted if delete is not forcefully and there are 
any active consumers attached to it. "
+            + "Force delete ignores connected consumers and deletes 
subscription by explicitly closing them.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 412, message = "Subscription has active 
consumers") })
     public void deleteSubscription(@Suspended final AsyncResponse 
asyncResponse, @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
             @PathParam("topic") @Encoded String encodedTopic, 
@PathParam("subName") String encodedSubName,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalDeleteSubscription(asyncResponse, decode(encodedSubName), 
authoritative);
+            internalDeleteSubscription(asyncResponse, decode(encodedSubName), 
authoritative, force);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6f187ff..ab05c06 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -589,7 +589,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
-    @ApiOperation(value = "Delete a subscription.", notes = "There should not 
be any active consumers on the subscription.")
+    @ApiOperation(value = "Delete a subscription.", notes = "The subscription 
cannot be deleted if delete is not forcefully and there are any active 
consumers attached to it. "
+            + "Force delete ignores connected consumers and deletes 
subscription by explicitly closing them.")
     @ApiResponses(value = {
             @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
@@ -607,11 +608,13 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Subscription to be deleted")
             @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Disconnect and close all consumers and delete 
subscription forcefully", defaultValue = "false", type = "boolean")
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
-            internalDeleteSubscription(asyncResponse, decode(encodedSubName), 
authoritative);
+            internalDeleteSubscription(asyncResponse, decode(encodedSubName), 
authoritative, force);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index f7d9687..85b5415 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -65,6 +65,8 @@ public interface Subscription {
 
     CompletableFuture<Void> delete();
 
+    CompletableFuture<Void> deleteForcefully();
+
     CompletableFuture<Void> disconnect();
 
     CompletableFuture<Void> doUnsubscribe(Consumer consumer);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 316024a..d3a6197 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -304,12 +304,53 @@ public class NonPersistentSubscription implements 
Subscription {
      */
     @Override
     public CompletableFuture<Void> delete() {
+        return delete(false);
+    }
+
+    /**
+     * Forcefully closes all consumers and deletes the subscription.
+     * @return
+     */
+    @Override
+    public CompletableFuture<Void> deleteForcefully() {
+        return delete(true);
+    }
+
+    /**
+     * Delete the subscription by closing and deleting its managed cursor. 
Handle unsubscribe call from admin layer.
+     *
+     * @param closeIfConsumersConnected
+     *            Flag indicating whether to explicitly close connected consumers 
before trying to delete the subscription. If
+     *            any consumer is connected and this flag is disabled, 
then the operation fails.
+     * @return CompletableFuture indicating the completion of delete operation
+     */
+    private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
         log.info("[{}][{}] Unsubscribing", topicName, subName);
 
+        CompletableFuture<Void> closeSubscriptionFuture = new 
CompletableFuture<>();
+
+        if (closeIfConsumersConnected) {
+            this.disconnect().thenRun(() -> {
+                closeSubscriptionFuture.complete(null);
+            }).exceptionally(ex -> {
+                log.error("[{}][{}] Error disconnecting and closing 
subscription", topicName, subName, ex);
+                closeSubscriptionFuture.completeExceptionally(ex);
+                return null;
+            });
+        } else {
+            this.close().thenRun(() -> {
+                closeSubscriptionFuture.complete(null);
+            }).exceptionally(exception -> {
+                log.error("[{}][{}] Error closing subscription", topicName, 
subName, exception);
+                closeSubscriptionFuture.completeExceptionally(exception);
+                return null;
+            });
+        }
+
         // cursor close handles pending delete (ack) operations
-        this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v 
-> {
+        closeSubscriptionFuture.thenCompose(v -> 
topic.unsubscribe(subName)).thenAccept(v -> {
             synchronized (this) {
                 (dispatcher != null ? dispatcher.close() : 
CompletableFuture.completedFuture(null)).thenRun(() -> {
                     log.info("[{}][{}] Successfully deleted subscription", 
topicName, subName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 25633f9..ea4883b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -850,12 +850,53 @@ public class PersistentSubscription implements 
Subscription {
      */
     @Override
     public CompletableFuture<Void> delete() {
+        return delete(false);
+    }
+
+    /**
+     * Forcefully closes all consumers and deletes the subscription.
+     * @return
+     */
+    @Override
+    public CompletableFuture<Void> deleteForcefully() {
+        return delete(true);
+    }
+
+    /**
+     * Delete the subscription by closing and deleting its managed cursor. 
Handle unsubscribe call from admin layer.
+     *
+     * @param closeIfConsumersConnected
+     *            Flag indicating whether to explicitly close connected consumers 
before trying to delete the subscription. If
+     *            any consumer is connected and this flag is disabled, 
then the operation fails.
+     * @return CompletableFuture indicating the completion of delete operation
+     */
+    private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
         log.info("[{}][{}] Unsubscribing", topicName, subName);
 
+        CompletableFuture<Void> closeSubscriptionFuture = new 
CompletableFuture<>();
+
+        if (closeIfConsumersConnected) {
+            this.disconnect().thenRun(() -> {
+                closeSubscriptionFuture.complete(null);
+            }).exceptionally(ex -> {
+                log.error("[{}][{}] Error disconnecting and closing 
subscription", topicName, subName, ex);
+                closeSubscriptionFuture.completeExceptionally(ex);
+                return null;
+            });
+        } else {
+            this.close().thenRun(() -> {
+                closeSubscriptionFuture.complete(null);
+            }).exceptionally(exception -> {
+                log.error("[{}][{}] Error closing subscription", topicName, 
subName, exception);
+                closeSubscriptionFuture.completeExceptionally(exception);
+                return null;
+            });
+        }
+
         // cursor close handles pending delete (ack) operations
-        this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v 
-> {
+        closeSubscriptionFuture.thenCompose(v -> 
topic.unsubscribe(subName)).thenAccept(v -> {
             synchronized (this) {
                 (dispatcher != null ? dispatcher.close() : 
CompletableFuture.completedFuture(null)).thenRun(() -> {
                     log.info("[{}][{}] Successfully deleted subscription", 
topicName, subName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 82808b1..d1292fe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -1302,6 +1303,55 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2");
     }
 
+    @Test
+    public void testDeleteSubscription() throws Exception {
+        final String subName = "test-sub";
+        final String persistentTopicName = 
"persistent://prop-xyz/ns1/test-sub-topic";
+
+        // disable auto subscription creation
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+        // create a topic and produce some messages
+        publishMessagesOnPersistentTopic(persistentTopicName, 5);
+        assertEquals(admin.topics().getList("prop-xyz/ns1"),
+            Lists.newArrayList(persistentTopicName));
+
+        // create the subscription by PulsarAdmin
+        admin.topics().createSubscription(persistentTopicName, subName, 
MessageId.earliest);
+
+        assertEquals(admin.topics().getSubscriptions(persistentTopicName), 
Lists.newArrayList(subName));
+
+        // create consumer and subscription
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsar.getWebServiceAddress())
+            .statsInterval(0, TimeUnit.SECONDS)
+            .build();
+        Consumer<byte[]> consumer = 
client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
+            .subscriptionType(SubscriptionType.Exclusive).subscribe();
+
+        // try to delete the subscription with a connected consumer
+        try {
+            admin.topics().deleteSubscription(persistentTopicName, subName);
+            fail("should have failed");
+        } catch (PulsarAdminException.PreconditionFailedException e) {
+            assertEquals(e.getStatusCode(), 
Status.PRECONDITION_FAILED.getStatusCode());
+        }
+
+        // failed to delete the subscription
+        assertEquals(admin.topics().getSubscriptions(persistentTopicName), 
Lists.newArrayList(subName));
+
+        // try to delete the subscription with a connected consumer forcefully
+        admin.topics().deleteSubscription(persistentTopicName, subName, true);
+
+        // delete the subscription successfully
+        
assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0);
+
+        // reset to default
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+
+        client.close();
+    }
+
     @Test(dataProvider = "bundling")
     public void testClearBacklogOnNamespace(Integer numBundles) throws 
Exception {
         admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index bec3101..d2ef142 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -183,7 +183,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 
         // 6) Delete the subscription
         response = mock(AsyncResponse.class);
-        persistentTopics.deleteSubscription(response, testTenant, 
testNamespace, testLocalTopicName, "test", true);
+        persistentTopics.deleteSubscription(response, testTenant, 
testNamespace, testLocalTopicName, "test", false,true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 0a00462..4df3c31 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -792,6 +792,31 @@ public interface Topics {
     void deleteSubscription(String topic, String subName) throws 
PulsarAdminException;
 
     /**
+     * Delete a subscription.
+     * <p>
+     * Delete a persistent subscription from a topic. There should not be any 
active consumers on the subscription.
+     * Force flag deletes subscription forcefully by closing all active 
consumers.
+     * <p>
+     *
+     * @param topic
+     *            topic name
+     * @param subName
+     *            Subscription name
+     * @param force
+     *            Delete subscription forcefully
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic or subscription does not exist
+     * @throws PreconditionFailedException
+     *             Subscription has active consumers
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteSubscription(String topic, String subName, boolean force) 
throws PulsarAdminException;
+
+    /**
      * Delete a subscription asynchronously.
      * <p>
      * Delete a persistent subscription from a topic. There should not be any 
active consumers on the subscription.
@@ -807,6 +832,24 @@ public interface Topics {
     CompletableFuture<Void> deleteSubscriptionAsync(String topic, String 
subName);
 
     /**
+     * Delete a subscription asynchronously.
+     * <p>
+     * Delete a persistent subscription from a topic. There should not be any 
active consumers on the subscription.
+     * Force flag deletes subscription forcefully by closing all active 
consumers.
+     * <p>
+     *
+     * @param topic
+     *            topic name
+     * @param subName
+     *            Subscription name
+     * @param force
+     *            Delete subscription forcefully
+     *
+     * @return a future that can be used to track when the subscription is 
deleted
+     */
+    CompletableFuture<Void> deleteSubscriptionAsync(String topic, String 
subName, boolean force);
+
+    /**
      * Skip all messages on a topic subscription.
      * <p>
      * Completely clears the backlog on the subscription.
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 0b9b3d6..01491ac 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -634,10 +634,30 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public void deleteSubscription(String topic, String subName, boolean 
force) throws PulsarAdminException {
+        try {
+            deleteSubscriptionAsync(topic, subName, 
force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> deleteSubscriptionAsync(String topic, 
String subName) {
+        return deleteSubscriptionAsync(topic, subName, false);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteSubscriptionAsync(String topic, 
String subName, boolean force) {
         TopicName tn = validateTopic(topic);
         String encodedSubName = Codec.encode(subName);
         WebTarget path = topicPath(tn, "subscription", encodedSubName);
+        path = path.queryParam("force", force);
         return asyncDeleteRequest(path);
     }
 
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 0a9d119..10a62e9 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -637,7 +637,7 @@ public class PulsarAdminToolTest {
         
verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s 
sub1"));
-        
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", 
"sub1");
+        
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", 
"sub1", false);
 
         cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", 
false);
@@ -723,7 +723,7 @@ public class PulsarAdminToolTest {
         
verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1");
 
         topics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s 
sub1"));
-        
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", 
"sub1");
+        
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", 
"sub1", false);
 
         topics.run(split("stats persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 9df331f..64d63e0 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -297,13 +297,17 @@ public class CmdPersistentTopics extends CmdBase {
         @Parameter(description = 
"persistent://property/cluster/namespace/topic", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-f",
+            "--force" }, description = "Disconnect and close all consumers and 
delete subscription forcefully")
+        private boolean force = false;
+
         @Parameter(names = { "-s", "--subscription" }, description = 
"Subscription to be deleted", required = true)
         private String subName;
 
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            persistentTopics.deleteSubscription(persistentTopic, subName);
+            persistentTopics.deleteSubscription(persistentTopic, subName, 
force);
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index f5954cf..079405c 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -354,13 +354,17 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-f",
+            "--force" }, description = "Disconnect and close all consumers and 
delete subscription forcefully")
+        private boolean force = false;
+
         @Parameter(names = { "-s", "--subscription" }, description = 
"Subscription to be deleted", required = true)
         private String subName;
 
         @Override
         void run() throws PulsarAdminException {
             String topic = validateTopicName(params);
-            topics.deleteSubscription(topic, subName);
+            topics.deleteSubscription(topic, subName, force);
         }
     }
 
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index ef0dfa7..fbd8927 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1910,6 +1910,7 @@ Options
 |Flag|Description|Default|
 |---|---|---|
 |`-s`, `--subscription`|The subscription to delete||
+|`-f`, `--force`|Disconnect and close all consumers and delete subscription 
forcefully|false|
 
 
 ### `stats`

Reply via email to