This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2a0be4b794e6d8a5738bba7073fe09f06b7961ef Author: Qiang Zhao <74767115+mattisonc...@users.noreply.github.com> AuthorDate: Thu Jan 13 00:21:29 2022 +0800 Avoid call sync method in async rest API for force delete subscription (#13668) --- .../broker/admin/impl/PersistentTopicsBase.java | 55 +++++++++++----------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index fa57199..9e8728c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1585,33 +1585,34 @@ public class PersistentTopicsBase extends AdminResource { private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { - try { - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE); - - Topic topic = getTopicReference(topicName); - Subscription sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } - sub.deleteForcefully().get(); - log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName); - asyncResponse.resume(Response.noContent().build()); - } catch (Exception e) { - if (e instanceof WebApplicationException) { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed to delete subscription forcefully from topic {}," - + " redirecting to other brokers.", - clientAppId(), topicName, e); - } - asyncResponse.resume(e); - } else { - log.error("[{}] Failed to delete subscription forcefully {} {}", - clientAppId(), topicName, subName, e); - asyncResponse.resume(new RestException(e)); - } - } + validateTopicOwnershipAsync(topicName, authoritative) + .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE)) + .thenCompose(__ -> { + Topic topic = getTopicReference(topicName); + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + throw new RestException(Status.NOT_FOUND, "Subscription not found"); + } + return sub.deleteForcefully(); + }).thenRun(() -> { + log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(e -> { + Throwable cause = e.getCause(); + if (cause instanceof WebApplicationException) { + if (log.isDebugEnabled() && ((WebApplicationException) cause).getResponse().getStatus() + == Status.TEMPORARY_REDIRECT.getStatusCode()) { + log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.", + clientAppId(), topicName, cause); + } + asyncResponse.resume(cause); + } else { + log.error("[{}] Failed to delete subscription forcefully {} {}", + clientAppId(), topicName, subName, cause); + asyncResponse.resume(new RestException(cause)); + } + return null; + }); } protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {