oneby-wang commented on code in PR #25127:
URL: https://github.com/apache/pulsar/pull/25127#discussion_r2671791952


##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                                               String subscriptionName,
+                                                                               Optional<MessageId> startPosition,
+                                                                               long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction = new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) {
+                    future.complete(mergedResult);
+                    return;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    log.warn("[{}][{}] Scanned total entry count is null, abort analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect total entry count returned from server"));
+                }
+
+                // In analyze-backlog, lastMessageId is null only when: total entries is 0,
+                // with false aborted flag returned.
+                if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+                    log.warn("[{}][{}] Scanned last message id is blank, abort analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect last message id returned from server"));
+                }
+
+                String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
+                MessageIdImpl nextScanMessageId =
+                        new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1,
+                                partitionIndex);

Review Comment:
   Actually, `lastMessageId` is returned by the server and is guaranteed to be 
valid. Keep it simple here.
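
   For reference, the step this comment is about can be sketched in isolation.
   This is only a minimal sketch, assuming the `ledgerId:entryId` string form
   that the diff splits on `:`. `NextScanPosition` and its fields are
   hypothetical stand-ins for `MessageIdImpl`, not Pulsar API, and the sketch
   deliberately does no validation, in line with the comment above.

```java
// Hypothetical stand-in for MessageIdImpl, so the sketch compiles without Pulsar on the classpath.
final class NextScanPosition {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;

    private NextScanPosition(long ledgerId, long entryId, int partitionIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
    }

    // Assumes lastMessageId has the form "ledgerId:entryId", as the diff's split(":") implies.
    // The next scan starts one entry after the last scanned entry in the same ledger.
    static NextScanPosition fromLastMessageId(String lastMessageId, int partitionIndex) {
        String[] parts = lastMessageId.split(":");
        long ledgerId = Long.parseLong(parts[0]);
        long lastEntryId = Long.parseLong(parts[1]);
        return new NextScanPosition(ledgerId, lastEntryId + 1, partitionIndex);
    }
}
```

   With a last message id of `12:34`, the sketch yields ledger 12, entry 35,
   and passes the partition index through unchanged.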



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
