Copilot commented on code in PR #25127:
URL: https://github.com/apache/pulsar/pull/25127#discussion_r2671561014
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() || mergedResult.getEntries() >=
backlogScanMaxEntries) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ // To avoid infinite loops, we ensure the entry count is
incremented after each loop.
+ if (currentResult.getEntries() <= 0) {
+ log.warn("[{}][{}] Scanned total entry count is null,
abort analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect total entry
count returned from server"));
+ }
+
+ // In analyze-backlog, lastMessageId is null only when: total
entries is 0,
+ // with false aborted flag returned.
+ if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+ log.warn("[{}][{}] Scanned last message id is blank, abort
analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect last message
id returned from server"));
+ }
Review Comment:
After completing the future exceptionally, the method should return to
prevent further execution. Without a return statement, the code continues to
execute lines 1651-1657, which will attempt to parse the lastMessageId and
schedule another iteration even though an error has already been reported.
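
A minimal sketch of the control-flow issue, using hypothetical names
(`EarlyReturnSketch`, `rescheduled`) that are not part of the PR. It only
illustrates that `CompletableFuture.completeExceptionally` records the failure
without stopping the surrounding method, so the code after the `if` block
still runs unless the branch returns:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class EarlyReturnSketch {
    // Counts how often the "schedule the next scan" step is reached (illustrative only).
    static final AtomicInteger rescheduled = new AtomicInteger();

    // Same shape as the reviewed code: the failure branch does not return.
    static void withoutReturn(long entries, CompletableFuture<String> future) {
        if (entries <= 0) {
            future.completeExceptionally(new IllegalStateException("Incorrect total entry count"));
        }
        rescheduled.incrementAndGet(); // still reached after the failure was reported
    }

    // Shape with the early return added.
    static void withReturn(long entries, CompletableFuture<String> future) {
        if (entries <= 0) {
            future.completeExceptionally(new IllegalStateException("Incorrect total entry count"));
            return; // nothing below runs once the future has failed
        }
        rescheduled.incrementAndGet();
    }

    public static void main(String[] args) {
        withoutReturn(0, new CompletableFuture<>());
        withReturn(0, new CompletableFuture<>());
        System.out.println("reschedule attempts: " + rescheduled.get());
    }
}
```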
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() || mergedResult.getEntries() >=
backlogScanMaxEntries) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ // To avoid infinite loops, we ensure the entry count is
incremented after each loop.
+ if (currentResult.getEntries() <= 0) {
+ log.warn("[{}][{}] Scanned total entry count is null,
abort analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect total entry
count returned from server"));
+ }
+
+ // In analyze-backlog, lastMessageId is null only when: total
entries is 0,
+ // with false aborted flag returned.
+ if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+ log.warn("[{}][{}] Scanned last message id is blank, abort
analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect last message
id returned from server"));
+ }
+
+ String[] messageIdSplits =
mergedResult.getLastMessageId().split(":");
+ MessageIdImpl nextScanMessageId =
+ new MessageIdImpl(Long.parseLong(messageIdSplits[0]),
Long.parseLong(messageIdSplits[1]) + 1,
+ partitionIndex);
Review Comment:
The message ID parsing logic is fragile and lacks error handling. If the
lastMessageId format is unexpected (e.g., doesn't contain a colon, has
non-numeric values, or has fewer than 2 parts), this will throw
ArrayIndexOutOfBoundsException or NumberFormatException, causing an unhandled
exception. Consider adding validation and proper error handling for the parsing
operation.
```suggestion
                    return;
                }

                String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
                if (messageIdSplits.length < 2) {
                    log.warn("[{}][{}] Scanned last message id has unexpected format '{}', "
                                    + "abort analyze backlog, start position is: {}",
                            topic, subscriptionName, mergedResult.getLastMessageId(), startPositionRef.get());
                    future.completeExceptionally(
                            new PulsarAdminException("Incorrect last message id format returned from server"));
                    return;
                }
                final long ledgerId;
                final long entryId;
                try {
                    ledgerId = Long.parseLong(messageIdSplits[0]);
                    entryId = Long.parseLong(messageIdSplits[1]);
                } catch (NumberFormatException e) {
                    log.warn("[{}][{}] Failed to parse last message id '{}', abort analyze backlog, "
                                    + "start position is: {}",
                            topic, subscriptionName, mergedResult.getLastMessageId(), startPositionRef.get(), e);
                    future.completeExceptionally(
                            new PulsarAdminException("Incorrect last message id format returned from server", e));
                    return;
                }
                MessageIdImpl nextScanMessageId =
                        new MessageIdImpl(ledgerId, entryId + 1, partitionIndex);
```
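
The same validation can also be isolated in a small helper. The sketch below is
an illustration only: `LastMessageIdParser` and `parse` are hypothetical names,
and it assumes the `ledgerId:entryId` text form that the reviewed code splits
on `:`.

```java
import java.util.Optional;

class LastMessageIdParser {
    // Returns {ledgerId, entryId}, or empty when the text does not start with "ledgerId:entryId".
    static Optional<long[]> parse(String lastMessageId) {
        if (lastMessageId == null) {
            return Optional.empty();
        }
        String[] parts = lastMessageId.split(":");
        if (parts.length < 2) {
            return Optional.empty(); // no colon, or nothing after it
        }
        try {
            return Optional.of(new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])});
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric ledger or entry id
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("12:34").isPresent());
        System.out.println(parse("not-an-id").isPresent());
    }
}
```

A caller could then fail the future once when the result is empty, instead of
handling each malformed case separately.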
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() || mergedResult.getEntries() >=
backlogScanMaxEntries) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ // To avoid infinite loops, we ensure the entry count is
incremented after each loop.
+ if (currentResult.getEntries() <= 0) {
+ log.warn("[{}][{}] Scanned total entry count is null,
abort analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect total entry
count returned from server"));
+ }
+
+ // In analyze-backlog, lastMessageId is null only when: total
entries is 0,
+ // with false aborted flag returned.
+ if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+ log.warn("[{}][{}] Scanned last message id is blank, abort
analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect last message
id returned from server"));
+ }
+
+ String[] messageIdSplits =
mergedResult.getLastMessageId().split(":");
+ MessageIdImpl nextScanMessageId =
+ new MessageIdImpl(Long.parseLong(messageIdSplits[0]),
Long.parseLong(messageIdSplits[1]) + 1,
+ partitionIndex);
+ startPositionRef.set(Optional.of(nextScanMessageId));
+
+ resultSupplier.get().whenComplete(this);
+ }
+ };
+
+ resultSupplier.get().whenComplete(completeAction);
+ return future;
+ }
+
+ private AnalyzeSubscriptionBacklogResult
mergeBacklogResults(AnalyzeSubscriptionBacklogResult current,
+
AnalyzeSubscriptionBacklogResult previous) {
+ if (previous == null) {
+ return current;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedRes = new
AnalyzeSubscriptionBacklogResult();
+ mergedRes.setEntries(current.getEntries() + previous.getEntries());
+ mergedRes.setMessages(current.getMessages() + previous.getMessages());
+ mergedRes.setMarkerMessages(current.getMarkerMessages() +
previous.getMarkerMessages());
+
+ mergedRes.setFilterAcceptedEntries(current.getFilterAcceptedEntries()
+ previous.getFilterAcceptedEntries());
+ mergedRes.setFilterRejectedEntries(current.getFilterRejectedEntries()
+ previous.getFilterRejectedEntries());
+ mergedRes.setFilterRescheduledEntries(
+ current.getFilterRescheduledEntries() +
previous.getFilterRescheduledEntries());
+
+
mergedRes.setFilterAcceptedMessages(current.getFilterAcceptedMessages() +
previous.getFilterAcceptedMessages());
+
mergedRes.setFilterRejectedMessages(current.getFilterRejectedMessages() +
previous.getFilterRejectedMessages());
+ mergedRes.setFilterRescheduledMessages(
+ current.getFilterRescheduledMessages() +
previous.getFilterRescheduledMessages());
+
+ mergedRes.setAborted(current.isAborted());
+ mergedRes.setFirstMessageId(current.getFirstMessageId());
Review Comment:
The mergeBacklogResults method should use the firstMessageId from the
previous result (if available) instead of always using the current result's
firstMessageId. When merging multiple scan results, the first message ID should
refer to the very first message from the initial scan, not the first message
from the latest scan iteration.
```suggestion
mergedRes.setFirstMessageId(previous.getFirstMessageId() != null
? previous.getFirstMessageId()
: current.getFirstMessageId());
```
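
A small sketch of the intended merge semantics, using a hypothetical
`ScanResult` stand-in that carries only two of the fields of
`AnalyzeSubscriptionBacklogResult`. It shows that preferring the previous
`firstMessageId` keeps the value from the initial scan across any number of
iterations:

```java
class FirstMessageIdMerge {
    // Hypothetical stand-in; the real result type has many more counters.
    static final class ScanResult {
        final String firstMessageId;
        final long entries;

        ScanResult(String firstMessageId, long entries) {
            this.firstMessageId = firstMessageId;
            this.entries = entries;
        }
    }

    static ScanResult merge(ScanResult current, ScanResult previous) {
        if (previous == null) {
            return current; // first iteration: nothing to merge yet
        }
        // Keep the earliest known first message id; fall back to the current one.
        String first = previous.firstMessageId != null
                ? previous.firstMessageId
                : current.firstMessageId;
        return new ScanResult(first, current.entries + previous.entries);
    }

    public static void main(String[] args) {
        ScanResult merged = merge(new ScanResult("10:0", 100), null);
        merged = merge(new ScanResult("10:100", 100), merged);
        merged = merge(new ScanResult("10:200", 50), merged);
        System.out.println(merged.firstMessageId + " / " + merged.entries);
    }
}
```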
##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java:
##########
@@ -2220,21 +2220,88 @@ AnalyzeSubscriptionBacklogResult
analyzeSubscriptionBacklog(String topic, String
* This is a potentially expensive operation, as it requires
* to read the messages from storage.
* This function takes into consideration batch messages
- * and also Subscription filters.
+ * and also Subscription filters. <br/>
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String,
Optional, long)}
* @param topic
* Topic name
* @param subscriptionName
* the subscription
* @param startPosition
* the position to start the scan from (empty means the last
processed message)
+ * @param backlogScanMaxEntries
+ * the maximum number of backlog entries the client will scan
before terminating its loop
* @return an accurate analysis of the backlog
* @throws PulsarAdminException
* Unexpected error
*/
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
String subscriptionName,
+
Optional<MessageId> startPosition,
+ long
backlogScanMaxEntries) throws PulsarAdminException;
+
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last
processed message)
+ * @return an accurate analysis of the backlog
+ */
CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName,
Optional<MessageId> startPosition);
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ *
+ *<p>
+ * What's the purpose of this overloaded method? <br/>
+ * There are broker side configurable maximum limits how many entries will
be read and how long the scanning can
+ * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and
subscriptionBacklogScanMaxEntries
+ * (default 10000) control this behavior. <br/>
+ * Increasing these settings is possible. However, it's possible that the
HTTP request times out (also idle timeout
+ * in NAT/firewall etc.) before the command completes so increasing the
limits might not be useful beyond a few
+ * minutes.
+ *<p/>
Review Comment:
The paragraph markup in this Javadoc is malformed. The paragraph is opened
with `<p>` but closed with `<p/>`, which is a self-closing start tag rather
than the closing tag `</p>`. Consider using `<p>...</p>`, or removing the tags
if they're not needed in the Javadoc.
##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java:
##########
@@ -2220,21 +2220,88 @@ AnalyzeSubscriptionBacklogResult
analyzeSubscriptionBacklog(String topic, String
* This is a potentially expensive operation, as it requires
* to read the messages from storage.
* This function takes into consideration batch messages
- * and also Subscription filters.
+ * and also Subscription filters. <br/>
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String,
Optional, long)}
* @param topic
* Topic name
* @param subscriptionName
* the subscription
* @param startPosition
* the position to start the scan from (empty means the last
processed message)
+ * @param backlogScanMaxEntries
+ * the maximum number of backlog entries the client will scan
before terminating its loop
* @return an accurate analysis of the backlog
* @throws PulsarAdminException
* Unexpected error
*/
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
String subscriptionName,
+
Optional<MessageId> startPosition,
+ long
backlogScanMaxEntries) throws PulsarAdminException;
+
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last
processed message)
+ * @return an accurate analysis of the backlog
+ */
CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName,
Optional<MessageId> startPosition);
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ *
+ *<p>
+ * What's the purpose of this overloaded method? <br/>
+ * There are broker side configurable maximum limits how many entries will
be read and how long the scanning can
+ * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and
subscriptionBacklogScanMaxEntries
+ * (default 10000) control this behavior. <br/>
+ * Increasing these settings is possible. However, it's possible that the
HTTP request times out (also idle timeout
+ * in NAT/firewall etc.) before the command completes so increasing the
limits might not be useful beyond a few
+ * minutes.
+ *<p/>
+ *
+ *<p>
+ * How does this method work? <br/>
+ * 1. Add a new parameter backlogScanMaxEntries in client side method to
control the client-side loop termination
+ * condition. <br/>
+ * 2. If subscriptionBacklogScanMaxEntries(server side) >=
backlogScanMaxEntries(client side), then
+ * backlogScanMaxEntries parameter will take no effect. <br/>
+ * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the
client will call analyze-backlog method in
+ * a loop until server return ScanOutcome.COMPLETED or the total
entries exceeds backlogScanMaxEntries. <br/>
+ * 4. This means that backlogScanMaxEntries cannot be used to precisely
control the number of entries scanned by
+ * the server, it only serves to determine when the loop should
terminate. <br/>
+ * 5. With this method, the server can reduce the values of the two
parameters subscriptionBacklogScanMaxTimeMs and
+ * subscriptionBacklogScanMaxEntries, so user can retrieve the desired
number of backlog entries through
+ * client-side looping.
+ *<p/>
Review Comment:
The paragraph markup in this Javadoc is malformed. The paragraph is opened
with `<p>` but closed with `<p/>`, which is a self-closing start tag rather
than the closing tag `</p>`. Consider using `<p>...</p>`, or removing the tags
if they're not needed in the Javadoc.
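
The looping behavior that the Javadoc in this hunk describes (points 1-5) can
be modeled with a small synchronous sketch. Everything here is a hypothetical
simplification: `ClientSideLoopSketch`, `scan`, and the idea that the server
scans exactly `min(serverMax, remaining)` entries per call are assumptions for
illustration, not the broker's actual behavior.

```java
class ClientSideLoopSketch {
    // backlog: entries waiting; serverMax: per-request server limit; clientMax: client loop limit.
    // Returns the total number of entries scanned when the loop stops.
    static long scan(long backlog, long serverMax, long clientMax) {
        long total = 0;
        while (true) {
            long remaining = backlog - total;
            long scanned = Math.min(serverMax, remaining);
            boolean aborted = remaining > serverMax; // server hit its limit before the end
            total += scanned;
            if (!aborted || total >= clientMax) {
                return total; // scan completed, or the client-side limit was reached
            }
            if (scanned <= 0) {
                // Guard against looping forever when a request makes no progress.
                throw new IllegalStateException("Incorrect total entry count returned from server");
            }
        }
    }

    public static void main(String[] args) {
        // The client limit only decides when to stop looping, so the total can overshoot it.
        System.out.println(scan(25_000, 10_000, 15_000));
    }
}
```

In this model, a client limit of 15000 against a server limit of 10000 stops
after two requests with 20000 entries scanned, which matches point 4: the
parameter bounds the loop, not the exact number of entries.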
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() || mergedResult.getEntries() >=
backlogScanMaxEntries) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ // To avoid infinite loops, we ensure the entry count is
incremented after each loop.
+ if (currentResult.getEntries() <= 0) {
+ log.warn("[{}][{}] Scanned total entry count is null,
abort analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect total entry
count returned from server"));
+ }
Review Comment:
After completing the future exceptionally, the method should return to
prevent further execution. Without a return statement, the code continues to
execute lines 1644-1657, which will attempt to parse the lastMessageId and
schedule another iteration even though an error has already been reported.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -69,29 +69,23 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
try {
Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition();
lastSeenPosition = lastPositionForBatch;
- // filter out the entry if it has been already deleted
- // filterReadEntries will call entry.release if the entry is filtered out
- List<Entry> entriesFiltered = this.cursor.filterReadEntries(entries);
- int skippedEntries = entries.size() - entriesFiltered.size();
- remainingEntries.addAndGet(-skippedEntries);
- if (!entriesFiltered.isEmpty()) {
- for (Entry entry : entriesFiltered) {
- if (remainingEntries.decrementAndGet() <= 0) {
- log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
- callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
- return;
- }
- if (!condition.test(entry)) {
- log.warn("[{}] Scan abort due to user code", OpScan.this.cursor);
- callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
- return;
- }
+ for (Entry entry : entries) {
+ if (remainingEntries.getAndDecrement() <= 0) {
+ log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+ return;
+ }
+ if (!condition.test(entry)) {
+ log.warn("[{}] Scan abort due to user code", OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+ return;
}
}
Review Comment:
This change alters the behavior of the scan operation by removing the
filtering of individually deleted entries. The removed code dropped entries
that were already individually deleted before the loop, so they were never
passed to `condition.test`; it charged them to `remainingEntries` in bulk via
`addAndGet(-skippedEntries)`. Without this filtering, deleted entries are now
tested and processed like live ones, which may change the scan results. The
limit check also moves from `decrementAndGet()` to `getAndDecrement()`, which
shifts the abort point by one entry. Neither change is mentioned in the PR
description, which only discusses adding client-side looping to avoid HTTP
timeouts. Consider whether this behavioral change is intentional and document
it if so.
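
The two loop shapes in the hunk above can be compared with a small model. This
is a sketch under stated assumptions: `ScanBudgetSketch` is a hypothetical
name, each entry is reduced to a boolean "individually deleted" flag, and an
`AtomicLong` stands in for `remainingEntries`. It counts how many entries
would reach `condition.test` in each version.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

class ScanBudgetSketch {
    // Removed shape: drop deleted entries, charge them to the budget in bulk,
    // then check the budget with decrementAndGet before testing each live entry.
    static int testedOld(List<Boolean> deleted, long budget) {
        AtomicLong remaining = new AtomicLong(budget);
        List<Boolean> live = deleted.stream().filter(d -> !d).collect(Collectors.toList());
        remaining.addAndGet(-(deleted.size() - live.size()));
        int tested = 0;
        for (Boolean ignored : live) {
            if (remaining.decrementAndGet() <= 0) {
                return tested; // corresponds to ScanOutcome.ABORTED
            }
            tested++;
        }
        return tested;
    }

    // Added shape: every entry is considered, and the budget is checked with getAndDecrement.
    static int testedNew(List<Boolean> deleted, long budget) {
        AtomicLong remaining = new AtomicLong(budget);
        int tested = 0;
        for (Boolean ignored : deleted) {
            if (remaining.getAndDecrement() <= 0) {
                return tested; // corresponds to ScanOutcome.ABORTED
            }
            tested++;
        }
        return tested;
    }

    public static void main(String[] args) {
        List<Boolean> batch = Arrays.asList(false, true, false, false);
        System.out.println(testedOld(batch, 10) + " vs " + testedNew(batch, 10));
    }
}
```

With a generous budget the new shape tests the deleted entry as well; with a
tight budget and no deleted entries it tests one more entry than the old shape
before aborting.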
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,92 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() || mergedResult.getEntries() >=
backlogScanMaxEntries) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ // To avoid infinite loops, we ensure the entry count is
incremented after each loop.
+ if (currentResult.getEntries() <= 0) {
+ log.warn("[{}][{}] Scanned total entry count is null,
abort analyze backlog, start position is: {}",
Review Comment:
The log message says "Scanned total entry count is null" but the condition
checks if it's <= 0, not if it's null. The message should say "Scanned total
entry count is zero or negative" to accurately describe the condition being
checked.
```suggestion
                    log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start position is: {}",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.