This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ad7589f53b1b73c52f1a6b65c3f086b9b012d580
Author: Oneby Wang <[email protected]>
AuthorDate: Mon May 18 17:14:10 2026 +0800

    [fix][broker] Fix ManagedLedgerImpl.advanceCursorsIfNecessary() method may 
lose non-durable cursor properties in race condition (#25796)
    
    (cherry picked from commit cbe716121068357de367b20e9b6f5bf08e869cb4)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 62 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 95ba53312b4..f1eaa04d313 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3149,7 +3149,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     && 
highestPositionToDelete.compareTo(cursor.getManagedLedger()
                     .getLastConfirmedEntry()) <= 0 && !(!cursor.isDurable() && 
cursor instanceof NonDurableCursorImpl
                     && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
-                cursor.asyncMarkDelete(highestPositionToDelete, 
cursor.getProperties(), new MarkDeleteCallback() {
+                cursor.asyncMarkDelete(highestPositionToDelete, null, new 
MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
                     }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 3804ff7ade9..dff78eab2f3 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -5213,4 +5214,65 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         // Verify properties are preserved after cursor reset
         assertEquals(cursor.getProperties(), expectedProperties);
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() 
throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        config.setRetentionTime(0, TimeUnit.SECONDS);
+        config.setRetentionSizeInMB(0);
+
+        @Cleanup
+        ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) 
factory.open("testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties", 
config);
+        @Cleanup
+        ManagedCursorImpl durableCursor = (ManagedCursorImpl) 
ledger.openCursor("durableCursor1");
+        @Cleanup
+        NonDurableCursorImpl realNonDurableCursor =
+                (NonDurableCursorImpl) 
ledger.newNonDurableCursor(PositionFactory.EARLIEST);
+        NonDurableCursorImpl nonDurableCursor = spy(realNonDurableCursor);
+
+        ledger.getCursors().removeCursor(realNonDurableCursor.getName());
+        ledger.getCursors().add(nonDurableCursor, null);
+
+        CountDownLatch advanceCursorsMarkDeleteEnteredLatch = new 
CountDownLatch(1);
+        CountDownLatch nonDurableCursorsMarkDeleteCompletedLatch = new 
CountDownLatch(1);
+        CountDownLatch advanceCursorsMarkDeleteCompletedLatch = new 
CountDownLatch(1);
+
+        doAnswer(invocation -> {
+            Map<String, Long> invocationProperties = invocation.getArgument(1);
+            // Pause the advanceCursorsIfNecessary mark-delete so the 
nonDurableCursor markDelete() can complete first.
+            if (invocationProperties == null || 
invocationProperties.isEmpty()) {
+                advanceCursorsMarkDeleteEnteredLatch.countDown();
+                assertTrue(nonDurableCursorsMarkDeleteCompletedLatch.await(5, 
TimeUnit.SECONDS));
+                try {
+                    return invocation.callRealMethod();
+                } finally {
+                    advanceCursorsMarkDeleteCompletedLatch.countDown();
+                }
+            }
+
+            return invocation.callRealMethod();
+        }).when(nonDurableCursor)
+                .internalAsyncMarkDelete(any(Position.class), 
nullable(Map.class), any(MarkDeleteCallback.class),
+                        nullable(Object.class), nullable(Runnable.class));
+
+        ledger.addEntry("entry-1".getBytes(Encoding));
+        Position pos2 = ledger.addEntry("entry-2".getBytes(Encoding));
+
+        // Mark-delete the durable cursor to trigger trimming, which advances 
non-durable cursors.
+        durableCursor.markDelete(pos2);
+        assertTrue(advanceCursorsMarkDeleteEnteredLatch.await(5, 
TimeUnit.SECONDS));
+
+        String propertyKey = "test-property";
+        Map<String, Long> properties = new HashMap<>();
+        properties.put(propertyKey, 1L);
+        nonDurableCursor.markDelete(pos2, properties);
+        nonDurableCursorsMarkDeleteCompletedLatch.countDown();
+
+        assertTrue(advanceCursorsMarkDeleteCompletedLatch.await(5, 
TimeUnit.SECONDS));
+        assertEquals(nonDurableCursor.getMarkDeletedPosition(), pos2);
+        assertEquals(nonDurableCursor.getProperties(), properties);
+    }
 }

Reply via email to