Technoboy- commented on code in PR #15843:
URL: https://github.com/apache/pulsar/pull/15843#discussion_r885336858
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1003,32 +1007,37 @@ private void
internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncRespon
});
}
- protected void internalDeleteTopic(boolean authoritative, boolean force) {
+ protected CompletableFuture<Void> internalDeleteTopic(boolean
authoritative,
+ boolean force) {
if (force) {
- internalDeleteTopicForcefully(authoritative);
+ return internalDeleteTopicForcefully(authoritative);
} else {
- internalDeleteTopic(authoritative);
+ return internalDeleteTopic(authoritative);
}
}
- protected void internalDeleteTopic(boolean authoritative) {
- validateNamespaceOperation(topicName.getNamespaceObject(),
NamespaceOperation.DELETE_TOPIC);
- validateTopicOwnership(topicName, authoritative);
+ protected CompletableFuture<Void> internalDeleteTopic(boolean
authoritative) {
+ CompletableFuture<Void> ret;
+ ret = validateTopicOwnershipAsync(topicName,
authoritative).thenCompose(
+ __ ->
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+ NamespaceOperation.DELETE_TOPIC))
+ .thenCompose(__ ->
pulsar().getBrokerService().deleteTopic(topicName.toString(), false))
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof TopicBusyException) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Topic has active producers/subscriptions");
+ } else if (isManagedLedgerNotFoundException(realCause)) {
+ log.info("[{}] Topic was already not existing {}",
clientAppId(), topicName, realCause);
+ throw new RestException(Status.NOT_FOUND,
+
getTopicNotFoundErrorMessage(topicName.toString()));
+ } else {
+ log.error("[{}] Failed to delete topic {}",
clientAppId(), topicName, realCause);
Review Comment:
How about moving this log to the REST controller?
See the comment below.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java:
##########
@@ -357,12 +357,19 @@ public void unloadTopic(@Suspended final AsyncResponse
asyncResponse, @PathParam
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Topic has active
producers/subscriptions")})
- public void deleteTopic(@PathParam("property") String property,
@PathParam("cluster") String cluster,
+ public void deleteTopic(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalDeleteTopic(authoritative, force);
+ internalDeleteTopic(authoritative, force).thenAccept(
+ __ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse,
ex.getCause());
Review Comment:
Add an error log here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org