This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 9e6f47ee73d [fix][broker] Fix cursor position persistence in ledger
trimming (#25087)
9e6f47ee73d is described below
commit 9e6f47ee73daa1c2f54483554ffa4a62fa7fac0c
Author: Penghui Li <[email protected]>
AuthorDate: Wed Dec 17 00:45:57 2025 -0800
[fix][broker] Fix cursor position persistence in ledger trimming (#25087)
Co-authored-by: Jiwe Guo <[email protected]>
(cherry picked from commit 26297ac77c05cf0a2b8b56112c140eecf2cb9c06)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 24 ++++++----
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 4 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 53 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 9 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0bd5659f45c..fe804179bb3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2613,14 +2613,22 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
- try {
- log.info("Reset cursor:{} to {} since ledger consumed
completely", cursor, lastAckedPosition);
- onCursorMarkDeletePositionUpdated((ManagedCursorImpl)
cursor, lastAckedPosition);
- } catch (Exception e) {
- log.warn("Failed to reset cursor: {} from {} to {}.
Trimming thread will retry next time.",
- cursor, cursor.getMarkDeletedPosition(),
lastAckedPosition);
- log.warn("Caused by", e);
- }
+ Position finalPosition = lastAckedPosition;
+ log.info("Reset cursor:{} to {} since ledger consumed
completely", cursor, lastAckedPosition);
+ cursor.asyncMarkDelete(lastAckedPosition,
cursor.getProperties(),
+ new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ log.info("Successfully persisted cursor position
for cursor:{} to {}",
+ cursor, finalPosition);
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ log.warn("Failed to reset cursor: {} from {} to
{}. Trimming thread will retry next time.",
+ cursor, cursor.getMarkDeletedPosition(),
finalPosition, exception);
+ }
+ }, null);
}
}
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2bbe01f8ac5..711c70b5a17 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -1275,7 +1275,9 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
c2 = ledger.openCursor("c2");
assertEquals(c1.getMarkDeletedPosition(), p1);
- assertEquals(c2.getMarkDeletedPosition(), p2);
+ // move mark-delete-position from 3:5 to 6:-1 since all the entries
have been consumed
+ ManagedCursor finalC2 = c2;
+ Awaitility.await().untilAsserted(() ->
assertNotEquals(finalC2.getMarkDeletedPosition(), p2));
}
@Test(timeOut = 20000)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1d557ef1f28..09cab9f1bdf 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4966,4 +4966,57 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
// cleanup.
ml.delete();
}
+
+ @Test
+ public void testTrimmerRaceCondition() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(1);
+ config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+ config.setRetentionSizeInMB(0);
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testTrimmerRaceCondition", config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+ // 1. Add Entry 1 (Ledger 1)
+ ledger.addEntry("entry-1".getBytes(Encoding));
+
+ // 2. Ack Entry 1. Verify Persistence with properties.
+ List<Entry> entries = cursor.readEntries(1);
+ assertEquals(entries.size(), 1);
+ Position lastPosition = entries.get(0).getPosition();
+ entries.forEach(Entry::release);
+
+ // Mark delete with properties
+ Map<String, Long> properties = new HashMap<>();
+ properties.put("test-property", 12345L);
+ CountDownLatch latch = new CountDownLatch(1);
+ cursor.asyncMarkDelete(lastPosition, properties, new
MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception,
Object ctx) {
+ fail("Mark delete should succeed");
+ }
+ }, null);
+
+ latch.await();
+ assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition);
+ assertEquals(ledger.getCursors().getSlowestReaderPosition(),
lastPosition);
+ assertEquals(cursor.getProperties(), properties);
+
+ // 3. Add Entry 2. Triggers Rollover.
+ // This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger
due to rollover
+ Position p = ledger.addEntry("entry-2".getBytes(Encoding));
+
+ // Wait for background tasks (metadata callback) to complete.
+ // We expect at least 2 ledgers (Rollover happened).
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
ledger.getLedgersInfo().size() >= 2);
+ assertEquals(cursor.getPersistentMarkDeletedPosition(), new
ImmutablePositionImpl(p.getLedgerId(), -1));
+
+ // Verify properties are preserved after cursor reset
+ assertEquals(cursor.getProperties(), properties);
+ }
}