This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 9e6f47ee73d [fix][broker] Fix cursor position persistence in ledger trimming (#25087)
9e6f47ee73d is described below

commit 9e6f47ee73daa1c2f54483554ffa4a62fa7fac0c
Author: Penghui Li <[email protected]>
AuthorDate: Wed Dec 17 00:45:57 2025 -0800

    [fix][broker] Fix cursor position persistence in ledger trimming (#25087)
    
    Co-authored-by: Jiwe Guo <[email protected]>
    (cherry picked from commit 26297ac77c05cf0a2b8b56112c140eecf2cb9c06)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 24 ++++++----
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 53 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0bd5659f45c..fe804179bb3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2613,14 +2613,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
 
             if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
-                try {
-                    log.info("Reset cursor:{} to {} since ledger consumed 
completely", cursor, lastAckedPosition);
-                    onCursorMarkDeletePositionUpdated((ManagedCursorImpl) 
cursor, lastAckedPosition);
-                } catch (Exception e) {
-                    log.warn("Failed to reset cursor: {} from {} to {}. 
Trimming thread will retry next time.",
-                            cursor, cursor.getMarkDeletedPosition(), 
lastAckedPosition);
-                    log.warn("Caused by", e);
-                }
+                Position finalPosition = lastAckedPosition;
+                log.info("Reset cursor:{} to {} since ledger consumed 
completely", cursor, lastAckedPosition);
+                cursor.asyncMarkDelete(lastAckedPosition, 
cursor.getProperties(),
+                    new MarkDeleteCallback() {
+                        @Override
+                        public void markDeleteComplete(Object ctx) {
+                            log.info("Successfully persisted cursor position 
for cursor:{} to {}",
+                                    cursor, finalPosition);
+                        }
+
+                        @Override
+                        public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                            log.warn("Failed to reset cursor: {} from {} to 
{}. Trimming thread will retry next time.",
+                                    cursor, cursor.getMarkDeletedPosition(), 
finalPosition, exception);
+                        }
+                    }, null);
             }
         }
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2bbe01f8ac5..711c70b5a17 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -1275,7 +1275,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         c2 = ledger.openCursor("c2");
 
         assertEquals(c1.getMarkDeletedPosition(), p1);
-        assertEquals(c2.getMarkDeletedPosition(), p2);
+        // move mark-delete-position from 3:5 to 6:-1 since all the entries 
have been consumed
+        ManagedCursor finalC2 = c2;
+        Awaitility.await().untilAsserted(() -> 
assertNotEquals(finalC2.getMarkDeletedPosition(), p2));
     }
 
     @Test(timeOut = 20000)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1d557ef1f28..09cab9f1bdf 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4966,4 +4966,57 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         // cleanup.
         ml.delete();
     }
+
+    @Test
+    public void testTrimmerRaceCondition() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+        config.setRetentionSizeInMB(0);
+
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testTrimmerRaceCondition", config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+        // 1. Add Entry 1 (Ledger 1)
+        ledger.addEntry("entry-1".getBytes(Encoding));
+
+        // 2. Ack Entry 1. Verify Persistence with properties.
+        List<Entry> entries = cursor.readEntries(1);
+        assertEquals(entries.size(), 1);
+        Position lastPosition = entries.get(0).getPosition();
+        entries.forEach(Entry::release);
+
+        // Mark delete with properties
+        Map<String, Long> properties = new HashMap<>();
+        properties.put("test-property", 12345L);
+        CountDownLatch latch = new CountDownLatch(1);
+        cursor.asyncMarkDelete(lastPosition, properties, new 
MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                fail("Mark delete should succeed");
+            }
+        }, null);
+
+        latch.await();
+        assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition);
+        assertEquals(ledger.getCursors().getSlowestReaderPosition(), 
lastPosition);
+        assertEquals(cursor.getProperties(), properties);
+
+        // 3. Add Entry 2. Triggers Rollover.
+        // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger 
due to rollover
+        Position p = ledger.addEntry("entry-2".getBytes(Encoding));
+
+        // Wait for background tasks (metadata callback) to complete.
+        // We expect at least 2 ledgers (Rollover happened).
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
ledger.getLedgersInfo().size() >= 2);
+        assertEquals(cursor.getPersistentMarkDeletedPosition(), new 
ImmutablePositionImpl(p.getLedgerId(), -1));
+
+        // Verify properties are preserved after cursor reset
+        assertEquals(cursor.getProperties(), properties);
+    }
 }

Reply via email to