This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 448d5d095225b7bc63b76c0f2198d1631a9c59d9
Author: 道君- Tao Jiuming <[email protected]>
AuthorDate: Fri May 23 09:38:45 2025 +0800

    [fix][ml] Fix ManagedCursorImpl.individualDeletedMessages concurrent issue (#24338)
    
    (cherry picked from commit 376ae57e52a3d9557ff26f5c184e0bb79b22fb37)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java  | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index aeed250be13..fe10ed6e2fe 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -639,6 +639,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
         } else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) {
             List<LongListMap> rangeList = 
positionInfo.getIndividualDeletedMessageRangesList();
+            lock.writeLock().lock();
             try {
                 Map<Long, long[]> rangeMap = 
rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
                         list -> list.getValuesList().stream().mapToLong(i -> 
i).toArray()));
@@ -661,6 +662,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             } catch (Exception e) {
                 log.warn("[{}]-{} Failed to recover individualDeletedMessages 
from serialized data", ledger.getName(),
                         name, e);
+            } finally {
+                lock.writeLock().unlock();
             }
         }
     }
@@ -3208,10 +3211,13 @@ public class ManagedCursorImpl implements ManagedCursor {
          * and deserialization error.
          */
         if (getConfig().isUnackedRangesOpenCacheSetEnabled() && 
getConfig().isPersistIndividualAckAsLongArray()) {
+            lock.readLock().lock();
             try {
                 internalRanges = 
individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
             } catch (Exception e) {
                 log.warn("[{}]-{} Failed to serialize 
individualDeletedMessages", ledger.getName(), name, e);
+            } finally {
+                lock.readLock().unlock();
             }
         }
         if (internalRanges != null && !internalRanges.isEmpty()) {

Reply via email to