This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9b15504a696 [fix][broker] Fix compaction cursor reset may lose
mark-delete properties (#25862)
9b15504a696 is described below
commit 9b15504a696e3ea21426b9e7b92a1dd613120da5
Author: Oneby Wang <[email protected]>
AuthorDate: Mon May 25 20:29:10 2026 +0800
[fix][broker] Fix compaction cursor reset may lose mark-delete properties
(#25862)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 84 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c42ccf69f86..67d392f78f5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1686,7 +1686,7 @@ public class ManagedCursorImpl implements ManagedCursor {
persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
- internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ?
getProperties() : Collections.emptyMap(),
+ internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ?
null : Collections.emptyMap(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a3eb4756335..d972b63a294 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -23,6 +23,7 @@ import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKK
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
@@ -6176,6 +6177,89 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCompactionCursorResetNeverLoseMarkDeleteProperties()
throws Exception {
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(
+ "testCompactionCursorResetNeverLoseMarkDeleteProperties",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor("__compaction");
+ ManagedCursorImpl spyCursor = spy(cursor);
+ ledger.getCursors().removeCursor(cursor.getName());
+ ledger.getCursors().add(spyCursor, null);
+
+ ledger.addEntry("entry-1".getBytes(Encoding));
+ Position markDeletePosition =
ledger.addEntry("entry-2".getBytes(Encoding));
+
+ String compactedLedgerProperty = "CompactedTopicLedger";
+ Map<String, Long> properties = Map.of(compactedLedgerProperty,
123456L);
+
+ CountDownLatch markDeleteEntered = new CountDownLatch(1);
+ CountDownLatch resetEntered = new CountDownLatch(1);
+ CountDownLatch markDeleteReturned = new CountDownLatch(1);
+ CountDownLatch markDeleteCompleted = new CountDownLatch(1);
+ CountDownLatch resetCompleted = new CountDownLatch(1);
+
+ doAnswer(invocation -> {
+ Map<String, Long> invocationProperties = invocation.getArgument(1);
+ if (invocationProperties != null &&
invocationProperties.containsKey(compactedLedgerProperty)) {
+ // Hold the compaction mark-delete after it enters
internalAsyncMarkDelete, but before its
+ // properties can update lastMarkDeleteEntry.
+ markDeleteEntered.countDown();
+ assertTrue(resetEntered.await(5, TimeUnit.SECONDS));
+ try {
+ return invocation.callRealMethod();
+ } finally {
+ markDeleteReturned.countDown();
+ }
+ }
+
+ if (invocationProperties == null ||
invocationProperties.isEmpty()) {
+ // Let reset capture its properties argument first, then
persist it only after the compaction
+ // mark-delete has completed the real internalAsyncMarkDelete
call.
+ resetEntered.countDown();
+ assertTrue(markDeleteReturned.await(5, TimeUnit.SECONDS));
+ return invocation.callRealMethod();
+ }
+
+ return invocation.callRealMethod();
+ }).when(spyCursor).internalAsyncMarkDelete(any(Position.class),
nullable(Map.class),
+ any(MarkDeleteCallback.class), nullable(Object.class),
nullable(Runnable.class));
+
+ // Start compaction mark-delete from another thread because the spy
intentionally blocks it.
+ CompletableFuture.runAsync(() -> spyCursor.asyncMarkDelete(
+ markDeletePosition, properties, new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ markDeleteCompleted.countDown();
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ }
+ }, null));
+
+ assertTrue(markDeleteEntered.await(5, TimeUnit.SECONDS));
+ // Reset the compaction cursor while the previous mark-delete with
properties is still in progress.
+ spyCursor.asyncResetCursor(markDeletePosition, false, new
AsyncCallbacks.ResetCursorCallback() {
+ @Override
+ public void resetComplete(Object ctx) {
+ resetCompleted.countDown();
+ }
+
+ @Override
+ public void resetFailed(ManagedLedgerException exception, Object
ctx) {
+ }
+ });
+
+ assertTrue(markDeleteCompleted.await(5, TimeUnit.SECONDS));
+ assertTrue(resetCompleted.await(5, TimeUnit.SECONDS));
+
+ assertEquals(spyCursor.getMarkDeletedPosition(), markDeletePosition);
+ assertEquals(spyCursor.getProperties(), properties);
+ }
+
@SuppressWarnings("try")
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();