This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 5aebfedeca8 [fix][ml] PIP-430: Fix concurrency issue in 
MessageMetadata caching and improve caching (#24836)
5aebfedeca8 is described below

commit 5aebfedeca869c8407dd9706a6ec34b57372a294
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Oct 10 17:33:15 2025 +0300

    [fix][ml] PIP-430: Fix concurrency issue in MessageMetadata caching and 
improve caching (#24836)
    
    (cherry picked from commit 9737d038485e0c15f251dc334d6963fd0207953e)
---
 .../main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java   | 8 +++++---
 .../bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java     | 8 +++++---
 .../apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java | 3 ++-
 .../bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java     | 4 ++--
 4 files changed, 14 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index 070a0fc1bea..8a48dbb7ea1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -150,13 +150,15 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted
     }
 
     public static EntryImpl createWithRetainedDuplicate(Position position, 
ByteBuf data,
-                                                        EntryReadCountHandler 
entryReadCountHandler) {
+                                                        EntryReadCountHandler 
entryReadCountHandler,
+                                                        MessageMetadata 
messageMetadata) {
         EntryImpl entry = RECYCLER.get();
         entry.position = PositionFactory.create(position);
         entry.ledgerId = position.getLedgerId();
         entry.entryId = position.getEntryId();
         entry.data = data.retainedDuplicate();
         entry.readCountHandler = entryReadCountHandler;
+        entry.messageMetadata = messageMetadata;
         entry.setRefCnt(1);
         return entry;
     }
@@ -305,11 +307,11 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted
         decreaseReadCountOnRelease = enabled;
     }
 
-    public void initializeMessageMetadataIfNeeded(String managedLedgerName) {
+    public synchronized void initializeMessageMetadataIfNeeded(String 
managedLedgerName) {
         if (messageMetadata == null) {
             try {
                 MessageMetadata msgMetadata = new MessageMetadata();
-                Commands.peekMessageMetadata(data, msgMetadata);
+                Commands.parseMessageMetadata(data.duplicate(), msgMetadata);
                 this.messageMetadata = msgMetadata;
             } catch (Throwable t) {
                 log.warn("[{}] Failed to parse message metadata for entry 
{}:{}", managedLedgerName, ledgerId, entryId,
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
index 5c82207e1c7..d3dd9de038b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
@@ -48,6 +48,7 @@ class RangeCacheEntryWrapper {
     long size;
     long timestampNanos;
     int requeueCount;
+    boolean messageMetadataInitialized;
     volatile boolean accessed;
 
     private RangeCacheEntryWrapper(Recycler.Handle<RangeCacheEntryWrapper> 
recyclerHandle) {
@@ -114,12 +115,12 @@ class RangeCacheEntryWrapper {
         long stamp = lock.tryOptimisticRead();
         Position localKey = this.key;
         ReferenceCountedEntry localValue = this.value;
-        boolean messageMetadataInitialized = localValue != null && 
localValue.getMessageMetadata() != null;
+        boolean messageMetadataInitialized = this.messageMetadataInitialized;
         if (!lock.validate(stamp)) {
             stamp = lock.readLock();
             localKey = this.key;
             localValue = this.value;
-            messageMetadataInitialized = localValue != null && 
localValue.getMessageMetadata() != null;
+            messageMetadataInitialized = this.messageMetadataInitialized;
             lock.unlockRead(stamp);
         }
         // check that the given key matches the key associated with the value 
in the entry
@@ -136,8 +137,9 @@ class RangeCacheEntryWrapper {
                 if (wrapper.key != key && (requireSameKeyInstance || 
wrapper.key == null || !wrapper.key.equals(key))) {
                     return null;
                 }
-                if (wrapper.value instanceof EntryImpl entry) {
+                if (wrapper.value instanceof EntryImpl entry && 
!this.messageMetadataInitialized) {
                     entry.initializeMessageMetadataIfNeeded(managedLedgerName);
+                    this.messageMetadataInitialized = true;
                 }
                 return wrapper.value;
             });
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index fd391ba2bf6..f8ee2c431b1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -152,7 +152,8 @@ public class RangeEntryCacheImpl implements EntryCache {
 
         Position position = entry.getPosition();
         ReferenceCountedEntry cacheEntry =
-                EntryImpl.createWithRetainedDuplicate(position, cachedData, 
entry.getReadCountHandler());
+                EntryImpl.createWithRetainedDuplicate(position, cachedData, 
entry.getReadCountHandler(),
+                            entry.getMessageMetadata());
         cachedData.release();
         if (entries.put(position, cacheEntry, entryLength)) {
             totalAddedEntriesSize.add(entryLength);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
index f7b6e755bce..285cd251a9f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
@@ -309,10 +309,10 @@ public class ManagedCursorConcurrencyTest extends 
MockedBookKeeperTestCase {
         assertEquals(cursor.getMarkDeletedPosition(), 
addedEntries.get(addedEntries.size() - 1));
     }
 
-    @Test(timeOut = 30000)
+    @Test(timeOut = 30000, invocationCount = 10)
     public void testConcurrentReadOfSameEntry() throws Exception {
         ManagedLedger ledger = factory.open("testConcurrentReadOfSameEntry", 
new ManagedLedgerConfig());
-        final int numCursors = 5;
+        final int numCursors = 20;
         final List<ManagedCursor> cursors = new ArrayList();
         for (int i = 0; i < numCursors; i++) {
             final ManagedCursor cursor = ledger.openCursor("c" + i);

Reply via email to