Jason918 commented on a change in pull request #13901: URL: https://github.com/apache/pulsar/pull/13901#discussion_r790120669
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java ########## @@ -1651,118 +1651,123 @@ private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes } protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) { + CompletableFuture<Void> future; if (topicName.isGlobal()) { - try { - validateGlobalNamespaceOwnership(namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages for subscription {} on topic {}", - clientAppId(), subName, topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } + future = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); } - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.SKIP, subName); - - // If the topic name is a partition name, no need to get partition topic metadata again - if (topicName.isPartitioned()) { - internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); - } else { - getPartitionedTopicMetadataAsync(topicName, - authoritative, false).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - final List<CompletableFuture<Void>> futures = Lists.newArrayList(); + future.thenRun(() -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenRun(() -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)) + .thenRun(() -> { + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, authoritative); + } else { + getPartitionedTopicMetadataAsync(topicName, + authoritative, false).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List<CompletableFuture<Void>> futures = Lists.newArrayList(); - 
for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar() - .getAdminClient() - .topics() - .skipAllMessagesAsync(topicNamePartition.toString(), + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar() + .getAdminClient() + .topics() + .skipAllMessagesAsync(topicNamePartition.toString(), subName)); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages {} {}", - clientAppId(), topicNamePartition, subName, e); - asyncResponse.resume(new RestException(e)); - return; - } - } + } catch (Exception e) { + log.error("[{}] Failed to skip all messages {} {}", + clientAppId(), topicNamePartition, subName, e); + asyncResponse.resume(new RestException(e)); + return; + } + } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return null; - } else { - log.error("[{}] Failed to skip all messages {} {}", - clientAppId(), topicName, subName, t); - asyncResponse.resume(new RestException(t)); + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume( + new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to skip all messages {} {}", + clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); return null; - } + }); + } else { + internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, authoritative); } - - asyncResponse.resume(Response.noContent().build()); + 
}).exceptionally(ex -> { + log.error("[{}] Failed to skip all messages for subscription {} on topic {}", + clientAppId(), subName, topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); - } else { - internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); } }).exceptionally(ex -> { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", - clientAppId(), subName, topicName, ex); + clientAppId(), subName, topicName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); - } } - private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse, + private void internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse, String subName, boolean authoritative) { - try { - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.SKIP, subName); - - PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - BiConsumer<Void, Throwable> biConsumer = (v, ex) -> { - if (ex != null) { - asyncResponse.resume(new RestException(ex)); - log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex); - } else { - asyncResponse.resume(Response.noContent().build()); - log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName); - } - }; - if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); - if (repl == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } - repl.clearBacklog().whenComplete(biConsumer); - } else { - PersistentSubscription sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } - 
sub.clearBacklog().whenComplete(biConsumer); - } - } catch (WebApplicationException wae) { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed to skip all messages for subscription on topic {}," + validateTopicOwnershipAsync(topicName, authoritative) + .thenRun(() -> + validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)) + .thenRun(() -> { + try { + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + BiConsumer<Void, Throwable> biConsumer = (v, ex) -> { + if (ex != null) { + asyncResponse.resume(new RestException(ex)); + log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex); + } else { + asyncResponse.resume(Response.noContent().build()); + log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName); + } + }; + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); + if (repl == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + repl.clearBacklog().whenComplete(biConsumer); + } else { + PersistentSubscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + sub.clearBacklog().whenComplete(biConsumer); + } + } catch (WebApplicationException wae) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to skip all messages for subscription on topic {}," + " redirecting to other brokers.", - clientAppId(), topicName, wae); - } - resumeAsyncResponseExceptionally(asyncResponse, wae); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages for subscription {} on topic {}", - clientAppId(), subName, topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - } + clientAppId(), topicName, wae); + } + 
resumeAsyncResponseExceptionally(asyncResponse, wae); + } catch (Exception e) { + log.error("[{}] Failed to skip all messages for subscription {} on topic {}", + clientAppId(), subName, topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + } + }); Review comment: Need to handle exception with `exceptionally` here. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java ########## @@ -1651,118 +1651,123 @@ private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes } protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) { + CompletableFuture<Void> future; if (topicName.isGlobal()) { - try { - validateGlobalNamespaceOwnership(namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages for subscription {} on topic {}", - clientAppId(), subName, topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } + future = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); } - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.SKIP, subName); - - // If the topic name is a partition name, no need to get partition topic metadata again - if (topicName.isPartitioned()) { - internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); - } else { - getPartitionedTopicMetadataAsync(topicName, - authoritative, false).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - final List<CompletableFuture<Void>> futures = Lists.newArrayList(); + future.thenRun(() -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenRun(() -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)) + .thenRun(() -> { + // If the topic name is a partition name, no need to get partition topic metadata again + if 
(topicName.isPartitioned()) { + internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, authoritative); + } else { + getPartitionedTopicMetadataAsync(topicName, + authoritative, false).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List<CompletableFuture<Void>> futures = Lists.newArrayList(); - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar() - .getAdminClient() - .topics() - .skipAllMessagesAsync(topicNamePartition.toString(), + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar() + .getAdminClient() + .topics() + .skipAllMessagesAsync(topicNamePartition.toString(), subName)); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages {} {}", - clientAppId(), topicNamePartition, subName, e); - asyncResponse.resume(new RestException(e)); - return; - } - } + } catch (Exception e) { + log.error("[{}] Failed to skip all messages {} {}", + clientAppId(), topicNamePartition, subName, e); + asyncResponse.resume(new RestException(e)); + return; + } + } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return null; - } else { - log.error("[{}] Failed to skip all messages {} {}", - clientAppId(), topicName, subName, t); - asyncResponse.resume(new RestException(t)); + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume( + new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to skip all messages {} {}", + 
clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); return null; - } + }); + } else { + internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, authoritative); } - - asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to skip all messages for subscription {} on topic {}", + clientAppId(), subName, topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); - } else { - internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); } }).exceptionally(ex -> { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", - clientAppId(), subName, topicName, ex); + clientAppId(), subName, topicName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); - } } - private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse, + private void internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse, String subName, boolean authoritative) { - try { - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.SKIP, subName); - - PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - BiConsumer<Void, Throwable> biConsumer = (v, ex) -> { - if (ex != null) { - asyncResponse.resume(new RestException(ex)); - log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex); - } else { - asyncResponse.resume(Response.noContent().build()); - log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName); - } - }; - if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); - if (repl == null) { - 
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } - repl.clearBacklog().whenComplete(biConsumer); - } else { - PersistentSubscription sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } - sub.clearBacklog().whenComplete(biConsumer); - } - } catch (WebApplicationException wae) { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed to skip all messages for subscription on topic {}," + validateTopicOwnershipAsync(topicName, authoritative) + .thenRun(() -> + validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)) + .thenRun(() -> { + try { + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); Review comment: `getTopicReference` is blocking ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java ########## @@ -1651,118 +1651,123 @@ private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes } protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) { + CompletableFuture<Void> future; if (topicName.isGlobal()) { - try { - validateGlobalNamespaceOwnership(namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages for subscription {} on topic {}", - clientAppId(), subName, topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } + future = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); } - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.SKIP, subName); - - // If the topic name is a partition name, no need to get partition topic metadata again - if (topicName.isPartitioned()) { - internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); - } else { - 
getPartitionedTopicMetadataAsync(topicName, - authoritative, false).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - final List<CompletableFuture<Void>> futures = Lists.newArrayList(); + future.thenRun(() -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenRun(() -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)) + .thenRun(() -> { + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, authoritative); + } else { + getPartitionedTopicMetadataAsync(topicName, + authoritative, false).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List<CompletableFuture<Void>> futures = Lists.newArrayList(); - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar() - .getAdminClient() - .topics() - .skipAllMessagesAsync(topicNamePartition.toString(), + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar() + .getAdminClient() + .topics() + .skipAllMessagesAsync(topicNamePartition.toString(), subName)); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages {} {}", - clientAppId(), topicNamePartition, subName, e); - asyncResponse.resume(new RestException(e)); - return; - } - } + } catch (Exception e) { + log.error("[{}] Failed to skip all messages {} {}", + clientAppId(), topicNamePartition, subName, e); + asyncResponse.resume(new RestException(e)); + return; + } + } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return null; - } else { 
- log.error("[{}] Failed to skip all messages {} {}", - clientAppId(), topicName, subName, t); - asyncResponse.resume(new RestException(t)); + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume( + new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to skip all messages {} {}", + clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); return null; - } + }); + } else { + internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, authoritative); } - - asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to skip all messages for subscription {} on topic {}", + clientAppId(), subName, topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); - } else { - internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); } }).exceptionally(ex -> { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", Review comment: We should use `ex.getCause()` ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java ########## @@ -1651,118 +1651,123 @@ private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes } protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) { + CompletableFuture<Void> future; if (topicName.isGlobal()) { - try { - validateGlobalNamespaceOwnership(namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages for subscription {} on topic {}", - clientAppId(), subName, topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } + future = 
validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); } - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.SKIP, subName); - - // If the topic name is a partition name, no need to get partition topic metadata again - if (topicName.isPartitioned()) { - internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); - } else { - getPartitionedTopicMetadataAsync(topicName, - authoritative, false).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - final List<CompletableFuture<Void>> futures = Lists.newArrayList(); + future.thenRun(() -> validateTopicOwnershipAsync(topicName, authoritative)) Review comment: This should use `thenCompose` instead of `thenRun`; we need the result of the validation future to continue the async workflow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org