This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2a0be4b794e6d8a5738bba7073fe09f06b7961ef
Author: Qiang Zhao <74767115+mattisonc...@users.noreply.github.com>
AuthorDate: Thu Jan 13 00:21:29 2022 +0800

    Avoid call sync method in async rest API for force delete subscription (#13668)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 55 +++++++++++-----------
 1 file changed, 28 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index fa57199..9e8728c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1585,33 +1585,34 @@ public class PersistentTopicsBase extends AdminResource {
 
     private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse 
asyncResponse,
                                                                             
String subName, boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
-
-            Topic topic = getTopicReference(topicName);
-            Subscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                return;
-            }
-            sub.deleteForcefully().get();
-            log.info("[{}][{}] Deleted subscription forcefully {}", 
clientAppId(), topicName, subName);
-            asyncResponse.resume(Response.noContent().build());
-        } catch (Exception e) {
-            if (e instanceof WebApplicationException) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Failed to delete subscription forcefully 
from topic {},"
-                                    + " redirecting to other brokers.",
-                            clientAppId(), topicName, e);
-                }
-                asyncResponse.resume(e);
-            } else {
-                log.error("[{}] Failed to delete subscription forcefully {} 
{}",
-                        clientAppId(), topicName, subName, e);
-                asyncResponse.resume(new RestException(e));
-            }
-        }
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+                .thenCompose(__ -> {
+                    Topic topic = getTopicReference(topicName);
+                    Subscription sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        throw new RestException(Status.NOT_FOUND, 
"Subscription not found");
+                    }
+                    return sub.deleteForcefully();
+                }).thenRun(() -> {
+                    log.info("[{}][{}] Deleted subscription forcefully {}", 
clientAppId(), topicName, subName);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(e -> {
+                    Throwable cause = e.getCause();
+                    if (cause instanceof WebApplicationException) {
+                        if (log.isDebugEnabled() && ((WebApplicationException) 
cause).getResponse().getStatus()
+                                == Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                            log.debug("[{}] Failed to delete subscription from 
topic {}, redirecting to other brokers.",
+                                    clientAppId(), topicName, cause);
+                        }
+                        asyncResponse.resume(cause);
+                    } else {
+                        log.error("[{}] Failed to delete subscription 
forcefully {} {}",
+                                clientAppId(), topicName, subName, cause);
+                        asyncResponse.resume(new RestException(cause));
+                    }
+                    return null;
+                });
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {

Reply via email to