This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 895e96853e5 [improve][ml] Avoid repetitive nested lock for
isMessageDeleted in ManagedCursorImpl (#23609)
895e96853e5 is described below
commit 895e96853e56b7d9c10c6bd0f3bd6105b664eb14
Author: sinan liu <[email protected]>
AuthorDate: Wed Nov 20 00:24:46 2024 +0800
[improve][ml] Avoid repetitive nested lock for isMessageDeleted in
ManagedCursorImpl (#23609)
---
.../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7c0d13108b1..478c6a1b379 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1569,7 +1569,7 @@ public class ManagedCursorImpl implements ManagedCursor {
Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
lock.readLock().lock();
try {
-
positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
+
positions.stream().filter(this::internalIsMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
}
@@ -2345,7 +2345,7 @@ public class ManagedCursorImpl implements ManagedCursor {
return;
}
- if (isMessageDeleted(position)) {
+ if (internalIsMessageDeleted(position)) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable =
batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
@@ -3543,13 +3543,19 @@ public class ManagedCursorImpl implements ManagedCursor
{
public boolean isMessageDeleted(Position position) {
lock.readLock().lock();
try {
- return position.compareTo(markDeletePosition) <= 0
- ||
individualDeletedMessages.contains(position.getLedgerId(),
position.getEntryId());
+ return internalIsMessageDeleted(position);
} finally {
lock.readLock().unlock();
}
}
+ // Lock-free variant of isMessageDeleted, intended for callers that already hold the
+ // read lock or the write lock; using it avoids unnecessary nested lock acquisition.
+ private boolean internalIsMessageDeleted(Position position) {
+ return position.compareTo(markDeletePosition) <= 0
+ || individualDeletedMessages.contains(position.getLedgerId(),
position.getEntryId());
+ }
+
//this method will return a copy of the position's ack set
@Override
public long[] getBatchPositionAckSet(Position position) {