This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cc7b3816ce1 [fix] [ml] incorrect non-durable cursor's backlog due to
concurrently trimming ledger and non-durable cursor creation (#23951)
cc7b3816ce1 is described below
commit cc7b3816ce14a2ddff18dbf04216769fdb8751db
Author: fengyubiao <[email protected]>
AuthorDate: Tue Feb 11 11:03:39 2025 +0800
[fix] [ml] incorrect non-durable cursor's backlog due to concurrently
trimming ledger and non-durable cursor creation (#23951)
Co-authored-by: Yunze Xu <[email protected]>
---
.../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f9a0ff26208..7426059e576 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1152,16 +1152,17 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
return cachedCursor;
}
- NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper,
this, cursorName,
- startCursorPosition, initialPosition, isReadCompacted);
- cursor.setActive();
-
- log.info("[{}] Opened new cursor: {}", name, cursor);
+ // The backlog of a non-durable cursor could be incorrect if the
cursor is created before `internalTrimLedgers`
+ // and added to the managed ledger after `internalTrimLedgers`.
+ // For more details, see https://github.com/apache/pulsar/pull/23951.
synchronized (this) {
+ NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper,
this, cursorName,
+ startCursorPosition, initialPosition, isReadCompacted);
+ cursor.setActive();
+ log.info("[{}] Opened new cursor: {}", name, cursor);
addCursor(cursor);
+ return cursor;
}
-
- return cursor;
}
@Override