lhotari commented on code in PR #25127:
URL: https://github.com/apache/pulsar/pull/25127#discussion_r2707526751
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,94 @@ public void failed(Throwable throwable) {
         return future;
     }
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                String subscriptionName,
+                Optional<MessageId> startPosition,
+                long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction = new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) {
Review Comment:
The `!mergedResult.isAborted()` part isn't correct. See:
https://github.com/apache/pulsar/blob/da0d11644f097ae657e79b2eb7835478d79ccd0e/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java#L119-L128
The purpose of the loop is to keep iterating until the result is COMPLETED or the `java.util.function.Predicate<AnalyzeSubscriptionBacklogResult> continuePredicate` returns false (the condition in `continuePredicate` could be `result.getEntries() < backlogScanMaxEntries`).
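A minimal sketch of that termination logic, in plain Java so it runs without Pulsar on the classpath. Everything in it is hypothetical: `ScanPage` stands in for one per-call `AnalyzeSubscriptionBacklogResult`, its `completed` flag assumes the client can tell whether the broker's scan ended with `ScanOutcome.COMPLETED`, `fetchPage` stands in for the single-call `analyzeSubscriptionBacklogAsync`, and positions are `long`s rather than `MessageId`s. The merge only sums entry counts; a real merge would also combine the other counters.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for one analyze-backlog response from the broker.
// `completed` is an ASSUMED flag meaning the server-side scan ended with ScanOutcome.COMPLETED;
// positions are plain longs instead of MessageId to keep the sketch self-contained.
record ScanPage(long entries, long lastPosition, boolean completed) {}

final class BacklogScanLoop {
    private BacklogScanLoop() {}

    // Repeatedly calls fetchPage, resuming after the last scanned position, until one call
    // reports completion or continuePredicate returns false for the accumulated result.
    static CompletableFuture<ScanPage> analyzeUntil(
            Function<Optional<Long>, CompletableFuture<ScanPage>> fetchPage,
            Optional<Long> startPosition,
            Predicate<ScanPage> continuePredicate) {
        CompletableFuture<ScanPage> result = new CompletableFuture<>();
        step(fetchPage, startPosition, null, continuePredicate, result);
        return result;
    }

    private static void step(Function<Optional<Long>, CompletableFuture<ScanPage>> fetchPage,
                             Optional<Long> position,
                             ScanPage accumulated,
                             Predicate<ScanPage> continuePredicate,
                             CompletableFuture<ScanPage> result) {
        fetchPage.apply(position).whenComplete((page, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
                return;
            }
            try {
                // Merge: sum the entry counts; the newest page decides position and completion.
                ScanPage merged = accumulated == null
                        ? page
                        : new ScanPage(accumulated.entries() + page.entries(),
                                page.lastPosition(), page.completed());
                // Stop when the server finished the backlog or the caller has seen enough.
                if (merged.completed() || !continuePredicate.test(merged)) {
                    result.complete(merged);
                } else {
                    // Assumption: the next scan starts right after the last scanned position.
                    step(fetchPage, Optional.of(merged.lastPosition() + 1), merged,
                            continuePredicate, result);
                }
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        });
    }
}
```

With a fake broker that caps each call at 10,000 entries and a client budget of `acc -> acc.entries() < 25_000L`, this loop makes three calls and returns 30,000 entries, which matches point 4 of the `analyzeSubscriptionBacklogAsync` Javadoc in the `Topics.java` diff: the client-side limit decides when the loop stops, not exactly how many entries the server reads. If `fetchPage` returns already-completed futures, each step runs inline and the stack grows with the number of calls, so a very long loop would need an iterative variant.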
It seems that `conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);` isn't taking effect in the tests; otherwise some of them should have failed.
##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java:
##########
@@ -2220,21 +2220,88 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String
      * This is a potentially expensive operation, as it requires
      * to read the messages from storage.
      * This function takes into consideration batch messages
-     * and also Subscription filters.
+     * and also Subscription filters. <br/>
+     * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
      * @param topic
      *            Topic name
      * @param subscriptionName
      *            the subscription
      * @param startPosition
      *            the position to start the scan from (empty means the last processed message)
+     * @param backlogScanMaxEntries
+     *            the maximum number of backlog entries the client will scan before terminating its loop
      * @return an accurate analysis of the backlog
      * @throws PulsarAdminException
      *             Unexpected error
      */
+    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+                Optional<MessageId> startPosition,
+                long backlogScanMaxEntries) throws PulsarAdminException;
+
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *            the position to start the scan from (empty means the last processed message)
+     * @return an accurate analysis of the backlog
+     */
     CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
                 String subscriptionName,
                 Optional<MessageId> startPosition);
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     *
+     *<p>
+     * What's the purpose of this overloaded method? <br/>
+     * There are broker side configurable maximum limits how many entries will be read and how long the scanning can
+     * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and subscriptionBacklogScanMaxEntries
+     * (default 10000) control this behavior. <br/>
+     * Increasing these settings is possible. However, it's possible that the HTTP request times out (also idle timeout
+     * in NAT/firewall etc.) before the command completes so increasing the limits might not be useful beyond a few
+     * minutes.
+     *</p>
+     *
+     *<p>
+     * How does this method work? <br/>
+     * 1. Add a new parameter backlogScanMaxEntries in client side method to control the client-side loop termination
+     * condition. <br/>
+     * 2. If subscriptionBacklogScanMaxEntries(server side) >= backlogScanMaxEntries(client side), then
+     * backlogScanMaxEntries parameter will take no effect. <br/>
+     * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the client will call analyze-backlog method in
+     * a loop until server return ScanOutcome.COMPLETED or the total entries exceeds backlogScanMaxEntries. <br/>
+     * 4. This means that backlogScanMaxEntries cannot be used to precisely control the number of entries scanned by
+     * the server, it only serves to determine when the loop should terminate. <br/>
+     * 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and
+     * subscriptionBacklogScanMaxEntries, so user can retrieve the desired number of backlog entries through
+     * client-side looping.
+     *</p>
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *            the position to start the scan from (empty means the last processed message)
+     * @param backlogScanMaxEntries
+     *            the maximum number of backlog entries the client will scan before terminating its loop
+     * @return an accurate analysis of the backlog
+     */
+    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                String subscriptionName,
+                Optional<MessageId> startPosition,
+                long backlogScanMaxEntries);
Review Comment:
It would be useful to have an overload where `long backlogScanMaxEntries` is replaced with `java.util.function.Predicate<AnalyzeSubscriptionBacklogResult> continuePredicate`. The predicate would receive the accumulated `AnalyzeSubscriptionBacklogResult` as its argument. The benefit is that API users can, for example, add logging each time the accumulated result is updated.
The overload taking `long backlogScanMaxEntries` can then delegate to the one with the `java.util.function.Predicate<AnalyzeSubscriptionBacklogResult> continuePredicate` parameter.
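A minimal sketch of the suggested API shape, assuming Java 17. `BacklogAnalyzer`, `BacklogResult`, and `BacklogPredicates` are hypothetical names: `BacklogResult` stands in for `AnalyzeSubscriptionBacklogResult`, `Optional<String>` stands in for `Optional<MessageId>`, and the real `Topics` interface may be shaped differently. The `long` overload becomes a default method that only builds the predicate, so the loop itself is written once.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for AnalyzeSubscriptionBacklogResult; only the entry count is modeled.
record BacklogResult(long entries) {}

// Hypothetical interface showing only the suggested overload shape, not Pulsar's actual Topics API.
// Optional<String> stands in for Optional<MessageId>.
interface BacklogAnalyzer {

    // Keeps calling the broker while continuePredicate accepts the accumulated result.
    CompletableFuture<BacklogResult> analyzeSubscriptionBacklogAsync(String topic,
                                                                     String subscriptionName,
                                                                     Optional<String> startPosition,
                                                                     Predicate<BacklogResult> continuePredicate);

    // The entry-count overload just translates the limit into a predicate and delegates.
    default CompletableFuture<BacklogResult> analyzeSubscriptionBacklogAsync(String topic,
                                                                             String subscriptionName,
                                                                             Optional<String> startPosition,
                                                                             long backlogScanMaxEntries) {
        return analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition,
                accumulated -> accumulated.entries() < backlogScanMaxEntries);
    }
}

final class BacklogPredicates {
    private BacklogPredicates() {}

    // Wraps a continue-predicate so each accumulated update is reported to `sink` (e.g. a logger).
    static Predicate<BacklogResult> logging(Predicate<BacklogResult> delegate, Consumer<String> sink) {
        return accumulated -> {
            sink.accept("analyzed " + accumulated.entries() + " entries so far");
            return delegate.test(accumulated);
        };
    }
}
```

A caller who wants progress output can wrap any predicate, for example `BacklogPredicates.logging(acc -> acc.entries() < 50_000, System.out::println)`, without the API needing a separate logging hook.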
--
This is an automated message from the Apache Git Service.