This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 781a02b2085 [cleanup][ml] ManagedCursor clean up. (#22246) 781a02b2085 is described below commit 781a02b20859e61361f1d18c369c5d00d1b2f7fd Author: 道君 <dao...@apache.org> AuthorDate: Tue Mar 12 23:36:59 2024 +0800 [cleanup][ml] ManagedCursor clean up. (#22246) --- .../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java | 7 ++++++- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 6512399173f..80397931357 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -42,6 +42,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr private long timestamp; private long ledgerId; private long entryId; + private PositionImpl position; ByteBuf data; private Runnable onDeallocate; @@ -151,7 +152,10 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr @Override public PositionImpl getPosition() { - return new PositionImpl(ledgerId, entryId); + if (position == null) { + position = PositionImpl.get(ledgerId, entryId); + } + return position; } @Override @@ -197,6 +201,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr timestamp = -1; ledgerId = -1; entryId = -1; + position = null; recyclerHandle.recycle(this); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0c8dedd6b21..7065af203da 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1506,10 +1506,7 @@ public class ManagedCursorImpl implements ManagedCursor { Set<Position> alreadyAcknowledgedPositions = new HashSet<>(); lock.readLock().lock(); try { - positions.stream() - .filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) - .forEach(alreadyAcknowledgedPositions::add); + positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); } @@ -2281,8 +2278,7 @@ public class ManagedCursorImpl implements ManagedCursor { return; } - if (position.compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) { + if (isMessageDeleted(position)) { if (config.isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { @@ -3517,8 +3513,7 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public void trimDeletedEntries(List<Entry> entries) { entries.removeIf(entry -> { - boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0 - || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()); + boolean isDeleted = isMessageDeleted(entry.getPosition()); if (isDeleted) { entry.release(); }