This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 448d5d095225b7bc63b76c0f2198d1631a9c59d9 Author: 道君- Tao Jiuming <[email protected]> AuthorDate: Fri May 23 09:38:45 2025 +0800 [fix][ml] Fix ManagedCursorImpl.individualDeletedMessages concurrent issue (#24338) (cherry picked from commit 376ae57e52a3d9557ff26f5c184e0bb79b22fb37) --- .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index aeed250be13..fe10ed6e2fe 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -639,6 +639,7 @@ public class ManagedCursorImpl implements ManagedCursor { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) { List<LongListMap> rangeList = positionInfo.getIndividualDeletedMessageRangesList(); + lock.writeLock().lock(); try { Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey, list -> list.getValuesList().stream().mapToLong(i -> i).toArray())); @@ -661,6 +662,8 @@ public class ManagedCursorImpl implements ManagedCursor { } catch (Exception e) { log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(), name, e); + } finally { + lock.writeLock().unlock(); } } } @@ -3208,10 +3211,13 @@ public class ManagedCursorImpl implements ManagedCursor { * and deserialization error. */ if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) { + lock.readLock().lock(); try { internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist()); } catch (Exception e) { log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e); + } finally { + lock.readLock().unlock(); } } if (internalRanges != null && !internalRanges.isEmpty()) {
