This is an automated email from the ASF dual-hosted git repository. aahmed pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new dc7d01e Trim deleted entries after recover cursor. (#4987) dc7d01e is described below commit dc7d01efc6cf2df5631bc509758f2212bede35ce Author: lipenghui <peng...@apache.org> AuthorDate: Sat Sep 7 02:46:18 2019 +0800 Trim deleted entries after recover cursor. (#4987) * Trim deleted entries after recover cursor. * Fix errors * Add managed cursor unit tests. * Fix tests and handle cursor reset. * fix unit tests * Fix tests * Fix check style --- .../apache/bookkeeper/mledger/ManagedCursor.java | 14 +++++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 13 ++++- .../mledger/impl/ManagedCursorContainerTest.java | 11 ++++ .../bookkeeper/mledger/impl/ManagedCursorTest.java | 59 ++++++++++++++++++++++ .../apache/pulsar/broker/service/Dispatcher.java | 4 ++ .../PersistentDispatcherMultipleConsumers.java | 25 +++++++++ .../service/persistent/PersistentSubscription.java | 3 ++ .../ConcurrentOpenLongPairRangeSet.java | 14 +++++ .../common/util/collections/LongPairRangeSet.java | 19 +++++++ .../ConcurrentOpenLongPairRangeSetTest.java | 23 +++++++++ 10 files changed, 184 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 03a68ba..ae04269 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -20,9 +20,12 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.Beta; import com.google.common.base.Predicate; +import com.google.common.collect.Range; + import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; @@ -30,6 +33,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; +import org.apache.bookkeeper.mledger.impl.PositionImpl; /** * A ManangedCursor is a persisted cursor inside a ManagedLedger. @@ -594,4 +598,14 @@ public interface ManagedCursor { */ ManagedLedger getManagedLedger(); + /** + * Get last individual deleted range + * @return range + */ + Range<PositionImpl> getLastIndividualDeletedRange(); + + /** + * Trim delete entries for the given entries + */ + void trimDeletedEntries(List<Entry> entries); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8a6f9a2..cc425ae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1050,7 +1050,7 @@ public class ManagedCursorImpl implements ManagedCursor { positions.stream() .filter(position -> individualDeletedMessages.contains(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId()) - || ((PositionImpl) position).compareTo(markDeletePosition) < 0) + || ((PositionImpl) position).compareTo(markDeletePosition) <= 0) .forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); @@ -2590,5 +2590,16 @@ public class ManagedCursorImpl implements ManagedCursor { return this.ledger; } + @Override + public Range<PositionImpl> getLastIndividualDeletedRange() { + return individualDeletedMessages.lastRange(); + } + + @Override + public void trimDeletedEntries(List<Entry> entries) { + entries.removeIf(entry -> ((PositionImpl) entry.getPosition()).compareTo(markDeletePosition) <= 0 + || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId())); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index c415320..21f5747 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -26,6 +26,7 @@ import static org.testng.Assert.fail; import com.google.common.base.Predicate; import com.google.common.collect.Lists; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import java.util.Collections; import java.util.List; @@ -315,6 +316,16 @@ public class ManagedCursorContainerTest { return null; } + @Override + public Range<PositionImpl> getLastIndividualDeletedRange() { + return null; + } + + @Override + public void trimDeletedEntries(List<Entry> entries) { + + } + } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 70db43c..b0db926 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -32,6 +32,7 @@ import static org.testng.Assert.fail; import com.google.common.base.Charsets; import com.google.common.collect.Lists; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.nio.charset.Charset; @@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; + +import io.netty.buffer.ByteBufAllocator; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -2226,6 +2229,62 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { } @Test(timeOut = 20000) + void testGetLastIndividualDeletedRange() throws Exception { + ManagedLedger ledger = factory.open("test_last_individual_deleted"); + + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); + PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition(); + for(int i = 0; i < 10; i++) { + ledger.addEntry(("entry" + i).getBytes(Encoding)); + } + PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1); + PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2); + PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5); + PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6); + + c1.delete(Lists.newArrayList(p1, p2, p3, p4)); + + assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p3.getLedgerId(), + p3.getEntryId() - 1), p4)); + + PositionImpl p5 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 8); + c1.delete(p5); + + assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p5.getLedgerId(), + p5.getEntryId() - 1), p5)); + + } + + @Test(timeOut = 20000) + void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedException { + ManagedLedger ledger = factory.open("my_test_ledger"); + + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1"); + PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition(); + for(int i = 0; i < 10; i++) { + ledger.addEntry(("entry" + i).getBytes(Encoding)); + } + PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1); + PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2); + PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5); + PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6); + + c1.delete(Lists.newArrayList(p1, p2, p3, p4)); + + EntryImpl entry1 = EntryImpl.create(p1, ByteBufAllocator.DEFAULT.buffer(0)); + EntryImpl entry2 = EntryImpl.create(p2, ByteBufAllocator.DEFAULT.buffer(0)); + EntryImpl entry3 = EntryImpl.create(p3, ByteBufAllocator.DEFAULT.buffer(0)); + EntryImpl entry4 = EntryImpl.create(p4, ByteBufAllocator.DEFAULT.buffer(0)); + EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 7, + ByteBufAllocator.DEFAULT.buffer(0)); + List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3, entry4, entry5); + c1.trimDeletedEntries(entries); + assertEquals(entries.size(), 1); + assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId() , + markDeletedPosition.getEntryId() + 7)); + } + + @Test(timeOut = 20000) void outOfOrderAcks() throws Exception { ManagedLedger ledger = factory.open("outOfOrderAcks"); ManagedCursor c1 = ledger.openCursor("c1"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index cda9c09..5e6e72e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -94,4 +94,8 @@ public interface Dispatcher { default long getNumberOfDelayedMessages() { return 0; } + + default void cursorIsReset() { + //No-op + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 4dff4c4..91e4ab7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -32,6 +32,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import com.google.common.collect.Range; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -72,6 +73,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected final PersistentTopic topic; protected final ManagedCursor cursor; + protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery; private CompletableFuture<Void> closeFuture = null; LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); @@ -106,6 +108,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul super(subscription); this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.cursor = cursor; + this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange(); this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() @@ -431,6 +434,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) { + + if (entries == null || entries.size() == 0) { + return; + } + if (needTrimAckedMessages()) { + cursor.trimDeletedEntries(entries); + } int start = 0; int entriesToDispatch = entries.size(); long totalMessagesSent = 0; @@ -558,6 +568,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } + private boolean needTrimAckedMessages() { + if (lastIndividualDeletedRangeFromCursorRecovery == null) { + return false; + } else { + return lastIndividualDeletedRangeFromCursorRecovery.upperEndpoint() + .compareTo((PositionImpl) cursor.getReadPosition()) > 0; + } + } /** * returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits @@ -726,6 +744,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } } + @Override + public void cursorIsReset() { + if (this.lastIndividualDeletedRangeFromCursorRecovery != null) { + this.lastIndividualDeletedRangeFromCursorRecovery = null; + } + } + public PersistentTopic getTopic() { return topic; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 20d64ed..4a4e3aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -651,6 +651,9 @@ public class PersistentSubscription implements Subscription { log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName, finalPosition); } + if (dispatcher != null) { + dispatcher.cursorIsReset(); + } IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); future.complete(null); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java index 3af8fb7..0d14119 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java @@ -222,6 +222,9 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements @Override public Range<T> firstRange() { + if (rangeBitSetMap.isEmpty()) { + return null; + } Entry<Long, BitSet> firstSet = rangeBitSetMap.firstEntry(); int lower = firstSet.getValue().nextSetBit(0); int upper = Math.max(lower, firstSet.getValue().nextClearBit(lower) - 1); @@ -229,6 +232,17 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements } @Override + public Range<T> lastRange() { + if (rangeBitSetMap.isEmpty()) { + return null; + } + Entry<Long, BitSet> lastSet = rangeBitSetMap.lastEntry(); + int upper = lastSet.getValue().previousSetBit(lastSet.getValue().size()); + int lower = Math.min(lastSet.getValue().previousClearBit(upper), upper); + return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper)); + } + + @Override public int size() { if (updatedAfterCachedForSize) { AtomicInteger size = new AtomicInteger(0); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java index 2187ffe..0d19635 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java @@ -19,10 +19,13 @@ package org.apache.pulsar.common.util.collections; import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeSet; + import java.util.Collection; +import java.util.List; import java.util.Set; /** @@ -115,6 +118,13 @@ public interface LongPairRangeSet<T extends Comparable<T>> { Range<T> firstRange(); /** + * It returns very last biggest range in the rangeSet. + * + * @return last biggest range into the set + */ + Range<T> lastRange(); + + /** * Represents a function that accepts two long arguments and produces a result. * * @param <T> the type of the result. @@ -260,6 +270,15 @@ public interface LongPairRangeSet<T extends Comparable<T>> { } @Override + public Range<T> lastRange() { + if (set.asRanges().isEmpty()) { + return null; + } + List<Range<T>> list = Lists.newArrayList(set.asRanges().iterator()); + return list.get(list.size() - 1); + } + + @Override public int size() { return set.asRanges().size(); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java index 210b7e6..4adf14f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java @@ -197,6 +197,7 @@ public class ConcurrentOpenLongPairRangeSetTest { @Test public void testFirstRange() { ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer); + assertNull(set.firstRange()); Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)); set.add(range); assertEquals(set.firstRange(), range); @@ -212,6 +213,28 @@ public class ConcurrentOpenLongPairRangeSetTest { } @Test + public void testLastRange() { + ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer); + assertNull(set.lastRange()); + Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)); + set.add(range); + assertEquals(set.lastRange(), range); + assertEquals(set.size(), 1); + range = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105)); + set.add(range); + assertEquals(set.lastRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 105))); + assertEquals(set.size(), 1); + range = Range.openClosed(new LongPair(1, 5), new LongPair(1, 75)); + set.add(range); + assertEquals(set.lastRange(), range); + assertEquals(set.size(), 2); + range = Range.openClosed(new LongPair(1, 80), new LongPair(1, 120)); + set.add(range); + assertEquals(set.lastRange(), range); + assertEquals(set.size(), 3); + } + + @Test public void testToString() { ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer); Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));