This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 81202e1 support use `BitSet` generate the `BatchMessageAcker` (#7909) 81202e1 is described below commit 81202e147f9460e3ab60d82fe0ad55510a258ed6 Author: ran <gaoran...@126.com> AuthorDate: Tue Sep 1 11:54:21 2020 +0800 support use `BitSet` generate the `BatchMessageAcker` (#7909) Motivation Currently, we have to know the batchSize to generate a BatchMessageAcker. If we could get the batch index ack bitSet from the broker, we could generate the BatchMessageAcker from that bitSet. This is useful for consuming transaction messages, because we would not need to change the protocol to get the total number of messages in one transaction. Modifications Add a new static method that generates the BatchMessageAcker from a BitSet. --- .../pulsar/client/impl/BatchMessageAcker.java | 5 +++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 9 +++++++-- .../PersistentAcknowledgmentsGroupingTracker.java | 18 +++++++++++++---- .../pulsar/client/impl/BatchMessageAckerTest.java | 12 +++++++++++ .../collections/ConcurrentBitSetRecyclable.java | 7 +++++++ .../ConcurrentBitSetRecyclableTest.java | 23 ++++++++++++++++++++++ 6 files changed, 68 insertions(+), 6 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java index d46d3b3..e34d1a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java @@ -34,6 +34,11 @@ class BatchMessageAcker { return new BatchMessageAcker(bitSet, batchSize); } + // Use the param bitSet as the BatchMessageAcker's bitSet, don't care about the batchSize. + static BatchMessageAcker newAcker(BitSet bitSet) { + return new BatchMessageAcker(bitSet, -1); + } + // bitset shared across messages in the same batch. 
private final int batchSize; private final BitSet bitSet; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b1df6f8..cfaaa89 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1356,16 +1356,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle // create ack tracker for entry aka batch MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); - BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize); List<MessageImpl<T>> possibleToDeadLetter = null; if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { possibleToDeadLetter = new ArrayList<>(); } - int skippedMessages = 0; + + BatchMessageAcker acker; BitSetRecyclable ackBitSet = null; if (ackSet != null && ackSet.size() > 0) { ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)); + acker = BatchMessageAcker.newAcker(BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet))); + } else { + acker = BatchMessageAcker.newAcker(batchSize); } + + int skippedMessages = 0; try { int startBatchIndex = Math.max(messageId.getBatchIndex(), 0); for (int i = startBatchIndex; i < batchSize; ++i) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 6a4deef..fd61c42 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -168,8 +168,13 @@ public class 
PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } else if (ackType == AckType.Individual) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> { - ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchSize); + ConcurrentBitSetRecyclable value; + if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) { + value = ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet()); + } else { + value = ConcurrentBitSetRecyclable.create(); + value.set(0, batchSize); + } return value; }); bitSet.clear(batchIndex); @@ -221,8 +226,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments if (cnx == null) { return false; } - BitSetRecyclable bitSet = BitSetRecyclable.create(); - bitSet.set(0, batchSize); + BitSetRecyclable bitSet; + if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) { + bitSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray()); + } else { + bitSet = BitSetRecyclable.create(); + bitSet.set(0, batchSize); + } if (ackType == AckType.Cumulative) { bitSet.clear(0, batchIndex + 1); } else { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java index 2bfa620..8c1565e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java @@ -22,9 +22,12 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.BitSet; + 
public class BatchMessageAckerTest { private static final int BATCH_SIZE = 10; @@ -68,4 +71,13 @@ public class BatchMessageAckerTest { assertEquals(0, acker.getOutstandingAcks()); } + @Test + public void testBitSetAcker() { + BitSet bitSet = BitSet.valueOf(acker.getBitSet().toLongArray()); + BatchMessageAcker bitSetAcker = BatchMessageAcker.newAcker(bitSet); + + Assert.assertEquals(acker.getBitSet(), bitSetAcker.getBitSet()); + Assert.assertEquals(acker.getOutstandingAcks(), bitSetAcker.getOutstandingAcks()); + } + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java index 8e787c1..21ee42b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java @@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; +import java.util.BitSet; /** * Safe multithreaded version of {@code BitSet} and leverage netty recycler. 
@@ -43,6 +44,12 @@ public class ConcurrentBitSetRecyclable extends ConcurrentBitSet { return RECYCLER.get(); } + public static ConcurrentBitSetRecyclable create(BitSet bitSet) { + ConcurrentBitSetRecyclable recyclable = RECYCLER.get(); + recyclable.or(bitSet); + return recyclable; + } + public void recycle() { this.clear(); recyclerHandle.recycle(this); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java index b037c70..e937176 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.util.collections; +import java.util.BitSet; import org.testng.Assert; import org.testng.annotations.Test; @@ -34,4 +35,26 @@ public class ConcurrentBitSetRecyclableTest { Assert.assertFalse(bitset2.get(3)); Assert.assertNotSame(bitset3, bitset1); } + + @Test + public void testGenerateByBitSet() { + BitSet bitSet = new BitSet(); + ConcurrentBitSetRecyclable bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.set(0, 10); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.clear(5); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + + bitSet.clear(); + bitSetRecyclable.recycle(); + bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); + Assert.assertEquals(bitSet, bitSetRecyclable); + } }