This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 68d377937bc [improve][broker] Fix thread safety issue in ManagedCursorImpl.removeProperty (#25104)
68d377937bc is described below

commit 68d377937bc4e3265e7858abab630ff39969f774
Author: Ruimin MA <[email protected]>
AuthorDate: Fri Jan 2 22:38:59 2026 +0800

    [improve][broker] Fix thread safety issue in ManagedCursorImpl.removeProperty (#25104)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   9 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 114 +++++++++++++++++++++
 2 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c0f29ea5c22..ab15a6d6a17 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -524,7 +524,14 @@ public class ManagedCursorImpl implements ManagedCursor {
             LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
                 Map<String, Long> properties = last.properties;
                 if (properties != null && properties.containsKey(key)) {
-                    properties.remove(key);
+                    Map<String, Long> newProperties = new 
HashMap<>(properties);
+                    newProperties.remove(key);
+
+                    MarkDeleteEntry newLastMarkDeleteEntry = new 
MarkDeleteEntry(last.newPosition, newProperties,
+                            last.callback, last.ctx);
+                    newLastMarkDeleteEntry.callbackGroup = last.callbackGroup;
+
+                    return newLastMarkDeleteEntry;
                 }
                 return last;
             });
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 123a5cf048d..c87477a95a0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -50,6 +50,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -57,6 +58,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -72,6 +74,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -208,6 +211,117 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertEquals(cursor.getMarkDeletedPosition(), 
ledger.getLastConfirmedEntry());
     }
 
+    @Test
+    public void testConcurrentPropertyOperationsThreadSafety() throws 
Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedger ledger = 
factory.open("testConcurrentPropertyOperationsThreadSafety", config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor("c_concurrent", null);
+
+        int threadCount = 100;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+        // Collect all Future objects to ensure all tasks complete before 
verification
+        List<Future<?>> allFutures = new ArrayList<>();
+
+        // Use fixed number of operations
+        int totalOperations = 10000;
+        AtomicLong completedOperations = new AtomicLong(0);
+        AtomicLong exceptionCount = new AtomicLong(0);
+        AtomicBoolean inconsistencyDetected = new AtomicBoolean(false);
+
+        // Submit fixed number of concurrent tasks
+        for (int i = 0; i < totalOperations; i++) {
+            Future<?> future = executor.submit(() -> {
+                try {
+                    Random random = new Random();
+                    int operationType = random.nextInt(3);
+                    String randomKey = "key" + random.nextInt(20);
+
+                    switch (operationType) {
+                        case 0: // Put operation
+                            Long randomValue = random.nextLong();
+                            cursor.putProperty(randomKey, randomValue);
+                            break;
+
+                        case 1: // Remove operation
+                            cursor.removeProperty(randomKey);
+                            break;
+
+                        case 2: // Read and verify operation
+                            Map<String, Long> properties = 
cursor.getProperties();
+                            // Verify no inconsistent state (key exists but 
value is null)
+                            if (properties.containsKey(randomKey) && 
properties.get(randomKey) == null) {
+                                inconsistencyDetected.set(true);
+                                fail("INCONSISTENT STATE DETECTED: Key '" + 
randomKey + "' exists but has null value");
+                            }
+                            break;
+                    }
+                    completedOperations.incrementAndGet();
+
+                } catch (ConcurrentModificationException cme) {
+                    // Record ConcurrentModificationException but don't fail 
immediately
+                    // We'll assert at the end that no exceptions occurred
+                    exceptionCount.incrementAndGet();
+                } catch (Exception e) {
+                    exceptionCount.incrementAndGet();
+                    fail("Unexpected exception: " + e.getMessage());
+                }
+            });
+
+            allFutures.add(future);
+        }
+        executor.shutdown();
+
+        // Wait for each task to complete with timeout
+        for (Future<?> future : allFutures) {
+            try {
+                future.get(30, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                fail("Task timed out after 30 seconds - possible deadlock or 
infinite loop");
+            } catch (ExecutionException e) {
+                fail("unexpected exception: " + e.getCause());
+            }
+        }
+
+        // Ensure executor is fully terminated
+        boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
+        assertTrue(terminated, "Executor should be fully terminated");
+
+        // Then: Verify test results
+        // 1. No ConcurrentModificationException should occur with fixed code
+        assertEquals(exceptionCount.get(), 0, "No exceptions should occur with 
thread-safe implementation");
+
+        // 2. No inconsistent states detected
+        assertFalse(inconsistencyDetected.get(), "No inconsistent states (key 
with null value) should be detected");
+
+        // 3: Final cursor state should be internally consistent
+        Map<String, Long> finalProperties = cursor.getProperties();
+        try {
+            for (Map.Entry<String, Long> entry : finalProperties.entrySet()) {
+                String key = entry.getKey();
+                Long value = entry.getValue();
+                // Verify key is not null
+                assertNotNull(key, "Final key should not be null");
+                // Verify value is not null
+                assertNotNull(value, "Final value should not be null for key: 
" + key);
+                // Verify key follows expected pattern
+                assertTrue(key.startsWith("key"),
+                        "Final key should start with 'key', got: " + key);
+
+                // Verify key format is valid (key followed by a number 0-19)
+                try {
+                    String numberPart = key.substring(3); // Remove "key" 
prefix
+                    int keyNumber = Integer.parseInt(numberPart);
+                    assertTrue(keyNumber >= 0 && keyNumber < 20,
+                            "Key number should be between 0 and 19, got: " + 
keyNumber);
+                } catch (NumberFormatException e) {
+                    fail("Invalid key format: " + key + ". Should be 'keyX' 
where X is a number");
+                }
+            }
+        } catch (Exception e) {
+            fail("HashMap corruption detected in final state: " + 
e.getMessage());
+        }
+    }
     private static void closeCursorLedger(ManagedCursorImpl managedCursor) {
         Awaitility.await().until(managedCursor::closeCursorLedger);
     }

Reply via email to