This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 043cb9fdee098f962c9940995fc57826d2db27d6 Author: LinChen <[email protected]> AuthorDate: Fri Oct 21 18:22:02 2022 +0800 When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042) Co-authored-by: leolinchen <[email protected]> --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 58 ++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 6de68580826..ed861e6830f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1828,7 +1828,27 @@ public class ManagedCursorImpl implements ManagedCursor { if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { if (newPosition.ackSet != null) { - batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet)); + AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>(); + BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet); + // In order to prevent the batch index recorded in batchDeletedIndexes from rolling back, + // only update batchDeletedIndexes when the submitted batch index is greater + // than the recorded index. + batchDeletedIndexes.compute(newPosition, + (k, v) -> { + if (v == null) { + return givenBitSet; + } + if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) { + bitSetRecyclable.set(v); + return givenBitSet; + } else { + bitSetRecyclable.set(givenBitSet); + return v; + } + }); + if (bitSetRecyclable.get() != null) { + bitSetRecyclable.get().recycle(); + } newPosition = ledger.getPreviousPosition(newPosition); } Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, newPosition); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 052f6ac2d54..0e66e76d5c3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -93,6 +93,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.api.proto.IntRange; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; @@ -3346,6 +3348,37 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(c1.getReadPosition(), positions[markDelete + 1]); } + @Test + public void testBatchIndexMarkdelete() throws ManagedLedgerException, InterruptedException { + ManagedLedger ledger = factory.open("test_batch_index_delete"); + ManagedCursor cursor = ledger.openCursor("c1"); + + final int totalEntries = 100; + final Position[] positions = new Position[totalEntries]; + for (int i = 0; i < totalEntries; i++) { + // add entry + positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding)); + } + assertEquals(cursor.getNumberOfEntries(), totalEntries); + markDeleteBatchIndex(cursor, positions[0], 10, 3); + List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10); + Assert.assertEquals(1, deletedIndexes.size()); + Assert.assertEquals(0, deletedIndexes.get(0).getStart()); + Assert.assertEquals(3, deletedIndexes.get(0).getEnd()); + + markDeleteBatchIndex(cursor, positions[0], 10, 4); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10); + Assert.assertEquals(1, deletedIndexes.size()); + Assert.assertEquals(0, deletedIndexes.get(0).getStart()); + Assert.assertEquals(4, deletedIndexes.get(0).getEnd()); + + markDeleteBatchIndex(cursor, positions[0], 10, 2); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10); + Assert.assertEquals(1, deletedIndexes.size()); + Assert.assertEquals(0, deletedIndexes.get(0).getStart()); + Assert.assertEquals(4, deletedIndexes.get(0).getEnd()); + } + @Test public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedException { ManagedLedger ledger = factory.open("test_batch_index_delete"); @@ -3477,6 +3510,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { pos.ackSet = null; } + private void markDeleteBatchIndex(ManagedCursor cursor, Position position, int batchSize, int batchIndex + ) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + PositionImpl pos = (PositionImpl) position; + BitSetRecyclable bitSet = new BitSetRecyclable(); + bitSet.set(0, batchSize); + bitSet.clear(0, batchIndex + 1); + + pos.ackSet = bitSet.toLongArray(); + + cursor.asyncMarkDelete(pos, new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + }, null); + latch.await(); + pos.ackSet = null; + } + private List<IntRange> getAckedIndexRange(long[] bitSetLongArray, int batchSize) { if (bitSetLongArray == null) { return null;
