This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0be16ea566084706ae34b388e7c1d5b0fcca9e4d Author: lipenghui <peng...@apache.org> AuthorDate: Mon Jul 27 21:12:10 2020 +0800 Fix batch index filter issue in Consumer. (#7654) ### Motivation Fix the batch index filter issue in Consumer. The previous check at https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1344 was inverted: a message in a batch should be skipped when its bit in the ack set is cleared, not when it is set. (cherry picked from commit e9a0fd1e9415b1d19e877e315261f923a86fe073) --- .../client/impl/BatchMessageIndexAckTest.java | 23 +++++++++++++++++----- .../apache/pulsar/client/impl/ConsumerImpl.java | 10 +++++++++- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java index 582d461..8f76561 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java @@ -84,22 +84,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { } FutureUtil.waitForAll(futures).get(); + List<MessageId> acked = new ArrayList<>(50); for (int i = 0; i < messages; i++) { + Message<Integer> msg = consumer.receive(); if (i % 2 == 0) { - consumer.acknowledge(consumer.receive()); + consumer.acknowledge(msg); + acked.add(msg.getMessageId()); } else { consumer.negativeAcknowledge(consumer.receive()); } } - List<Message<Integer>> received = new ArrayList<>(50); + List<MessageId> received = new ArrayList<>(50); for (int i = 0; i < 50; i++) { - received.add(consumer.receive()); + received.add(consumer.receive().getMessageId()); } Assert.assertEquals(received.size(), 50); + acked.retainAll(received); + Assert.assertEquals(acked.size(), 0); - Message<Integer> moreMessage = consumer.receive(1, TimeUnit.SECONDS); + for (MessageId messageId : received) { + 
consumer.acknowledge(messageId); + } + + Thread.sleep(1000); + + consumer.redeliverUnacknowledgedMessages(); + + Message<Integer> moreMessage = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNull(moreMessage); futures.clear(); @@ -109,7 +122,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { FutureUtil.waitForAll(futures).get(); for (int i = 0; i < 50; i++) { - received.add(consumer.receive()); + received.add(consumer.receive().getMessageId()); } // Ensure the flow permit is work well since the client skip the acked batch index, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 28995ad..f9546e9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -101,6 +101,7 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SafeCollectionUtils; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; @@ -1304,6 +1305,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle possibleToDeadLetter = new ArrayList<>(); } int skippedMessages = 0; + BitSetRecyclable ackBitSet = null; + if (ackSet != null && ackSet.size() > 0) { + ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)); + } try { for (int i = 0; i < batchSize; ++i) { if (log.isDebugEnabled()) { @@ -1337,7 +1342,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle continue; } - if (ackSet != null && 
BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)).get(i)) { + if (ackBitSet != null && !ackBitSet.get(i)) { singleMessagePayload.release(); singleMessageMetadataBuilder.recycle(); ++skippedMessages; @@ -1367,6 +1372,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle singleMessagePayload.release(); singleMessageMetadataBuilder.recycle(); } + if (ackBitSet != null) { + ackBitSet.recycle(); + } } catch (IOException e) { log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName); discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);