This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 781a02b2085 [cleanup][ml] ManagedCursor clean up. (#22246)
781a02b2085 is described below

commit 781a02b20859e61361f1d18c369c5d00d1b2f7fd
Author: 道君 <dao...@apache.org>
AuthorDate: Tue Mar 12 23:36:59 2024 +0800

    [cleanup][ml] ManagedCursor clean up. (#22246)
---
 .../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java    |  7 ++++++-
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++--------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index 6512399173f..80397931357 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -42,6 +42,7 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted implements Entr
     private long timestamp;
     private long ledgerId;
     private long entryId;
+    private PositionImpl position;
     ByteBuf data;
 
     private Runnable onDeallocate;
@@ -151,7 +152,10 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted implements Entr
 
     @Override
     public PositionImpl getPosition() {
-        return new PositionImpl(ledgerId, entryId);
+        if (position == null) {
+            position = PositionImpl.get(ledgerId, entryId);
+        }
+        return position;
     }
 
     @Override
@@ -197,6 +201,7 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted implements Entr
         timestamp = -1;
         ledgerId = -1;
         entryId = -1;
+        position = null;
         recyclerHandle.recycle(this);
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0c8dedd6b21..7065af203da 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1506,10 +1506,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
         lock.readLock().lock();
         try {
-            positions.stream()
-                    .filter(position -> ((PositionImpl) 
position).compareTo(markDeletePosition) <= 0
-                            || 
individualDeletedMessages.contains(position.getLedgerId(), 
position.getEntryId()))
-                    .forEach(alreadyAcknowledgedPositions::add);
+            
positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
         } finally {
             lock.readLock().unlock();
         }
@@ -2281,8 +2278,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     return;
                 }
 
-                if (position.compareTo(markDeletePosition) <= 0
-                        || 
individualDeletedMessages.contains(position.getLedgerId(), 
position.getEntryId())) {
+                if (isMessageDeleted(position)) {
                     if (config.isDeletionAtBatchIndexLevelEnabled()) {
                         BitSetRecyclable bitSetRecyclable = 
batchDeletedIndexes.remove(position);
                         if (bitSetRecyclable != null) {
@@ -3517,8 +3513,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public void trimDeletedEntries(List<Entry> entries) {
         entries.removeIf(entry -> {
-            boolean isDeleted = 
markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0
-                    || individualDeletedMessages.contains(entry.getLedgerId(), 
entry.getEntryId());
+            boolean isDeleted = isMessageDeleted(entry.getPosition());
             if (isDeleted) {
                 entry.release();
             }

Reply via email to