This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 05efd10278eb1a3df3f24b9a2add390283196b09 Author: Michael Marshall <[email protected]> AuthorDate: Sun Oct 30 22:48:43 2022 -0700 [fix][ml] Persist correct markDeletePosition to prevent message loss (#18237) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 60 +++++----- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 123 ++++++++++++++++++++- 2 files changed, 153 insertions(+), 30 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index da855197df6..4f1a376771c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1153,29 +1153,33 @@ public class ManagedCursorImpl implements ManagedCursor { return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0); } - protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) { - if (position.equals(PositionImpl.EARLIEST)) { - position = ledger.getFirstPosition(); - } else if (position.equals(PositionImpl.LATEST)) { - position = ledger.getLastPosition().getNext(); + protected void internalResetCursor(PositionImpl proposedReadPosition, + AsyncCallbacks.ResetCursorCallback resetCursorCallback) { + final PositionImpl newReadPosition; + if (proposedReadPosition.equals(PositionImpl.EARLIEST)) { + newReadPosition = ledger.getFirstPosition(); + } else if (proposedReadPosition.equals(PositionImpl.LATEST)) { + newReadPosition = ledger.getLastPosition().getNext(); + } else { + newReadPosition = proposedReadPosition; } - log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name); + log.info("[{}] Initiate reset readPosition to {} on cursor {}", ledger.getName(), newReadPosition, name); synchronized (pendingMarkDeleteOps) { if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) { - log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}", - ledger.getName(), position, name); + log.error("[{}] reset requested - readPosition [{}], previous reset in progress - cursor {}", + ledger.getName(), newReadPosition, name); resetCursorCallback.resetFailed( new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"), - position); + newReadPosition); return; } } final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback; - final PositionImpl newPosition = position; + final PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition); VoidCallback finalCallback = new VoidCallback() { @Override @@ -1184,8 +1188,6 @@ public class ManagedCursorImpl implements ManagedCursor { // modify mark delete and read position since we are able to persist new position for cursor lock.writeLock().lock(); try { - PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition); - if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries( Range.closedOpen(newMarkDeletePosition, markDeletePosition))); @@ -1200,34 +1202,34 @@ public class ManagedCursorImpl implements ManagedCursor { if (config.isDeletionAtBatchIndexLevelEnabled()) { batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle); batchDeletedIndexes.clear(); - long[] resetWords = newPosition.ackSet; + long[] resetWords = newReadPosition.ackSet; if (resetWords != null) { BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords); - batchDeletedIndexes.put(newPosition, ackSet); + batchDeletedIndexes.put(newReadPosition, ackSet); } } PositionImpl oldReadPosition = readPosition; - if (oldReadPosition.compareTo(newPosition) >= 0) { - log.info("[{}] reset position to {} before current read position {} on cursor {}", - ledger.getName(), newPosition, oldReadPosition, name); + if (oldReadPosition.compareTo(newReadPosition) >= 0) { + log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}", + ledger.getName(), newReadPosition, oldReadPosition, name); } else { - log.info("[{}] reset position to {} skipping from current read position {} on cursor {}", - ledger.getName(), newPosition, oldReadPosition, name); + log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on " + + "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name); } - readPosition = newPosition; - ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition); + readPosition = newReadPosition; + ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); } synchronized (pendingMarkDeleteOps) { pendingMarkDeleteOps.clear(); if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { - log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", - ledger.getName(), newPosition, name); + log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}", + ledger.getName(), newReadPosition, name); } } - callback.resetComplete(newPosition); + callback.resetComplete(newReadPosition); updateLastActive(); } @@ -1235,20 +1237,20 @@ public class ManagedCursorImpl implements ManagedCursor { public void operationFailed(ManagedLedgerException exception) { synchronized (pendingMarkDeleteOps) { if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { - log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", - ledger.getName(), newPosition, name); + log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}", + ledger.getName(), newReadPosition, name); } } callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException( - "unable to persist position for cursor reset " + newPosition.toString()), newPosition); + "unable to persist readPosition for cursor reset " + newReadPosition), newReadPosition); } }; persistentMarkDeletePosition = null; inProgressMarkDeletePersistPosition = null; - lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null); - internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), + lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null); + internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 0e66e76d5c3..1a8feea1e0d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -93,6 +93,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.LongPairRangeSet; @@ -676,7 +677,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { } assertTrue(moveStatus.get()); PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1); - assertEquals(earliestPos, cursor.getReadPosition()); + assertEquals(cursor.getReadPosition(), earliestPos); moveStatus.set(false); // reset to one after last entry in a ledger should point to the first entry in the next ledger @@ -3283,6 +3284,126 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { }); } + @Test(timeOut = 20000) + public void testRecoverCursorAfterResetToLatestForNewEntry() throws Exception { + ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForNewEntry"); + ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest); + + // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + c.resetCursor(PositionImpl.LATEST); + + // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + final Position markDeleteBeforeRecover = c.getMarkDeletedPosition(); + final Position readPositionBeforeRecover = c.getReadPosition(); + + // Trigger the lastConfirmedEntry to move forward + ml.addEntry(new byte[1]); + + ManagedCursorInfo info = ManagedCursorInfo.newBuilder() + .setCursorsLedgerId(c.getCursorLedger()) + .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId()) + .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId()) + .setLastActive(0L) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + c.recoverFromLedger(info, new VoidCallback() { + @Override + public void operationComplete() { + latch.countDown(); + } + + @Override + public void operationFailed(ManagedLedgerException exception) { + failed.set(true); + latch.countDown(); + } + }); + + latch.await(); + if (failed.get()) { + fail("Cursor recovery should not fail"); + } + assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover); + assertEquals(c.getReadPosition(), readPositionBeforeRecover); + assertEquals(c.getNumberOfEntries(), 1L); + } + + @Test(timeOut = 20000) + public void testRecoverCursorAfterResetToLatestForMultipleEntries() throws Exception { + ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForMultipleEntries"); + ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest); + + // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + c.resetCursor(PositionImpl.LATEST); + + // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + // Trigger the lastConfirmedEntry to move forward + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + + c.resetCursor(PositionImpl.LATEST); + + assertEquals(c.getMarkDeletedPosition().getEntryId(), 3); + assertEquals(c.getReadPosition().getEntryId(), 4); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3); + + // Publish messages to move the lastConfirmedEntry field forward + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + + final Position markDeleteBeforeRecover = c.getMarkDeletedPosition(); + final Position readPositionBeforeRecover = c.getReadPosition(); + + ManagedCursorInfo info = ManagedCursorInfo.newBuilder() + .setCursorsLedgerId(c.getCursorLedger()) + .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId()) + .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId()) + .setLastActive(0L) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + c.recoverFromLedger(info, new VoidCallback() { + @Override + public void operationComplete() { + latch.countDown(); + } + + @Override + public void operationFailed(ManagedLedgerException exception) { + failed.set(true); + latch.countDown(); + } + }); + + latch.await(); + if (failed.get()) { + fail("Cursor recovery should not fail"); + } + assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover); + assertEquals(c.getReadPosition(), readPositionBeforeRecover); + assertEquals(c.getNumberOfEntries(), 2L); + } @Test void testAlwaysInactive() throws Exception { ManagedLedger ml = factory.open("testAlwaysInactive");
