lhotari commented on code in PR #25127:
URL: https://github.com/apache/pulsar/pull/25127#discussion_r2707526751


##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,94 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                                               String subscriptionName,
+                                                                               Optional<MessageId> startPosition,
+                                                                               long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction = new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) {

Review Comment:
   The `!mergedResult.isAborted()` part isn't correct.

   https://github.com/apache/pulsar/blob/da0d11644f097ae657e79b2eb7835478d79ccd0e/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java#L119-L128

   The purpose of the loop is to keep iterating until the result is COMPLETED or the `java.util.function.Predicate<AnalyzeSubscriptionBacklogResult> continuePredicate` returns false (the condition in `continuePredicate` could be `result.getEntries() < backlogScanMaxEntries`).

   It seems that `conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);` isn't effective in the tests, since some of the tests should have failed.
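
A minimal sketch of the termination rule described above, written as a synchronous loop for readability. `ScanResult`, `BacklogScanLoop`, `scanOnce` and the `completed` flag are hypothetical stand-ins invented for this illustration; they are not the Pulsar classes, and the real code is asynchronous.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for AnalyzeSubscriptionBacklogResult (not the Pulsar class).
final class ScanResult {
    final long entries;
    final boolean completed; // assumed: true when the server reported a COMPLETED scan

    ScanResult(long entries, boolean completed) {
        this.entries = entries;
        this.completed = completed;
    }

    // Sum the entry counters; the merged result is complete only if the latest scan completed.
    static ScanResult merge(ScanResult accumulated, ScanResult latest) {
        if (accumulated == null) {
            return latest;
        }
        return new ScanResult(accumulated.entries + latest.entries, latest.completed);
    }
}

final class BacklogScanLoop {
    // Keep scanning until the result is completed or continuePredicate returns false.
    static ScanResult scan(Supplier<ScanResult> scanOnce, Predicate<ScanResult> continuePredicate) {
        ScanResult merged = null;
        do {
            merged = ScanResult.merge(merged, scanOnce.get());
        } while (!merged.completed && continuePredicate.test(merged));
        return merged;
    }
}
```

With `continuePredicate` set to `r -> r.entries < backlogScanMaxEntries`, the loop stops on the first merged result that is either completed or has reached the client-side limit.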



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java:
##########
@@ -2220,21 +2220,88 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String
      * This is a potentially expensive operation, as it requires
      * to read the messages from storage.
      * This function takes into consideration batch messages
-     * and also Subscription filters.
+     * and also Subscription filters. <br/>
+     * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
      * @param topic
      *            Topic name
      * @param subscriptionName
      *            the subscription
      * @param startPosition
      *           the position to start the scan from (empty means the last processed message)
+     * @param backlogScanMaxEntries
+     *           the maximum number of backlog entries the client will scan before terminating its loop
      * @return an accurate analysis of the backlog
      * @throws PulsarAdminException
      *            Unexpected error
      */
+    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+                                                                Optional<MessageId> startPosition,
+                                                                long backlogScanMaxEntries) throws PulsarAdminException;
+
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last processed message)
+     * @return an accurate analysis of the backlog
+     */
     CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
                                                                            String subscriptionName,
                                                                            Optional<MessageId> startPosition);
 
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     *
+     *<p>
+     * What's the purpose of this overloaded method? <br/>
+     * There are broker side configurable maximum limits how many entries will be read and how long the scanning can
+     * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and subscriptionBacklogScanMaxEntries
+     * (default 10000) control this behavior. <br/>
+     * Increasing these settings is possible. However, it's possible that the HTTP request times out (also idle timeout
+     * in NAT/firewall etc.) before the command completes so increasing the limits might not be useful beyond a few
+     * minutes.
+     *</p>
+     *
+     *<p>
+     * How does this method work? <br/>
+     * 1. Add a new parameter backlogScanMaxEntries in client side method to control the client-side loop termination
+     *    condition. <br/>
+     * 2. If subscriptionBacklogScanMaxEntries(server side) >= backlogScanMaxEntries(client side), then
+     *    backlogScanMaxEntries parameter will take no effect. <br/>
+     * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the client will call analyze-backlog method in
+     *    a loop until server return ScanOutcome.COMPLETED or the total entries exceeds backlogScanMaxEntries. <br/>
+     * 4. This means that backlogScanMaxEntries cannot be used to precisely control the number of entries scanned by
+     *    the server, it only serves to determine when the loop should terminate. <br/>
+     * 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and
+     *    subscriptionBacklogScanMaxEntries, so user can retrieve the desired number of backlog entries through
+     *    client-side looping.
+     *</p>
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last processed message)
+     * @param backlogScanMaxEntries
+     *           the maximum number of backlog entries the client will scan before terminating its loop
+     * @return an accurate analysis of the backlog
+     */
+    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                                            String subscriptionName,
+                                                                            Optional<MessageId> startPosition,
+                                                                            long backlogScanMaxEntries);

Review Comment:
   It would be useful to have a method where `long backlogScanMaxEntries` is replaced with `java.util.function.Predicate<AnalyzeSubscriptionBacklogResult> continuePredicate`. The predicate would accept the accumulated `AnalyzeSubscriptionBacklogResult` as a parameter. The benefit is that the user of the API can choose to add logging when the accumulated result gets updated.

   The method signature with `long backlogScanMaxEntries` can delegate to this method with the `java.util.function.Predicate<AnalyzeSubscriptionBacklogResult> continuePredicate` parameter.
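
One possible shape for the delegation suggested above, as a rough sketch only. `BacklogResult`, `BacklogAnalyzer`, `analyzeAsync` and `scanOnce` are hypothetical names made up for this example; the real `Topics` methods also take the topic, subscription name and start position, which are left out here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for AnalyzeSubscriptionBacklogResult (not the Pulsar class).
final class BacklogResult {
    final long entries;
    final boolean completed; // assumed: true when the server reported a COMPLETED scan

    BacklogResult(long entries, boolean completed) {
        this.entries = entries;
        this.completed = completed;
    }
}

// Hypothetical sketch of the two overloads.
final class BacklogAnalyzer {
    // Assumed to issue one analyze-backlog request, continuing where the previous one stopped.
    private final Supplier<CompletableFuture<BacklogResult>> scanOnce;

    BacklogAnalyzer(Supplier<CompletableFuture<BacklogResult>> scanOnce) {
        this.scanOnce = scanOnce;
    }

    // Predicate-based variant: the caller decides, per accumulated result, whether to continue.
    CompletableFuture<BacklogResult> analyzeAsync(Predicate<BacklogResult> continuePredicate) {
        return step(null, continuePredicate);
    }

    // The long-based variant only builds a predicate and delegates.
    CompletableFuture<BacklogResult> analyzeAsync(long backlogScanMaxEntries) {
        return analyzeAsync(acc -> acc.entries < backlogScanMaxEntries);
    }

    private CompletableFuture<BacklogResult> step(BacklogResult acc, Predicate<BacklogResult> continuePredicate) {
        return scanOnce.get().thenCompose(latest -> {
            BacklogResult merged = acc == null
                    ? latest
                    : new BacklogResult(acc.entries + latest.entries, latest.completed);
            // Stop when the scan completed or the caller no longer wants to continue.
            if (merged.completed || !continuePredicate.test(merged)) {
                return CompletableFuture.completedFuture(merged);
            }
            return step(merged, continuePredicate);
        });
    }
}
```

A caller that wants logging could pass something like `acc -> { log.info("scanned {} entries", acc.entries); return acc.entries < limit; }`, where `log` and `limit` are the caller's own. In this sketch the predicate is consulted only after scans that did not complete.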
   
   


