This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a3122b106fc [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)
a3122b106fc is described below

commit a3122b106fc227be218af9b611626fcbfb01a5e4
Author: Cong Zhao <zhaoc...@apache.org>
AuthorDate: Tue Apr 23 00:05:41 2024 +0800

    [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)

    (cherry picked from commit 35599b7325347838203a92ca63b78d134b7864c2)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1274b347263..bd74629e605 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3700,7 +3700,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
             // This means it has jumped to the last position
             if (nextLedgerId == null) {
-                if (currentLedgerEntries == 0) {
+                if (currentLedgerEntries == 0 && currentLedger != null) {
                     return PositionImpl.get(currentLedger.getId(), 0);
                 }
                 return lastConfirmedEntry.getNext();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c9bd64171c1..4e3f8b79084 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4695,5 +4695,66 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                 && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());
     }
 
+    @Test
+    public void testRecoverCursorWithTerminateManagedLedger() throws Exception {
+        String mlName = "my_test_ledger";
+        String cursorName = "c1";
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
+
+        // Write some data.
+        Position p0 = ledger.addEntry("entry-0".getBytes());
+        Position p1 = ledger.addEntry("entry-1".getBytes());
+
+        // Read message.
+        List<Entry> entries = c1.readEntries(2);
+        assertEquals(entries.size(), 2);
+        assertEquals(entries.get(0).getPosition(), p0);
+        assertEquals(entries.get(1).getPosition(), p1);
+        entries.forEach(Entry::release);
+
+        // Mark delete the last message.
+        c1.markDelete(p1);
+        Position markDeletedPosition = c1.getMarkDeletedPosition();
+        Assert.assertEquals(markDeletedPosition, p1);
+
+        // Terminate the managed ledger.
+        Position lastPosition = ledger.terminate();
+        assertEquals(lastPosition, p1);
+
+        // Close the ledger.
+        ledger.close();
+
+        // Reopen the ledger.
+        ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+        BookKeeper mockBookKeeper = mock(BookKeeper.class);
+        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger,
+                cursorName);
+
+        CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+        // Recover the cursor.
+        cursor.recover(new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                recoverFuture.complete(null);
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                recoverFuture.completeExceptionally(exception);
+            }
+        });
+
+        recoverFuture.join();
+        assertTrue(recoverFuture.isDone());
+        assertFalse(recoverFuture.isCompletedExceptionally());
+
+        // Verify the cursor state.
+        assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition);
+        assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }