codelipenghui commented on a change in pull request #13668:
URL: https://github.com/apache/pulsar/pull/13668#discussion_r782862562
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1602,33 +1602,34 @@ protected void
internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse
asyncResponse,
String subName, boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
-
- Topic topic = getTopicReference(topicName);
- Subscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- sub.deleteForcefully().get();
- log.info("[{}][{}] Deleted subscription forcefully {}",
clientAppId(), topicName, subName);
- asyncResponse.resume(Response.noContent().build());
- } catch (Exception e) {
- if (e instanceof WebApplicationException) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to delete subscription forcefully
from topic {},"
- + " redirecting to other brokers.",
- clientAppId(), topicName, e);
- }
- asyncResponse.resume(e);
- } else {
- log.error("[{}] Failed to delete subscription forcefully {}
{}",
- clientAppId(), topicName, subName, e);
- asyncResponse.resume(new RestException(e));
- }
- }
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE))
+ .thenCompose(__ -> {
+ Topic topic = getTopicReference(topicName);
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND,
"Subscription not found");
+ }
+ return sub.deleteForcefully();
+ }).thenRun(() -> {
+ log.info("[{}][{}] Deleted subscription forcefully {}",
clientAppId(), topicName, subName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(e -> {
+ Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException) {
+ if (log.isDebugEnabled() && ((WebApplicationException)
e).getResponse().getStatus()
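Review comment:
The `instanceof` check above is performed on `cause` (the result of `e.getCause()`), not on `e`, so the cast should be applied to `cause` as well; casting `e` is not guaranteed to succeed and may throw a `ClassCastException`.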
```suggestion
if (log.isDebugEnabled() &&
((WebApplicationException) cause).getResponse().getStatus()
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1602,33 +1602,34 @@ protected void
internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse
asyncResponse,
String subName, boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
-
- Topic topic = getTopicReference(topicName);
- Subscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- sub.deleteForcefully().get();
- log.info("[{}][{}] Deleted subscription forcefully {}",
clientAppId(), topicName, subName);
- asyncResponse.resume(Response.noContent().build());
- } catch (Exception e) {
- if (e instanceof WebApplicationException) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to delete subscription forcefully
from topic {},"
- + " redirecting to other brokers.",
- clientAppId(), topicName, e);
- }
- asyncResponse.resume(e);
- } else {
- log.error("[{}] Failed to delete subscription forcefully {}
{}",
- clientAppId(), topicName, subName, e);
- asyncResponse.resume(new RestException(e));
- }
- }
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE))
+ .thenCompose(__ -> {
+ Topic topic = getTopicReference(topicName);
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND,
"Subscription not found");
+ }
+ return sub.deleteForcefully();
+ }).thenRun(() -> {
+ log.info("[{}][{}] Deleted subscription forcefully {}",
clientAppId(), topicName, subName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(e -> {
+ Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException) {
+ if (log.isDebugEnabled() && ((WebApplicationException)
e).getResponse().getStatus()
+ == Status.TEMPORARY_REDIRECT.getStatusCode()) {
+ log.debug("[{}] Failed to delete subscription from
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, e);
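Review comment:
Log the unwrapped `cause` here rather than `e`, so the logged exception is the same `WebApplicationException` that the enclosing `instanceof` check matched.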
```suggestion
clientAppId(), topicName, cause);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]