This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1617bb22173 [fix][broker] Fix markDeletedPosition race condition in 
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() method 
(#25110)
1617bb22173 is described below

commit 1617bb22173a117f24d47ac6f11cc2f7c68de635
Author: Oneby Wang <[email protected]>
AuthorDate: Mon Jan 12 20:48:33 2026 +0800

    [fix][broker] Fix markDeletedPosition race condition in 
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() method 
(#25110)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  58 ++++---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  17 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  64 ++++---
 .../mledger/impl/NonDurableCursorImpl.java         |   8 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  23 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 189 ++++++++++++++++++++-
 6 files changed, 287 insertions(+), 72 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b3cb8dc4596..c120b9fa719 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1250,14 +1250,17 @@ public class ManagedCursorImpl implements ManagedCursor 
{
 
     @Override
     public long getNumberOfEntries() {
-        if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
+        Position readPos = readPosition;
+        Position lastPosition = ledger.getLastPosition();
+        Position nextPosition = lastPosition.getNext();
+        if (readPos.compareTo(nextPosition) > 0) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Read position {} is ahead of last 
position {}. There are no entries to read",
-                        ledger.getName(), name, readPosition, 
ledger.getLastPosition());
+                        ledger.getName(), name, readPos, lastPosition);
             }
             return 0;
         } else {
-            return getNumberOfEntries(Range.closedOpen(readPosition, 
ledger.getLastPosition().getNext()));
+            return getNumberOfEntries(Range.closedOpen(readPos, nextPosition));
         }
     }
 
@@ -2255,13 +2258,15 @@ public class ManagedCursorImpl implements ManagedCursor 
{
         }
 
         Position newPosition = ackBatchPosition(position);
