oneby-wang commented on code in PR #25127:
URL: https://github.com/apache/pulsar/pull/25127#discussion_r2767228687
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1613,103 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ return analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPosition,
+ (backlogResult) -> backlogResult.getEntries() >=
backlogScanMaxEntries);
+ }
+
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+ String
subscriptionName, Optional<MessageId> startPosition,
+
Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() ||
terminatePredicate.test(mergedResult)) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ // To avoid infinite loops, we ensure the entry count is
incremented after each loop.
+ // Should never happen.
+ if (currentResult.getEntries() <= 0) {
+ log.warn("[{}][{}] Scanned total entry count is zero or
negative, abort analyze backlog, start "
+ + "position is: {}", topic, subscriptionName,
startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect total entry
count returned from server"));
Review Comment:
Addressed.
> I would assume that it's possible to have a case where there's exactly the
requested amount of entries available on the broker side and it would return
the results because of reaching the max limit of an individual call. On the
next call, the request would return 0 entries.
I think this would not happen. If `ScanOutcome != ScanOutcome.COMPLETED` in
`OpScan`, there should be more entries to scan.
But I strongly agree with the comment: completing exceptionally is less
developer-friendly. Since the broker scan logic could change in the future,
it is very reasonable to treat 0 entries or a null `lastMessageId` as no
more entries.
`ScanOutcome.USER_INTERRUPTED` would not be returned in `OpScan`;
`ScanOutcome.COMPLETED` is returned in the following conditions:
```
if (cursor.hasMoreEntries(searchPosition)) {
OpReadEntry opReadEntry = OpReadEntry.create(cursor, searchPosition,
batchSize,
this, OpScan.this.ctx, null, null, false);
ledger.asyncReadEntries(opReadEntry);
} else {
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED,
OpScan.this.ctx);
}
```
or
```
searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1,
PositionBound.startExcluded);
if (searchPosition.compareTo(lastPositionForBatch) == 0) {
// we have reached the end of the ledger, as we are not doing progress
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED,
OpScan.this.ctx);
return;
}
```
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1613,103 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ return analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPosition,
+ (backlogResult) -> backlogResult.getEntries() >=
backlogScanMaxEntries);
+ }
+
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+ String
subscriptionName, Optional<MessageId> startPosition,
+
Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() ||
terminatePredicate.test(mergedResult)) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ // To avoid infinite loops, we ensure the entry count is
incremented after each loop.
+ // Should never happen.
+ if (currentResult.getEntries() <= 0) {
+ log.warn("[{}][{}] Scanned total entry count is zero or
negative, abort analyze backlog, start "
+ + "position is: {}", topic, subscriptionName,
startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect total entry
count returned from server"));
+ return;
+ }
+
+ // In analyze-backlog, lastMessageId is null only when: total
entries is 0, with false aborted flag
+ // returned. Should never happen.
+ if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+ log.warn("[{}][{}] Scanned last message id is blank, abort
analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
Review Comment:
Addressed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]