This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d692bc1635cc276729a84da5d5d50dfb2bff96ff Author: sinan liu <[email protected]> AuthorDate: Wed Nov 20 00:24:46 2024 +0800 [improve][ml] Avoid repetitive nested lock for isMessageDeleted in ManagedCursorImpl (#23609) (cherry picked from commit 895e96853e56b7d9c10c6bd0f3bd6105b664eb14) --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index a8096a08e2f..85afae4e994 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1522,7 +1522,7 @@ public class ManagedCursorImpl implements ManagedCursor { Set<Position> alreadyAcknowledgedPositions = new HashSet<>(); lock.readLock().lock(); try { - positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add); + positions.stream().filter(this::internalIsMessageDeleted).forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); } @@ -2294,7 +2294,7 @@ public class ManagedCursorImpl implements ManagedCursor { return; } - if (isMessageDeleted(position)) { + if (internalIsMessageDeleted(position)) { if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { @@ -3468,13 +3468,19 @@ public class ManagedCursorImpl implements ManagedCursor { checkArgument(position instanceof PositionImpl); lock.readLock().lock(); try { - return ((PositionImpl) position).compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); + return internalIsMessageDeleted(position); } finally { lock.readLock().unlock(); } } + // When this method is called while the external has already acquired a write lock or a read lock, + // it avoids unnecessary lock nesting. + private boolean internalIsMessageDeleted(Position position) { + return ((PositionImpl) position).compareTo(markDeletePosition) <= 0 + || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); + } + //this method will return a copy of the position's ack set public long[] getBatchPositionAckSet(Position position) { if (!(position instanceof PositionImpl)) {
