Jason918 commented on a change in pull request #13901:
URL: https://github.com/apache/pulsar/pull/13901#discussion_r790120669



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",
-                        clientAppId(), subName, topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+        future.thenRun(() -> validateTopicOwnershipAsync(topicName, 
authoritative))
+            .thenRun(() -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP, subName))
+            .thenRun(() -> {
+                // If the topic name is a partition name, no need to get 
partition topic metadata again
+                if (topicName.isPartitioned()) {
+                    
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, 
authoritative);
+                } else {
+                    getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenAccept(partitionMetadata -> {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            futures.add(pulsar()
-                                    .getAdminClient()
-                                    .topics()
-                                    
.skipAllMessagesAsync(topicNamePartition.toString(),
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    futures.add(pulsar()
+                                        .getAdminClient()
+                                        .topics()
+                                        
.skipAllMessagesAsync(topicNamePartition.toString(),
                                             subName));
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to skip all messages {} {}",
-                                    clientAppId(), topicNamePartition, 
subName, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-                    }
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to skip all 
messages {} {}",
+                                        clientAppId(), topicNamePartition, 
subName, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
+                            }
 
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable t = exception.getCause();
-                            if (t instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                                return null;
-                            } else {
-                                log.error("[{}] Failed to skip all messages {} 
{}",
-                                        clientAppId(), topicName, subName, t);
-                                asyncResponse.resume(new RestException(t));
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    if (t instanceof NotFoundException) {
+                                        asyncResponse.resume(
+                                            new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                                        return null;
+                                    } else {
+                                        log.error("[{}] Failed to skip all 
messages {} {}",
+                                            clientAppId(), topicName, subName, 
t);
+                                        asyncResponse.resume(new 
RestException(t));
+                                        return null;
+                                    }
+                                }
+
+                                
asyncResponse.resume(Response.noContent().build());
                                 return null;
-                            }
+                            });
+                        } else {
+                            
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, 
authoritative);
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
+                    }).exceptionally(ex -> {
+                        log.error("[{}] Failed to skip all messages for 
subscription {} on topic {}",
+                            clientAppId(), subName, topicName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
-                } else {
-                    
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, 
authoritative);
                 }
             }).exceptionally(ex -> {
                 log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",
-                        clientAppId(), subName, topicName, ex);
+                    clientAppId(), subName, topicName, ex);
                 resumeAsyncResponseExceptionally(asyncResponse, ex);
                 return null;
             });
-        }
     }
 
-    private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+    private void 
internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
                                                                String subName, 
boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
-            BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
-                if (ex != null) {
-                    asyncResponse.resume(new RestException(ex));
-                    log.error("[{}] Failed to skip all messages {} {}", 
clientAppId(), topicName, subName, ex);
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                    log.info("[{}] Cleared backlog on {} {}", clientAppId(), 
topicName, subName);
-                }
-            };
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
-                if (repl == null) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                    return;
-                }
-                repl.clearBacklog().whenComplete(biConsumer);
-            } else {
-                PersistentSubscription sub = topic.getSubscription(subName);
-                if (sub == null) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                    return;
-                }
-                sub.clearBacklog().whenComplete(biConsumer);
-            }
-        } catch (WebApplicationException wae) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to skip all messages for subscription 
on topic {},"
+        validateTopicOwnershipAsync(topicName, authoritative)
+            .thenRun(() ->
+                validateTopicOperationAsync(topicName, TopicOperation.SKIP, 
subName))
+            .thenRun(() -> {
+                try {
+                    PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
+                    BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
+                        if (ex != null) {
+                            asyncResponse.resume(new RestException(ex));
+                            log.error("[{}] Failed to skip all messages {} 
{}", clientAppId(), topicName, subName, ex);
+                        } else {
+                            asyncResponse.resume(Response.noContent().build());
+                            log.info("[{}] Cleared backlog on {} {}", 
clientAppId(), topicName, subName);
+                        }
+                    };
+                    if (subName.startsWith(topic.getReplicatorPrefix())) {
+                        String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                        PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
+                        if (repl == null) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                            return;
+                        }
+                        repl.clearBacklog().whenComplete(biConsumer);
+                    } else {
+                        PersistentSubscription sub = 
topic.getSubscription(subName);
+                        if (sub == null) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                            return;
+                        }
+                        sub.clearBacklog().whenComplete(biConsumer);
+                    }
+                } catch (WebApplicationException wae) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Failed to skip all messages for 
subscription on topic {},"
                                 + " redirecting to other brokers.",
-                        clientAppId(), topicName, wae);
-            }
-            resumeAsyncResponseExceptionally(asyncResponse, wae);
-        } catch (Exception e) {
-            log.error("[{}] Failed to skip all messages for subscription {} on 
topic {}",
-                    clientAppId(), subName, topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-        }
+                            clientAppId(), topicName, wae);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, wae);
+                } catch (Exception e) {
+                    log.error("[{}] Failed to skip all messages for 
subscription {} on topic {}",
+                        clientAppId(), subName, topicName, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                }
+            });

Review comment:
       This chain needs an `exceptionally` handler here: as written, if
       `validateTopicOwnershipAsync` fails, `asyncResponse` is never resumed.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",
-                        clientAppId(), subName, topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+        future.thenRun(() -> validateTopicOwnershipAsync(topicName, 
authoritative))
+            .thenRun(() -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP, subName))
+            .thenRun(() -> {
+                // If the topic name is a partition name, no need to get 
partition topic metadata again
+                if (topicName.isPartitioned()) {
+                    
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, 
authoritative);
+                } else {
+                    getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenAccept(partitionMetadata -> {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            futures.add(pulsar()
-                                    .getAdminClient()
-                                    .topics()
-                                    
.skipAllMessagesAsync(topicNamePartition.toString(),
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    futures.add(pulsar()
+                                        .getAdminClient()
+                                        .topics()
+                                        
.skipAllMessagesAsync(topicNamePartition.toString(),
                                             subName));
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to skip all messages {} {}",
-                                    clientAppId(), topicNamePartition, 
subName, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-                    }
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to skip all 
messages {} {}",
+                                        clientAppId(), topicNamePartition, 
subName, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
+                            }
 
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable t = exception.getCause();
-                            if (t instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                                return null;
-                            } else {
-                                log.error("[{}] Failed to skip all messages {} 
{}",
-                                        clientAppId(), topicName, subName, t);
-                                asyncResponse.resume(new RestException(t));
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    if (t instanceof NotFoundException) {
+                                        asyncResponse.resume(
+                                            new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                                        return null;
+                                    } else {
+                                        log.error("[{}] Failed to skip all 
messages {} {}",
+                                            clientAppId(), topicName, subName, 
t);
+                                        asyncResponse.resume(new 
RestException(t));
+                                        return null;
+                                    }
+                                }
+
+                                
asyncResponse.resume(Response.noContent().build());
                                 return null;
-                            }
+                            });
+                        } else {
+                            
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, 
authoritative);
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
+                    }).exceptionally(ex -> {
+                        log.error("[{}] Failed to skip all messages for 
subscription {} on topic {}",
+                            clientAppId(), subName, topicName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
-                } else {
-                    
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, 
authoritative);
                 }
             }).exceptionally(ex -> {
                 log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",
-                        clientAppId(), subName, topicName, ex);
+                    clientAppId(), subName, topicName, ex);
                 resumeAsyncResponseExceptionally(asyncResponse, ex);
                 return null;
             });
-        }
     }
 
-    private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+    private void 
internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
                                                                String subName, 
boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
-            BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
-                if (ex != null) {
-                    asyncResponse.resume(new RestException(ex));
-                    log.error("[{}] Failed to skip all messages {} {}", 
clientAppId(), topicName, subName, ex);
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                    log.info("[{}] Cleared backlog on {} {}", clientAppId(), 
topicName, subName);
-                }
-            };
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
-                if (repl == null) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                    return;
-                }
-                repl.clearBacklog().whenComplete(biConsumer);
-            } else {
-                PersistentSubscription sub = topic.getSubscription(subName);
-                if (sub == null) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                    return;
-                }
-                sub.clearBacklog().whenComplete(biConsumer);
-            }
-        } catch (WebApplicationException wae) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to skip all messages for subscription 
on topic {},"
+        validateTopicOwnershipAsync(topicName, authoritative)
+            .thenRun(() ->
+                validateTopicOperationAsync(topicName, TopicOperation.SKIP, 
subName))
+            .thenRun(() -> {
+                try {
+                    PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);

Review comment:
       `getTopicReference` is a blocking call, so it should not be used inside
       this asynchronous chain.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",
-                        clientAppId(), subName, topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+        future.thenRun(() -> validateTopicOwnershipAsync(topicName, 
authoritative))
+            .thenRun(() -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP, subName))
+            .thenRun(() -> {
+                // If the topic name is a partition name, no need to get 
partition topic metadata again
+                if (topicName.isPartitioned()) {
+                    
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, 
authoritative);
+                } else {
+                    getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenAccept(partitionMetadata -> {
+                        if (partitionMetadata.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            futures.add(pulsar()
-                                    .getAdminClient()
-                                    .topics()
-                                    
.skipAllMessagesAsync(topicNamePartition.toString(),
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    futures.add(pulsar()
+                                        .getAdminClient()
+                                        .topics()
+                                        
.skipAllMessagesAsync(topicNamePartition.toString(),
                                             subName));
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to skip all messages {} {}",
-                                    clientAppId(), topicNamePartition, 
subName, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-                    }
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to skip all 
messages {} {}",
+                                        clientAppId(), topicNamePartition, 
subName, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
+                            }
 
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable t = exception.getCause();
-                            if (t instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                                return null;
-                            } else {
-                                log.error("[{}] Failed to skip all messages {} 
{}",
-                                        clientAppId(), topicName, subName, t);
-                                asyncResponse.resume(new RestException(t));
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                                if (exception != null) {
+                                    Throwable t = exception.getCause();
+                                    if (t instanceof NotFoundException) {
+                                        asyncResponse.resume(
+                                            new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                                        return null;
+                                    } else {
+                                        log.error("[{}] Failed to skip all 
messages {} {}",
+                                            clientAppId(), topicName, subName, 
t);
+                                        asyncResponse.resume(new 
RestException(t));
+                                        return null;
+                                    }
+                                }
+
+                                
asyncResponse.resume(Response.noContent().build());
                                 return null;
-                            }
+                            });
+                        } else {
+                            
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, 
authoritative);
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
+                    }).exceptionally(ex -> {
+                        log.error("[{}] Failed to skip all messages for 
subscription {} on topic {}",
+                            clientAppId(), subName, topicName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
-                } else {
-                    
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, 
authoritative);
                 }
             }).exceptionally(ex -> {
                 log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",

Review comment:
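       We should use `ex.getCause()` here: the throwable passed to
       `exceptionally` is typically a `CompletionException` wrapping the real
       failure.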

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.error("[{}] Failed to skip all messages for subscription 
{} on topic {}",
-                        clientAppId(), subName, topicName, e);
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                if (partitionMetadata.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+        future.thenRun(() -> validateTopicOwnershipAsync(topicName, 
authoritative))

Review comment:
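       This should use `thenCompose` instead of `thenRun`: `thenRun` discards the
       future returned by `validateTopicOwnershipAsync`, and we need its result
       before continuing the async workflow.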




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to