-        if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
+        Position markDeletePos = markDeletePosition;
+        Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
+        if (lastConfirmedEntry.compareTo(newPosition) < 0) {
             boolean shouldCursorMoveForward = false;
             try {
-                long ledgerEntries = 
ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
-                Long nextValidLedger = 
ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
+                long ledgerEntries = 
ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
+                Long nextValidLedger = 
ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId());
                 shouldCursorMoveForward = nextValidLedger != null
-                        && (markDeletePosition.getEntryId() + 1 >= 
ledgerEntries)
+                        && (markDeletePos.getEntryId() + 1 >= ledgerEntries)
                         && (newPosition.getLedgerId() == nextValidLedger);
             } catch (Exception e) {
                 log.warn("Failed to get ledger entries while setting 
mark-delete-position", e);
@@ -2269,11 +2274,11 @@ public class ManagedCursorImpl implements ManagedCursor 
{
 
             if (shouldCursorMoveForward) {
                 log.info("[{}] move mark-delete-position from {} to {} since 
all the entries have been consumed",
-                        ledger.getName(), markDeletePosition, newPosition);
+                        ledger.getName(), markDeletePos, newPosition);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Failed mark delete due to invalid 
markDelete {} is ahead of last-confirmed-entry {}"
-                             + " for cursor [{}]", ledger.getName(), position, 
ledger.getLastConfirmedEntry(), name);
+                             + " for cursor [{}]", ledger.getName(), position, 
lastConfirmedEntry, name);
                 }
                 callback.markDeleteFailed(new ManagedLedgerException("Invalid 
mark deleted position"), ctx);
                 return;
@@ -2329,11 +2334,15 @@ public class ManagedCursorImpl implements ManagedCursor 
{
             final MarkDeleteCallback callback, final Object ctx, Runnable 
alignAcknowledgeStatusAfterPersisted) {
         ledger.mbean.addMarkDeleteOp();
 
-        MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, 
callback, ctx,
-                alignAcknowledgeStatusAfterPersisted);
-
         // We cannot write to the ledger during the switch, need to wait until 
the new metadata ledger is available
         synchronized (pendingMarkDeleteOps) {
+            // use given properties or when missing, use the properties from 
the previous field value
+            MarkDeleteEntry last = pendingMarkDeleteOps.peekLast();
+            Map<String, Long> propertiesToUse =
+                    properties != null ? properties : (last != null ? 
last.properties : getProperties());
+            MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, 
propertiesToUse, callback, ctx,
+                    alignAcknowledgeStatusAfterPersisted);
+
             // The state might have changed while we were waiting on the queue 
mutex
             switch (state) {
             case Closed:
@@ -2701,17 +2710,20 @@ public class ManagedCursorImpl implements ManagedCursor 
{
     // update lastMarkDeleteEntry field if newPosition is later than the 
current lastMarkDeleteEntry.newPosition
     private void updateLastMarkDeleteEntryToLatest(final Position newPosition,
                                                    final Map<String, Long> 
properties) {
-        LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
-            if (last != null && last.newPosition.compareTo(newPosition) > 0) {
-                // keep current value, don't update
-                return last;
-            } else {
-                // use given properties or when missing, use the properties 
from the previous field value
-                Map<String, Long> propertiesToUse =
-                        properties != null ? properties : (last != null ? 
last.properties : Collections.emptyMap());
-                return new MarkDeleteEntry(newPosition, propertiesToUse, null, 
null);
-            }
-        });
+        synchronized (pendingMarkDeleteOps) {
+            // use given properties or when missing, use the properties from 
the previous field value
+            MarkDeleteEntry lastPending = pendingMarkDeleteOps.peekLast();
+            Map<String, Long> propertiesToUse =
+                    properties != null ? properties : (lastPending != null ? 
lastPending.properties : getProperties());
+            LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
+                if (last != null && last.newPosition.compareTo(newPosition) > 
0) {
+                    // keep current value, don't update
+                    return last;
+                } else {
+                    return new MarkDeleteEntry(newPosition, propertiesToUse, 
null, null);
+                }
+            });
+        }
     }
 
     /**
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index a452c6682a5..97333fbb1e3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -481,14 +481,15 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                             public void initializeComplete() {
                                 log.info("[{}] Successfully initialize managed 
ledger", name);
                                 pendingInitializeLedgers.remove(name, 
pendingLedger);
-                                future.complete(newledger);
-
-                                // May need to update the cursor position
-                                
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
-                                // May need to trigger offloading
-                                if (config.isTriggerOffloadOnTopicLoad()) {
-                                    
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
-                                }
+                                // May need to update the cursor position and 
wait them finished
+                                
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) 
-> {
+                                    // ignore ex since it is handled in 
maybeUpdateCursorBeforeTrimmingConsumedLedger
+                                    future.complete(newledger);
+                                    // May need to trigger offloading
+                                    if (config.isTriggerOffloadOnTopicLoad()) {
+                                        
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+                                    }
+                                });
                             }
 
                             @Override
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4b278cf6664..bb682cdb293 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1761,11 +1761,10 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                         updateLedgersIdsComplete(originalCurrentLedger);
                         
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
                                 - lastLedgerCreationInitiationTimestamp, 
TimeUnit.MILLISECONDS);
+                        // May need to update the cursor position
+                        maybeUpdateCursorBeforeTrimmingConsumedLedger();
                     }
                     metadataMutex.unlock();
-
-                    // May need to update the cursor position
-                    maybeUpdateCursorBeforeTrimmingConsumedLedger();
                 }
 
                 @Override
@@ -2709,18 +2708,23 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         this.waitingEntryCallBacks.add(cb);
     }
 
-    public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
+    public CompletableFuture<Void> 
maybeUpdateCursorBeforeTrimmingConsumedLedger() {
+        List<CompletableFuture<Void>> cursorMarkDeleteFutures = new 
ArrayList<>();
         for (ManagedCursor cursor : cursors) {
-            Position lastAckedPosition = 
cursor.getPersistentMarkDeletedPosition() != null
-                    ? cursor.getPersistentMarkDeletedPosition() : 
cursor.getMarkDeletedPosition();
-            LedgerInfo currPointedLedger = 
ledgers.get(lastAckedPosition.getLedgerId());
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            cursorMarkDeleteFutures.add(future);
+
+            // Snapshot positions into a local variables to avoid race 
condition.
+            Position markDeletedPosition = cursor.getMarkDeletedPosition();
+            Position lastAckedPosition = markDeletedPosition;
+            LedgerInfo curPointedLedger   = 
ledgers.get(lastAckedPosition.getLedgerId());
             LedgerInfo nextPointedLedger = 
Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
                     .map(Map.Entry::getValue).orElse(null);
 
-            if (currPointedLedger != null) {
+            if (curPointedLedger != null) {
                 if (nextPointedLedger != null) {
                     if (lastAckedPosition.getEntryId() != -1
-                            && lastAckedPosition.getEntryId() + 1 >= 
currPointedLedger.getEntries()) {
+                            && lastAckedPosition.getEntryId() + 1 >= 
curPointedLedger.getEntries()) {
                         lastAckedPosition = 
PositionFactory.create(nextPointedLedger.getLedgerId(), -1);
                     }
                 } else {
@@ -2730,25 +2734,37 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                 log.warn("Cursor: {} does not exist in the managed-ledger.", 
cursor);
             }
 
-            if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
+            int compareResult = 
lastAckedPosition.compareTo(markDeletedPosition);
+            if (compareResult > 0) {
                 Position finalPosition = lastAckedPosition;
-                log.info("Reset cursor:{} to {} since ledger consumed 
completely", cursor, lastAckedPosition);
-                cursor.asyncMarkDelete(lastAckedPosition, 
cursor.getProperties(),
-                    new MarkDeleteCallback() {
-                        @Override
-                        public void markDeleteComplete(Object ctx) {
-                            log.info("Successfully persisted cursor position 
for cursor:{} to {}",
-                                    cursor, finalPosition);
-                        }
+                log.info("Mark deleting cursor:{} from {} to {} since ledger 
consumed completely.", cursor,
+                        markDeletedPosition, lastAckedPosition);
+                cursor.asyncMarkDelete(lastAckedPosition, null, new 
MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                        log.info("Successfully persisted cursor position for 
cursor:{} to {}", cursor, finalPosition);
+                        future.complete(null);
+                    }
 
-                        @Override
-                        public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
-                            log.warn("Failed to reset cursor: {} from {} to 
{}. Trimming thread will retry next time.",
-                                    cursor, cursor.getMarkDeletedPosition(), 
finalPosition, exception);
-                        }
-                    }, null);
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                        log.warn("Failed to mark delete: {} from {} to {}. ", 
cursor, cursor.getMarkDeletedPosition(),
+                                finalPosition, exception);
+                        future.completeExceptionally(exception);
+                    }
+                }, null);
+            } else if (compareResult == 0) {
+                log.debug("No need to reset cursor: {}, last acked position 
equals to current mark-delete position {}.",
+                        cursor, markDeletedPosition);
+                future.complete(null);
+            } else {
+                // Should not happen
+                log.warn("Ledger rollover tries to mark delete an already 
mark-deleted position. Current mark-delete:"
+                        + " {} -- attempted position: {}", 
markDeletedPosition, lastAckedPosition);
+                future.complete(null);
             }
         }
+        return FutureUtil.waitForAll(cursorMarkDeleteFutures);
     }
 
     private void trimConsumedLedgersInBackground() {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 91b46b3660c..129163eb317 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -103,11 +103,13 @@ public class NonDurableCursorImpl extends 
ManagedCursorImpl {
     protected void internalAsyncMarkDelete(final Position newPosition, 
Map<String, Long> properties,
             final MarkDeleteCallback callback, final Object ctx, Runnable 
alignAcknowledgeStatusAfterPersisted) {
         // Bypass persistence of mark-delete position and individually deleted 
messages info
-
-        MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, 
callback, ctx,
-                alignAcknowledgeStatusAfterPersisted);
+        MarkDeleteEntry mdEntry;
         lock.writeLock().lock();
         try {
+            // use given properties or when missing, use the properties from 
the previous field value
+            Map<String, Long> propertiesToUse = properties != null ? 
properties : getProperties();
+            mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, 
callback, ctx,
+                    alignAcknowledgeStatusAfterPersisted);
             lastMarkDeleteEntry = mdEntry;
             mdEntry.alignAcknowledgeStatus();
         } finally {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c87477a95a0..bfb9b6ecca1 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -407,7 +407,8 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ml.close();
         ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
         ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) 
ml.openCursor(cursorName);
-        assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), 
lastEntry);
+        
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
+        
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThan(lastEntry);
 
         // cleanup.
         ml.delete();
@@ -498,12 +499,18 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertTrue(slowestReadPosition.getLedgerId() >= 
lastEntry.getLedgerId());
         assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
         assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
+        
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
+        
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
 
         // Verify the mark delete position can be recovered properly.
         ml.close();
         ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
         ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) 
ml.openCursor(cursorName);
-        assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), 
lastEntry);
+        
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
+        // If previous ledger is trimmed, Cursor: 
ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0}
+        // does not exist in the managed-ledger. Recovered cursor's position 
will not be moved forward.
+        // TODO should be handled in ledger trim process.
+        
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
 
         // cleanup.
         ml.delete();
@@ -4441,7 +4448,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
                     ManagedLedger ledger2 = 
factory2.open("testFlushCursorAfterInactivity", config);
                     ManagedCursor c2 = ledger2.openCursor("c");
 
-                    assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+                    
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size()
 - 1));
                 });
     }
 
@@ -4500,7 +4507,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
                     ManagedLedger ledger2 = 
factory2.open("testFlushCursorAfterIndDelInactivity", config);
                     ManagedCursor c2 = ledger2.openCursor("c");
 
-                    assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+                    
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size()
 - 1));
                 });
     }
 
@@ -4552,7 +4559,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
                     ManagedLedger ledger2 = 
factory2.open("testFlushCursorAfterInactivity", config);
                     ManagedCursor c2 = ledger2.openCursor("c");
 
-                    assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+                    
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size()
 - 1));
                 });
     }
 
@@ -4815,7 +4822,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testLazyCursorLedgerCreation() throws Exception {
+    public void testEagerCursorLedgerCreation() throws Exception {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
                 .open("testLazyCursorLedgerCreation", managedLedgerConfig);
@@ -4840,8 +4847,8 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ledger = (ManagedLedgerImpl) factory
                 .open("testLazyCursorLedgerCreation", managedLedgerConfig);
         ManagedCursorImpl cursor1 = (ManagedCursorImpl) 
ledger.openCursor("test");
-        assertEquals(cursor1.getState(), "NoLedger");
-        assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition);
+        assertEquals(cursor1.getState(), "Open");
+        
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThan(finalLastPosition);
 
         // Verify the recovered cursor can work with new mark delete.
         lastPosition = null;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1cdeb415b7c..12bbd66670b 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyMap;
@@ -56,6 +57,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -69,6 +71,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
@@ -3784,6 +3787,61 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         });
     }
 
+    @Test(timeOut = 20000)
+    public void 
testNeverThrowExceptionInMaybeUpdateCursorBeforeTrimmingConsumedLedger()
+            throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        initManagedLedgerConfig(config);
+        config.setMaxEntriesPerLedger(1);
+        int entryNum = 100;
+
+        ManagedLedgerImpl realManagedLedger =
+                (ManagedLedgerImpl) 
factory.open("maybeUpdateCursorBeforeTrimmingConsumed_ledger", config);
+        ManagedLedgerImpl managedLedger = spy(realManagedLedger);
+        ManagedCursor cursor = managedLedger.openCursor("c1");
+
+        Deque<CompletableFuture<Void>> futures = new ConcurrentLinkedDeque<>();
+        doAnswer(invocation -> {
+            CompletableFuture<Void> result = (CompletableFuture<Void>) 
invocation.callRealMethod();
+            futures.offer(result);
+            return result;
+        }).when(managedLedger).maybeUpdateCursorBeforeTrimmingConsumedLedger();
+
+        final CountDownLatch latch = new CountDownLatch(entryNum);
+        // Two asyncMarkDelete operations running concurrently:
+        //   1. ledger rollover triggered 
maybeUpdateCursorBeforeTrimmingConsumedLedger.
+        //   2. user triggered asyncMarkDelete.
+        for (int i = 0; i < entryNum; i++) {
+            managedLedger.asyncAddEntry("entry".getBytes(Encoding), new 
AddEntryCallback() {
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                }
+
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                    cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
+                        @Override
+                        public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                            fail("Should never fail",  exception);
+                        }
+
+                        @Override
+                        public void markDeleteComplete(Object ctx) {
+                            latch.countDown();
+                        }
+                    }, null);
+
+                }
+            }, null);
+        }
+
+        latch.await();
+        assertEquals(cursor.getNumberOfEntries(), 0);
+
+        // Will not throw exception
+        FutureUtil.waitForAll(futures).get();
+    }
+
     @Test(timeOut = 20000)
     public void testAsyncTruncateLedgerRetention() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig();
@@ -5109,13 +5167,13 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testTrimmerRaceCondition() throws Exception {
+    public void testTrimmerRaceConditionInDurableCursor() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(1);
         config.setRetentionTime(0, TimeUnit.MILLISECONDS);
         config.setRetentionSizeInMB(0);
 
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testTrimmerRaceCondition", config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testTrimmerRaceConditionInDurableCursor", config);
         ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
 
         // 1. Add Entry 1 (Ledger 1)
@@ -5144,20 +5202,139 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         }, null);
 
         latch.await();
-        assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition);
-        assertEquals(ledger.getCursors().getSlowestCursorPosition(), 
lastPosition);
+        
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
+        
assertThat(ledger.getCursors().getSlowestCursorPosition()).isGreaterThanOrEqualTo(lastPosition);
         assertEquals(cursor.getProperties(), properties);
 
-        // 3. Add Entry 2. Triggers Rollover.
+        // 3. Add Entry 2. Triggers second rollover process.
         // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger 
due to rollover
         Position p = ledger.addEntry("entry-2".getBytes(Encoding));
 
         // Wait for background tasks (metadata callback) to complete.
         // We expect at least 2 ledgers (Rollover happened).
         Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
ledger.getLedgersInfo().size() >= 2);
-        assertEquals(cursor.getPersistentMarkDeletedPosition(), new 
ImmutablePositionImpl(p.getLedgerId(), -1));
+        // First ledger is all consumed and trimmed, left current ledger and 
next empty ledger.
+        assertEquals(cursor.getPersistentMarkDeletedPosition(), 
PositionFactory.create(p.getLedgerId(), -1));
 
         // Verify properties are preserved after cursor reset
         assertEquals(cursor.getProperties(), properties);
     }
+
+    @Test
+    public void testTrimmerRaceConditionInNonDurableCursor() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+        config.setRetentionSizeInMB(0);
+
+        ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) 
factory.open("testTrimmerRaceConditionInNonDurableCursor", config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.newNonDurableCursor(PositionFactory.EARLIEST);
+
+        // 1. Add Entry 1 (Ledger 1)
+        ledger.addEntry("entry-1".getBytes(Encoding));
+
+        // 2. Ack Entry 1. Verify Persistence with properties.
+        List<Entry> entries = cursor.readEntries(1);
+        assertEquals(entries.size(), 1);
+        Position lastPosition = entries.get(0).getPosition();
+        entries.forEach(Entry::release);
+
+        // Mark delete with properties
+        Map<String, Long> properties = new HashMap<>();
+        properties.put("test-property", 12345L);
+        CountDownLatch latch = new CountDownLatch(1);
+        cursor.asyncMarkDelete(lastPosition, properties, new 
MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                fail("Mark delete should succeed");
+            }
+        }, null);
+
+        latch.await();
+        
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
+        
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
+        assertEquals(cursor.getProperties(), properties);
+
+        // 3. Add Entry 2. Triggers second rollover process.
+        // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger 
due to rollover
+        Position p = ledger.addEntry("entry-2".getBytes(Encoding));
+
+        // Wait for background tasks (metadata callback and trim) to complete.
+        // We expect only one ledger (Rollover and trim happened).
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
ledger.getLedgersInfo().size() == 1);
+        // All ledgers are trimmed, left one empty ledger, trim process moves 
markDeletedPosition to p.getLedgerId():0
+        assertEquals(cursor.getMarkDeletedPosition(), 
PositionFactory.create(p.getLedgerId(), 0));
+        assertEquals(cursor.getPersistentMarkDeletedPosition(), 
PositionFactory.create(p.getLedgerId(), 0));
+
+        // Verify properties are preserved after cursor reset
+        assertEquals(cursor.getProperties(), properties);
+    }
+
+    @Test
+    public void 
testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor() throws 
Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        int maxEntriesPerLedger = 1;
+        config.setMaxEntriesPerLedger(maxEntriesPerLedger);
+        config.setThrottleMarkDelete(1);
+        config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+        config.setRetentionSizeInMB(0);
+
+        ManagedLedgerImpl ledger =
+                (ManagedLedgerImpl) 
factory.open("testTrimmerRaceConditionWithThrottleMarkDeleteInDurableCursor",
+                        config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<Position> lastPosition = new AtomicReference<>();
+        ledger.asyncAddEntry("entry-1".getBytes(Encoding), new 
AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                lastPosition.set(position);
+                // Mark delete with properties
+                Map<String, Long> properties = new HashMap<>();
+                properties.put("test-property", 12345L);
+                cursor.asyncMarkDelete(position, properties, new 
MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                        fail("Mark delete should succeed");
+                    }
+                }, null);
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                fail("Add entry should succeed");
+            }
+        }, null);
+
+        latch.await();
+
+        Map<String, Long> expectedProperties = new HashMap<>();
+        expectedProperties.put("test-property", 12345L);
+        
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get());
+        assertEquals(cursor.getProperties(), expectedProperties);
+
+        // 3. Add Entry 2. Triggers second rollover process.
+        // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger 
due to rollover
+        Position p2 = ledger.addEntry(("entry-2").getBytes(Encoding));
+
+        // Wait for background tasks (metadata callback) to complete.
+        // We expect at least 2 ledgers (Rollover happened).
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
ledger.getLedgersInfo().size() >= 2);
+        assertEquals(cursor.getMarkDeletedPosition(), 
PositionFactory.create(p2.getLedgerId(), -1));
+
+        // Verify properties are preserved after cursor reset
+        assertEquals(cursor.getProperties(), expectedProperties);
+    }
 }

Reply via email to