This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a120f310c2 [fix][broker] First entry will be skipped if opening
NonDurableCursor while trimmed ledger is adding first entry. (#24738)
7a120f310c2 is described below
commit 7a120f310c286eed78701e10c7e6985bc3b6d001
Author: zhouyifan279 <[email protected]>
AuthorDate: Fri Sep 19 02:16:51 2025 +0800
[fix][broker] First entry will be skipped if opening NonDurableCursor while
trimmed ledger is adding first entry. (#24738)
Co-authored-by: Yunze Xu <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
---
.../mledger/impl/NonDurableCursorImpl.java | 5 +-
.../mledger/impl/NonDurableCursorTest.java | 58 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index d1fc54a9d6d..69c30fdaca9 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -45,11 +45,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl
{
// Compare with "latest" position marker by using only the ledger id.
Since the C++ client is using 48bits to
// store the entryId, it's not able to pass a Long.max() as entryId.
In this case there's no point to require
// both ledgerId and entryId to be Long.max()
- if (startCursorPosition == null ||
startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) {
+ Pair<Position, Long> lastPositionCounter =
ledger.getLastPositionAndCounter();
+ if (startCursorPosition == null ||
startCursorPosition.compareTo(lastPositionCounter.getLeft()) > 0) {
// Start from last entry
switch (initialPosition) {
case Latest:
-
initializeCursorPosition(ledger.getLastPositionAndCounter());
+ initializeCursorPosition(lastPositionCounter);
break;
case Earliest:
initializeCursorPosition(ledger.getFirstPositionAndCounter());
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index a7fe289f56c..aced11d3b3d 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -50,6 +51,11 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -103,6 +109,58 @@ public class NonDurableCursorTest extends
MockedBookKeeperTestCase {
ledger.close();
}
+ @Test(timeOut = 20000)
+ void testOpenNonDurableCursorWhileLedgerIsAddingFirstEntryAfterTrimmed()
throws Exception {
+ ManagedLedgerConfig config = new
ManagedLedgerConfig().setMaxEntriesPerLedger(1)
+ .setRetentionTime(0, TimeUnit.MILLISECONDS);
+ config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ @Cleanup
+ ManagedLedgerImpl ledgerSpy =
+ Mockito.spy((ManagedLedgerImpl)
factory.open("non_durable_cursor_while_ledger_trimmed", config));
+
+ ledgerSpy.addEntry("message1".getBytes());
+
+ ledgerSpy.rollCurrentLedgerIfFull();
+ Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() ->
+ ledgerSpy.getLedgersInfoAsList().size() > 1
+ );
+ CompletableFuture<Void> trimFuture = new CompletableFuture<>();
+ ledgerSpy.trimConsumedLedgersInBackground(trimFuture);
+ trimFuture.join();
+
+ // Use (currentLedgerId, -1) as startCursorPosition after ledger was
trimmed
+ Position startCursorPosition =
PositionFactory.create(ledgerSpy.getCurrentLedger().getId(), -1);
+ assertTrue(startCursorPosition.compareTo(ledgerSpy.lastConfirmedEntry)
> 0);
+
+ CountDownLatch getLastPositionLatch = new CountDownLatch(1);
+ CountDownLatch newNonDurableCursorLatch = new CountDownLatch(1);
+
Mockito.when(ledgerSpy.getLastPositionAndCounter()).then((Answer<Pair<Position,
Long>>) invocation -> {
+ newNonDurableCursorLatch.countDown();
+ getLastPositionLatch.await();
+ return Pair.of(ledgerSpy.lastConfirmedEntry,
ENTRIES_ADDED_COUNTER_UPDATER.get(ledgerSpy));
+ });
+
+ CompletableFuture<ManagedCursor> cursorFuture = new
CompletableFuture<ManagedCursor>()
+ .completeAsync(() ->
+ new NonDurableCursorImpl(bkc, ledgerSpy,
"my_test_cursor",
+ startCursorPosition,
CommandSubscribe.InitialPosition.Latest, false)
+ );
+ Position oldLastConfirmedEntry = ledgerSpy.lastConfirmedEntry;
+
+ // Wait until NonDurableCursorImpl constructor invokes
ManagedLedgerImpl.getLastPositionAndCounter
+ newNonDurableCursorLatch.await();
+ // Add first entry after ledger was trimmed
+ ledgerSpy.addEntry("message2".getBytes());
+
assertTrue(oldLastConfirmedEntry.compareTo(ledgerSpy.lastConfirmedEntry) < 0);
+
+ // Unblock NonDurableCursorImpl constructor
+ getLastPositionLatch.countDown();
+
+ // cursor should read from lastConfirmedEntry
+ ManagedCursor cursor = cursorFuture.join();
+ assertEquals(cursor.getReadPosition(), ledgerSpy.lastConfirmedEntry);
+ }
+
@Test(timeOut = 20000)
void testZNodeBypassed() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");