This is an automated email from the ASF dual-hosted git repository. zixuan pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new ad5693cdda4 [fix][broker] Check the markDeletePosition and calculate the backlog (#22947) ad5693cdda4 is described below commit ad5693cdda46738ed37ef58177139d5c7406596c Author: Zixuan Liu <node...@gmail.com> AuthorDate: Thu Jun 20 21:47:27 2024 +0800 [fix][broker] Check the markDeletePosition and calculate the backlog (#22947) Signed-off-by: Zixuan Liu <node...@gmail.com> (cherry picked from commit 82b8d98a488191d279612d5cf2b4846627863543) --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8867d81cb1b..70342228d15 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1018,6 +1018,13 @@ public class ManagedCursorImpl implements ManagedCursor { return ledger.estimateBacklogFromPosition(markDeletePosition); } + private long getNumberOfEntriesInBacklog() { + if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) { + return 0; + } + return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + } + @Override public long getNumberOfEntriesInBacklog(boolean isPrecise) { if (log.isDebugEnabled()) { @@ -1026,16 +1033,13 @@ public class ManagedCursorImpl implements ManagedCursor { messagesConsumedCounter, markDeletePosition, readPosition); } if (isPrecise) { - if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) { - return 0; - } - return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + return getNumberOfEntriesInBacklog(); } long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter; if (backlog < 0) { // In some case the counters get incorrect values, fall back to the precise backlog count - backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + backlog = getNumberOfEntriesInBacklog(); } return backlog;