ShuKe-code opened a new issue, #25813: URL: https://github.com/apache/pulsar/issues/25813
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

- Pulsar broker 3.0.12
- Java 17

### Issue Description

Observed on a 3.0.x based branch.

### What did you expect to see?

After the inactive subscription is deleted and all old ledgers are trimmed, recreating the durable subscription with `Earliest` should initialize the cursor with backlog `0` if there is no real remaining data. `stats.backlog`, `stats-internal`, and precise backlog should be consistent.

### What did you see instead?

`admin topics stats` can report `backlog > 0`, while:

- `admin topics stats-internal` shows old ledgers already trimmed
- precise backlog is `0`
- the consumer cannot actually receive any message
- unloading the topic makes the reported backlog become `0`

### Error messages

```text
```

### Reproducing the issue

1. Create a partitioned topic with a durable subscription.
2. Produce messages across multiple ledgers.
3. Configure retention aggressively so old ledgers are trimmed, leaving only a new empty current ledger.
5. Create a subscription under a new name with `SubscriptionInitialPosition.Earliest`.
6. Compare:
   - `admin topics stats <topic>`
   - `admin topics stats-internal <topic>`
   - precise backlog

### Additional information

The root cause is in `ManagedLedgerImpl#getFirstPositionAndCounter()` during earliest cursor initialization. When all historical ledgers are already trimmed and only an empty current ledger remains, `getFirstPosition()` can return a synthetic position on the old last confirmed ledger (for example `469:-1`).
`getFirstPositionAndCounter()` then uses that synthetic position in `getNumberOfEntries(Range.openClosed(...))`, which counts entries from a ledger that no longer exists in `ledgers`. This leads to an incorrect `messagesConsumedCounter` when a durable subscription is recreated with `SubscriptionInitialPosition.Earliest` after inactive subscription deletion. The inaccurate counter then propagates into approximate backlog stats.

A fix is to treat this synthetic-first-position case specially and initialize the earliest cursor from `lastConfirmedEntry` and `entriesAddedCounter` instead of counting against trimmed ledgers.

```diff
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3798,19 +3798,40 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
      */
     Pair<PositionImpl, Long> getFirstPositionAndCounter() {
         PositionImpl pos;
+        PositionImpl firstPosition;
         long count;
         Pair<PositionImpl, Long> lastPositionAndCounter;
 
         do {
-            pos = getFirstPosition();
+            firstPosition = getFirstPosition();
             lastPositionAndCounter = getLastPositionAndCounter();
-            count = lastPositionAndCounter.getRight()
-                    - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
-        } while (pos.compareTo(getFirstPosition()) != 0
+            if (isSyntheticFirstPosition(firstPosition)) {
+                pos = lastPositionAndCounter.getLeft();
+                count = lastPositionAndCounter.getRight();
+            } else {
+                pos = firstPosition;
+                count = lastPositionAndCounter.getRight()
+                        - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
+            }
+        } while (firstPosition.compareTo(getFirstPosition()) != 0
                 || lastPositionAndCounter.getLeft().compareTo(getLastPosition()) != 0);
         return Pair.of(pos, count);
     }
 
+    private boolean isSyntheticFirstPosition(PositionImpl position) {
+        Long firstLedgerId = ledgers.firstKey();
+        if (firstLedgerId == null || position == null) {
+            return false;
+        }
+
+        LedgerInfo firstLedger = ledgers.get(firstLedgerId);
+        return firstLedgerId > lastConfirmedEntry.getLedgerId()
+                && position.getLedgerId() == lastConfirmedEntry.getLedgerId()
+                && position.getEntryId() == -1
+                && firstLedger != null
+                && firstLedger.getEntries() == 0;
+    }
+
```

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
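
To make the counter arithmetic from the root-cause description concrete, here is a small standalone toy model. It is a sketch under assumptions, not the Pulsar implementation: `EarliestCursorModel`, `Pos`, and every method name are hypothetical, and the numbers (last confirmed entry `469:9`, empty current ledger `470`, 100 entries added) are made up for illustration. It assumes that a same-ledger range is counted from entry ids alone, and that the approximate backlog is the added-entries counter minus the consumed counter.

```java
import java.util.TreeMap;

/** Toy model of the counter arithmetic; NOT the actual Pulsar code. */
class EarliestCursorModel {

    /** A (ledgerId, entryId) pair standing in for PositionImpl. */
    static final class Pos {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /**
     * Entries in the range (from, to] when both positions are in the same ledger.
     * Assumption: the count is derived from entry ids only, without checking
     * that the ledger is still present in the ledger map.
     */
    static long entriesInOpenClosedRange(Pos from, Pos to) {
        if (from.ledgerId != to.ledgerId) {
            throw new IllegalArgumentException("toy model only handles a single ledger");
        }
        return to.entryId - from.entryId;
    }

    /** Hypothetical check modelled on the isSyntheticFirstPosition() in the diff. */
    static boolean isSynthetic(TreeMap<Long, Long> ledgers, Pos first, Pos lastConfirmed) {
        if (ledgers.isEmpty() || first == null) {
            return false;
        }
        long firstLedgerId = ledgers.firstKey();
        return firstLedgerId > lastConfirmed.ledgerId
                && first.ledgerId == lastConfirmed.ledgerId
                && first.entryId == -1
                && ledgers.get(firstLedgerId) == 0;
    }

    /** Consumed counter without the special case: always subtract the range count. */
    static long consumedCounterBefore(Pos first, Pos lastConfirmed, long entriesAdded) {
        return entriesAdded - entriesInOpenClosedRange(first, lastConfirmed);
    }

    /** Consumed counter with the special case for a synthetic first position. */
    static long consumedCounterAfter(TreeMap<Long, Long> ledgers, Pos first,
                                     Pos lastConfirmed, long entriesAdded) {
        if (isSynthetic(ledgers, first, lastConfirmed)) {
            return entriesAdded; // nothing left to read, so everything counts as consumed
        }
        return consumedCounterBefore(first, lastConfirmed, entriesAdded);
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> ledgers = new TreeMap<>(); // ledgerId -> entry count
        ledgers.put(470L, 0L);                         // only an empty current ledger remains
        Pos lastConfirmed = new Pos(469, 9);           // last entry of the trimmed ledger
        Pos first = new Pos(469, -1);                  // synthetic first position
        long entriesAdded = 100;

        long before = consumedCounterBefore(first, lastConfirmed, entriesAdded);
        long after = consumedCounterAfter(ledgers, first, lastConfirmed, entriesAdded);
        System.out.println("backlog before fix: " + (entriesAdded - before)); // 10
        System.out.println("backlog after fix: " + (entriesAdded - after));   // 0
    }
}
```

In this model the range `(469:-1, 469:9]` contributes 10 entries even though ledger `469` is no longer in the map, which matches the symptom of a non-zero approximate backlog alongside a precise backlog of `0`.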
