Copilot commented on code in PR #25127:
URL: https://github.com/apache/pulsar/pull/25127#discussion_r2671561014


##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> 
analyzeSubscriptionBacklogAsync(String topic,
+                                                                               
String subscriptionName,
+                                                                               
Optional<MessageId> startPosition,
+                                                                               
long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new 
CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new 
AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new 
AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> 
resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, 
startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction 
= new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, 
Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = 
mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {
+                    future.complete(mergedResult);
+                    return;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is 
incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    log.warn("[{}][{}] Scanned total entry count is null, 
abort analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect total entry 
count returned from server"));
+                }
+
+                // In analyze-backlog, lastMessageId is null only when: total 
entries is 0,
+                // with false aborted flag returned.
+                if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+                    log.warn("[{}][{}] Scanned last message id is blank, abort 
analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect last message 
id returned from server"));
+                }

Review Comment:
   After completing the future exceptionally, the callback should return to 
prevent further execution. Without a `return` statement, execution falls 
through to lines 1651-1657, which parse the lastMessageId and schedule another 
iteration even though an error has already been reported.
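
The fall-through can be reproduced outside Pulsar. The following is a minimal sketch, not the PR's code: `EarlyReturnSketch` and both method names are hypothetical, and the `return true` line only stands in for parsing the lastMessageId and re-issuing the request.

```java
import java.util.concurrent.CompletableFuture;

final class EarlyReturnSketch {
    // Mirrors the reviewed shape: the guard reports the error but does not return.
    // Returns true if execution reached the "schedule next scan" step.
    static boolean guardWithoutReturn(long entries, CompletableFuture<String> future) {
        if (entries <= 0) {
            future.completeExceptionally(new IllegalStateException("Incorrect total entry count"));
            // No return here, so control falls through.
        }
        return true; // stand-in for parsing lastMessageId and scheduling the next call
    }

    // Same guard with an early return: the follow-up step is never reached on error.
    static boolean guardWithReturn(long entries, CompletableFuture<String> future) {
        if (entries <= 0) {
            future.completeExceptionally(new IllegalStateException("Incorrect total entry count"));
            return false; // stop: the error has already been reported
        }
        return true;
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        System.out.println(guardWithoutReturn(0, a) + " " + a.isCompletedExceptionally()); // true true
        System.out.println(guardWithReturn(0, b) + " " + b.isCompletedExceptionally());    // false true
    }
}
```

In both variants the future ends up completed exceptionally; only the variant with `return` avoids running the follow-up step afterwards.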



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> 
analyzeSubscriptionBacklogAsync(String topic,
+                                                                               
String subscriptionName,
+                                                                               
Optional<MessageId> startPosition,
+                                                                               
long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new 
CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new 
AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new 
AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> 
resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, 
startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction 
= new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, 
Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = 
mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {
+                    future.complete(mergedResult);
+                    return;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is 
incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    log.warn("[{}][{}] Scanned total entry count is null, 
abort analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect total entry 
count returned from server"));
+                }
+
+                // In analyze-backlog, lastMessageId is null only when: total 
entries is 0,
+                // with false aborted flag returned.
+                if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+                    log.warn("[{}][{}] Scanned last message id is blank, abort 
analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect last message 
id returned from server"));
+                }
+
+                String[] messageIdSplits = 
mergedResult.getLastMessageId().split(":");
+                MessageIdImpl nextScanMessageId =
+                        new MessageIdImpl(Long.parseLong(messageIdSplits[0]), 
Long.parseLong(messageIdSplits[1]) + 1,
+                                partitionIndex);

Review Comment:
   The message ID parsing logic is fragile and lacks error handling. If the 
lastMessageId format is unexpected (e.g., doesn't contain a colon, has 
non-numeric values, or has fewer than 2 parts), this will throw 
ArrayIndexOutOfBoundsException or NumberFormatException, causing an unhandled 
exception. Consider adding validation and proper error handling for the parsing 
operation.
   ```suggestion
                       return;
                   }
   
                   String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
                   if (messageIdSplits.length < 2) {
                       log.warn("[{}][{}] Scanned last message id has unexpected format '{}', "
                                       + "abort analyze backlog, start position is: {}",
                               topic, subscriptionName, mergedResult.getLastMessageId(), startPositionRef.get());
                       future.completeExceptionally(
                               new PulsarAdminException("Incorrect last message id format returned from server"));
                       return;
                   }
   
                   final long ledgerId;
                   final long entryId;
                   try {
                       ledgerId = Long.parseLong(messageIdSplits[0]);
                       entryId = Long.parseLong(messageIdSplits[1]);
                   } catch (NumberFormatException e) {
                       log.warn("[{}][{}] Failed to parse last message id '{}', abort analyze backlog, "
                                       + "start position is: {}",
                               topic, subscriptionName, mergedResult.getLastMessageId(), startPositionRef.get(), e);
                       future.completeExceptionally(
                               new PulsarAdminException("Incorrect last message id format returned from server", e));
                       return;
                   }
   
                   MessageIdImpl nextScanMessageId =
                           new MessageIdImpl(ledgerId, entryId + 1, partitionIndex);
   ```
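
The same validation can also be factored into a small helper that is testable on its own. This is only a sketch under assumptions: `LastMessageIdSketch` and `parse` are hypothetical names, and the `ledgerId:entryId` layout is inferred solely from the `split(":")` and `Long.parseLong` calls in the diff above.

```java
import java.util.Optional;

final class LastMessageIdSketch {
    /**
     * Parses a "ledgerId:entryId[:...]" string.
     * Returns {ledgerId, entryId}, or empty if the input is blank or malformed.
     */
    static Optional<long[]> parse(String lastMessageId) {
        if (lastMessageId == null || lastMessageId.trim().isEmpty()) {
            return Optional.empty();
        }
        String[] parts = lastMessageId.split(":");
        if (parts.length < 2) {
            return Optional.empty(); // no colon, or nothing on either side of it
        }
        try {
            return Optional.of(new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])});
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric ledger or entry id
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("12:34").isPresent()); // true
        System.out.println(parse("12").isPresent());    // false
        System.out.println(parse("a:b").isPresent());   // false
    }
}
```

A caller would then fail the future once when the result is empty, instead of handling `ArrayIndexOutOfBoundsException` and `NumberFormatException` separately.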



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> 
analyzeSubscriptionBacklogAsync(String topic,
+                                                                               
String subscriptionName,
+                                                                               
Optional<MessageId> startPosition,
+                                                                               
long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new 
CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new 
AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new 
AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> 
resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, 
startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction 
= new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, 
Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = 
mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {
+                    future.complete(mergedResult);
+                    return;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is 
incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    log.warn("[{}][{}] Scanned total entry count is null, 
abort analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect total entry 
count returned from server"));
+                }
+
+                // In analyze-backlog, lastMessageId is null only when: total 
entries is 0,
+                // with false aborted flag returned.
+                if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+                    log.warn("[{}][{}] Scanned last message id is blank, abort 
analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect last message 
id returned from server"));
+                }
+
+                String[] messageIdSplits = 
mergedResult.getLastMessageId().split(":");
+                MessageIdImpl nextScanMessageId =
+                        new MessageIdImpl(Long.parseLong(messageIdSplits[0]), 
Long.parseLong(messageIdSplits[1]) + 1,
+                                partitionIndex);
+                startPositionRef.set(Optional.of(nextScanMessageId));
+
+                resultSupplier.get().whenComplete(this);
+            }
+        };
+
+        resultSupplier.get().whenComplete(completeAction);
+        return future;
+    }
+
+    private AnalyzeSubscriptionBacklogResult 
mergeBacklogResults(AnalyzeSubscriptionBacklogResult current,
+                                                                 
AnalyzeSubscriptionBacklogResult previous) {
+        if (previous == null) {
+            return current;
+        }
+
+        AnalyzeSubscriptionBacklogResult mergedRes = new 
AnalyzeSubscriptionBacklogResult();
+        mergedRes.setEntries(current.getEntries() + previous.getEntries());
+        mergedRes.setMessages(current.getMessages() + previous.getMessages());
+        mergedRes.setMarkerMessages(current.getMarkerMessages() + 
previous.getMarkerMessages());
+
+        mergedRes.setFilterAcceptedEntries(current.getFilterAcceptedEntries() 
+ previous.getFilterAcceptedEntries());
+        mergedRes.setFilterRejectedEntries(current.getFilterRejectedEntries() 
+ previous.getFilterRejectedEntries());
+        mergedRes.setFilterRescheduledEntries(
+                current.getFilterRescheduledEntries() + 
previous.getFilterRescheduledEntries());
+
+        
mergedRes.setFilterAcceptedMessages(current.getFilterAcceptedMessages() + 
previous.getFilterAcceptedMessages());
+        
mergedRes.setFilterRejectedMessages(current.getFilterRejectedMessages() + 
previous.getFilterRejectedMessages());
+        mergedRes.setFilterRescheduledMessages(
+                current.getFilterRescheduledMessages() + 
previous.getFilterRescheduledMessages());
+
+        mergedRes.setAborted(current.isAborted());
+        mergedRes.setFirstMessageId(current.getFirstMessageId());

Review Comment:
   The mergeBacklogResults method should use the firstMessageId from the 
previous result (if available) instead of always using the current result's 
firstMessageId. When merging multiple scan results, the first message ID should 
refer to the very first message from the initial scan, not the first message 
from the latest scan iteration.
   ```suggestion
           mergedRes.setFirstMessageId(previous.getFirstMessageId() != null
                   ? previous.getFirstMessageId()
                   : current.getFirstMessageId());
   ```
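
To see why this matters over several iterations, here is a minimal sketch that folds per-scan first IDs the way the suggestion does. `FirstIdMergeSketch` and its methods are hypothetical, and the ID strings are made-up sample values.

```java
final class FirstIdMergeSketch {
    // Keep the earliest known first message id; fall back to the current scan's id.
    static String mergeFirstMessageId(String previousFirst, String currentFirst) {
        return previousFirst != null ? previousFirst : currentFirst;
    }

    // Folds the first ids reported by consecutive scans, oldest scan first.
    static String firstIdAcrossScans(String... firstIdsPerScan) {
        String merged = null;
        for (String id : firstIdsPerScan) {
            merged = mergeFirstMessageId(merged, id);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Three consecutive scans; the merged result keeps the id from the first one.
        System.out.println(firstIdAcrossScans("5:0", "5:100", "6:3")); // 5:0
    }
}
```

With the unconditional `setFirstMessageId(current.getFirstMessageId())`, the same three scans would end with `6:3`, the start of the last iteration only.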



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java:
##########
@@ -2220,21 +2220,88 @@ AnalyzeSubscriptionBacklogResult 
analyzeSubscriptionBacklog(String topic, String
      * This is a potentially expensive operation, as it requires
      * to read the messages from storage.
      * This function takes into consideration batch messages
-     * and also Subscription filters.
+     * and also Subscription filters. <br/>
+     * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, 
Optional, long)}
      * @param topic
      *            Topic name
      * @param subscriptionName
      *            the subscription
      * @param startPosition
      *           the position to start the scan from (empty means the last 
processed message)
+     * @param backlogScanMaxEntries
+     *           the maximum number of backlog entries the client will scan 
before terminating its loop
      * @return an accurate analysis of the backlog
      * @throws PulsarAdminException
      *            Unexpected error
      */
+    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, 
String subscriptionName,
+                                                                
Optional<MessageId> startPosition,
+                                                                long 
backlogScanMaxEntries) throws PulsarAdminException;
+
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last 
processed message)
+     * @return an accurate analysis of the backlog
+     */
     CompletableFuture<AnalyzeSubscriptionBacklogResult> 
analyzeSubscriptionBacklogAsync(String topic,
                                                                            
String subscriptionName,
                                                                            
Optional<MessageId> startPosition);
 
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     *
+     *<p>
+     * What's the purpose of this overloaded method? <br/>
+     * There are broker side configurable maximum limits how many entries will 
be read and how long the scanning can
+     * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and 
subscriptionBacklogScanMaxEntries
+     * (default 10000) control this behavior. <br/>
+     * Increasing these settings is possible. However, it's possible that the 
HTTP request times out (also idle timeout
+     * in NAT/firewall etc.) before the command completes so increasing the 
limits might not be useful beyond a few
+     * minutes.
+     *<p/>

Review Comment:
   There's a formatting error in the documentation. The paragraph opened with 
`<p>` is closed with `<p/>`, which is a self-closing tag rather than the 
closing tag `</p>`. Consider using `<p>...</p>`, or removing the tags if 
they're not needed in the Javadoc.



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java:
##########
@@ -2220,21 +2220,88 @@ AnalyzeSubscriptionBacklogResult 
analyzeSubscriptionBacklog(String topic, String
      * This is a potentially expensive operation, as it requires
      * to read the messages from storage.
      * This function takes into consideration batch messages
-     * and also Subscription filters.
+     * and also Subscription filters. <br/>
+     * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, 
Optional, long)}
      * @param topic
      *            Topic name
      * @param subscriptionName
      *            the subscription
      * @param startPosition
      *           the position to start the scan from (empty means the last 
processed message)
+     * @param backlogScanMaxEntries
+     *           the maximum number of backlog entries the client will scan 
before terminating its loop
      * @return an accurate analysis of the backlog
      * @throws PulsarAdminException
      *            Unexpected error
      */
+    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, 
String subscriptionName,
+                                                                
Optional<MessageId> startPosition,
+                                                                long 
backlogScanMaxEntries) throws PulsarAdminException;
+
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last 
processed message)
+     * @return an accurate analysis of the backlog
+     */
     CompletableFuture<AnalyzeSubscriptionBacklogResult> 
analyzeSubscriptionBacklogAsync(String topic,
                                                                            
String subscriptionName,
                                                                            
Optional<MessageId> startPosition);
 
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     *
+     *<p>
+     * What's the purpose of this overloaded method? <br/>
+     * There are broker side configurable maximum limits how many entries will 
be read and how long the scanning can
+     * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and 
subscriptionBacklogScanMaxEntries
+     * (default 10000) control this behavior. <br/>
+     * Increasing these settings is possible. However, it's possible that the 
HTTP request times out (also idle timeout
+     * in NAT/firewall etc.) before the command completes so increasing the 
limits might not be useful beyond a few
+     * minutes.
+     *<p/>
+     *
+     *<p>
+     * How does this method work? <br/>
+     * 1. Add a new parameter backlogScanMaxEntries in client side method to 
control the client-side loop termination
+     *    condition. <br/>
+     * 2. If subscriptionBacklogScanMaxEntries(server side) >= 
backlogScanMaxEntries(client side), then
+     *    backlogScanMaxEntries parameter will take no effect. <br/>
+     * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the 
client will call analyze-backlog method in
+     *    a loop until server return ScanOutcome.COMPLETED or the total 
entries exceeds backlogScanMaxEntries. <br/>
+     * 4. This means that backlogScanMaxEntries cannot be used to precisely 
control the number of entries scanned by
+     *    the server, it only serves to determine when the loop should 
terminate. <br/>
+     * 5. With this method, the server can reduce the values of the two 
parameters subscriptionBacklogScanMaxTimeMs and
+     *    subscriptionBacklogScanMaxEntries, so user can retrieve the desired 
number of backlog entries through
+     *    client-side looping.
+     *<p/>

Review Comment:
   There's a formatting error in the documentation. The paragraph opened with 
`<p>` is closed with `<p/>`, which is a self-closing tag rather than the 
closing tag `</p>`. Consider using `<p>...</p>`, or removing the tags if 
they're not needed in the Javadoc.
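
Separately from the tag issue, the loop that the Javadoc above describes in points 1 to 5 can be sketched as a synchronous simulation. Everything here is an assumption made for illustration: `ClientLoopSketch` and `scan` are hypothetical, the server is modeled as scanning at most `serverMaxEntries` per call, and a call counts as aborted whenever backlog remains after it.

```java
final class ClientLoopSketch {
    /** Returns the total number of entries scanned across all simulated calls. */
    static long scan(long backlogSize, long serverMaxEntries, long clientMaxEntries) {
        long total = 0;
        while (true) {
            long remainingBacklog = backlogSize - total;
            long scanned = Math.min(remainingBacklog, serverMaxEntries); // one server call
            boolean aborted = scanned < remainingBacklog;                // server hit its limit
            total += scanned;
            // Same termination test as the diff: finished, or client-side cap reached.
            if (!aborted || total >= clientMaxEntries) {
                return total;
            }
            // Guard against looping forever when a call makes no progress.
            if (scanned <= 0) {
                throw new IllegalStateException("No progress made by the last scan");
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(scan(25_000, 10_000, 20_000)); // 20000
        System.out.println(scan(25_000, 10_000, 15_000)); // 20000
        System.out.println(scan(5_000, 10_000, 20_000));  // 5000
    }
}
```

The second call illustrates point 4: with a cap of 15000 the loop still returns 20000, because the cap is only checked between server calls.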



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> 
analyzeSubscriptionBacklogAsync(String topic,
+                                                                               
String subscriptionName,
+                                                                               
Optional<MessageId> startPosition,
+                                                                               
long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new 
CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new 
AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new 
AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> 
resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, 
startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction 
= new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, 
Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = 
mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {
+                    future.complete(mergedResult);
+                    return;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is 
incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    log.warn("[{}][{}] Scanned total entry count is null, 
abort analyze backlog, start position is: {}",
+                            topic, subscriptionName, startPositionRef.get());
+                    future.completeExceptionally(
+                            new PulsarAdminException("Incorrect total entry 
count returned from server"));
+                }

Review Comment:
   After completing the future exceptionally, the callback should return to 
prevent further execution. Without a `return` statement, execution falls 
through to lines 1644-1657, which parse the lastMessageId and schedule another 
iteration even though an error has already been reported.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -69,29 +69,23 @@ public void readEntriesComplete(List<Entry> entries, Object 
ctx) {
         try {
             Position lastPositionForBatch = entries.get(entries.size() - 
1).getPosition();
             lastSeenPosition = lastPositionForBatch;
-            // filter out the entry if it has been already deleted
-            // filterReadEntries will call entry.release if the entry is 
filtered out
-            List<Entry> entriesFiltered = 
this.cursor.filterReadEntries(entries);
-            int skippedEntries = entries.size() - entriesFiltered.size();
-            remainingEntries.addAndGet(-skippedEntries);
-            if (!entriesFiltered.isEmpty()) {
-                for (Entry entry : entriesFiltered) {
-                    if (remainingEntries.decrementAndGet() <= 0) {
-                        log.warn("[{}] Scan abort after reading too many 
entries", OpScan.this.cursor);
-                        callback.scanComplete(lastSeenPosition, 
ScanOutcome.ABORTED, OpScan.this.ctx);
-                        return;
-                    }
-                    if (!condition.test(entry)) {
-                        log.warn("[{}] Scan abort due to user code", 
OpScan.this.cursor);
-                        callback.scanComplete(lastSeenPosition, 
ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
-                        return;
-                    }
+            for (Entry entry : entries) {
+                if (remainingEntries.getAndDecrement() <= 0) {
+                    log.warn("[{}] Scan abort after reading too many entries", 
OpScan.this.cursor);
+                    callback.scanComplete(lastSeenPosition, 
ScanOutcome.ABORTED, OpScan.this.ctx);
+                    return;
+                }
+                if (!condition.test(entry)) {
+                    log.warn("[{}] Scan abort due to user code", 
OpScan.this.cursor);
+                    callback.scanComplete(lastSeenPosition, 
ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+                    return;
                 }
             }

Review Comment:
   This change alters the behavior of the scan operation by removing the 
filtering of individually deleted entries. The removed code called 
`cursor.filterReadEntries(entries)`, so already-deleted entries were released 
and deducted from `remainingEntries` without ever reaching `condition.test`. 
Without this filtering, deleted entries are now passed to the condition and 
processed like live entries, which can change the scan results. This appears 
to be an unintended change that's not mentioned in the PR description, which 
only discusses adding client-side looping to avoid HTTP timeouts. Consider 
whether this behavioral change is intentional and document it if so.
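
A simplified model of the two loop bodies in the hunk above makes the difference countable. This is a sketch, not the managed-ledger code: `ScanFilterSketch` and its methods are hypothetical, a list of "deleted" flags stands in for the entries, and the real `filterReadEntries` is not called.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class ScanFilterSketch {
    // Removed code path: deleted entries are deducted from the budget up front
    // and never reach the condition. Returns how many entries were tested.
    static int testedWithFiltering(List<Boolean> deletedFlags, long budget) {
        AtomicLong remaining = new AtomicLong(budget);
        long skipped = deletedFlags.stream().filter(d -> d).count();
        remaining.addAndGet(-skipped);
        int tested = 0;
        for (boolean deleted : deletedFlags) {
            if (deleted) {
                continue; // filtered out before the loop in the original code
            }
            if (remaining.decrementAndGet() <= 0) {
                return tested; // scan aborted
            }
            tested++; // stands in for condition.test(entry)
        }
        return tested;
    }

    // New code path: every entry, deleted or not, is checked and tested.
    static int testedWithoutFiltering(List<Boolean> deletedFlags, long budget) {
        AtomicLong remaining = new AtomicLong(budget);
        int tested = 0;
        for (int i = 0; i < deletedFlags.size(); i++) {
            if (remaining.getAndDecrement() <= 0) {
                return tested; // scan aborted
            }
            tested++; // stands in for condition.test(entry)
        }
        return tested;
    }

    public static void main(String[] args) {
        List<Boolean> flags = Arrays.asList(false, true, false, true, false);
        System.out.println(testedWithFiltering(flags, 10));    // 3
        System.out.println(testedWithoutFiltering(flags, 10)); // 5
    }
}
```

In this model, the two deleted entries reach the condition only in the new path, so any per-entry counting done by the condition will include them.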



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> 
analyzeSubscriptionBacklogAsync(String topic,
+                                                                               
String subscriptionName,
+                                                                               
Optional<MessageId> startPosition,
+                                                                               
long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new 
CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new 
AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new 
AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> 
resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, 
startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction 
= new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, 
Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = 
mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {
+                    future.complete(mergedResult);
+                    return;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is 
incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    log.warn("[{}][{}] Scanned total entry count is null, 
abort analyze backlog, start position is: {}",

Review Comment:
   The log message says "Scanned total entry count is null" but the condition 
checks if it's <= 0, not if it's null. The message should say "Scanned total 
entry count is zero or negative" to accurately describe the condition being 
checked.
   ```suggestion
                       log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start position is: {}",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

