This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b15504a696 [fix][broker] Fix compaction cursor reset may lose 
mark-delete properties (#25862)
9b15504a696 is described below

commit 9b15504a696e3ea21426b9e7b92a1dd613120da5
Author: Oneby Wang <[email protected]>
AuthorDate: Mon May 25 20:29:10 2026 +0800

    [fix][broker] Fix compaction cursor reset may lose mark-delete properties 
(#25862)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 84 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c42ccf69f86..67d392f78f5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1686,7 +1686,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         persistentMarkDeletePosition = null;
         inProgressMarkDeletePersistPosition = null;
-        internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? 
getProperties() : Collections.emptyMap(),
+        internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? 
null : Collections.emptyMap(),
                 new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a3eb4756335..d972b63a294 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKK
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
@@ -6176,6 +6177,89 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         }
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCompactionCursorResetNeverLoseMarkDeleteProperties() 
throws Exception {
+        @Cleanup
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(
+                "testCompactionCursorResetNeverLoseMarkDeleteProperties",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor("__compaction");
+        ManagedCursorImpl spyCursor = spy(cursor);
+        ledger.getCursors().removeCursor(cursor.getName());
+        ledger.getCursors().add(spyCursor, null);
+
+        ledger.addEntry("entry-1".getBytes(Encoding));
+        Position markDeletePosition = 
ledger.addEntry("entry-2".getBytes(Encoding));
+
+        String compactedLedgerProperty = "CompactedTopicLedger";
+        Map<String, Long> properties = Map.of(compactedLedgerProperty, 
123456L);
+
+        CountDownLatch markDeleteEntered = new CountDownLatch(1);
+        CountDownLatch resetEntered = new CountDownLatch(1);
+        CountDownLatch markDeleteReturned = new CountDownLatch(1);
+        CountDownLatch markDeleteCompleted = new CountDownLatch(1);
+        CountDownLatch resetCompleted = new CountDownLatch(1);
+
+        doAnswer(invocation -> {
+            Map<String, Long> invocationProperties = invocation.getArgument(1);
+            if (invocationProperties != null && 
invocationProperties.containsKey(compactedLedgerProperty)) {
+                // Hold the compaction mark-delete after it enters 
internalAsyncMarkDelete, but before its
+                // properties can update lastMarkDeleteEntry.
+                markDeleteEntered.countDown();
+                assertTrue(resetEntered.await(5, TimeUnit.SECONDS));
+                try {
+                    return invocation.callRealMethod();
+                } finally {
+                    markDeleteReturned.countDown();
+                }
+            }
+
+            if (invocationProperties == null || 
invocationProperties.isEmpty()) {
+                // Let reset capture its properties argument first, then 
persist it only after the compaction
+                // mark-delete has completed the real internalAsyncMarkDelete 
call.
+                resetEntered.countDown();
+                assertTrue(markDeleteReturned.await(5, TimeUnit.SECONDS));
+                return invocation.callRealMethod();
+            }
+
+            return invocation.callRealMethod();
+        }).when(spyCursor).internalAsyncMarkDelete(any(Position.class), 
nullable(Map.class),
+                any(MarkDeleteCallback.class), nullable(Object.class), 
nullable(Runnable.class));
+
+        // Start compaction mark-delete from another thread because the spy 
intentionally blocks it.
+        CompletableFuture.runAsync(() -> spyCursor.asyncMarkDelete(
+                markDeletePosition, properties, new MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                        markDeleteCompleted.countDown();
+                    }
+
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                    }
+                }, null));
+
+        assertTrue(markDeleteEntered.await(5, TimeUnit.SECONDS));
+        // Reset the compaction cursor while the previous mark-delete with 
properties is still in progress.
+        spyCursor.asyncResetCursor(markDeletePosition, false, new 
AsyncCallbacks.ResetCursorCallback() {
+            @Override
+            public void resetComplete(Object ctx) {
+                resetCompleted.countDown();
+            }
+
+            @Override
+            public void resetFailed(ManagedLedgerException exception, Object 
ctx) {
+            }
+        });
+
+        assertTrue(markDeleteCompleted.await(5, TimeUnit.SECONDS));
+        assertTrue(resetCompleted.await(5, TimeUnit.SECONDS));
+
+        assertEquals(spyCursor.getMarkDeletedPosition(), markDeletePosition);
+        assertEquals(spyCursor.getProperties(), properties);
+    }
+
     @SuppressWarnings("try")
     class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
         Map<Long, Integer> ledgerErrors = new HashMap<>();

Reply via email to