This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0be16ea566084706ae34b388e7c1d5b0fcca9e4d
Author: lipenghui <peng...@apache.org>
AuthorDate: Mon Jul 27 21:12:10 2020 +0800

    Fix batch index filter issue in Consumer. (#7654)
    
    ### Motivation
    
    Fix batch index filter issue in Consumer. The previous check at 
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1344
 was inverted: a message in the batch should be skipped when its bit in the 
 ack set is not set, rather than when it is set.
    
    (cherry picked from commit e9a0fd1e9415b1d19e877e315261f923a86fe073)
---
 .../client/impl/BatchMessageIndexAckTest.java      | 23 +++++++++++++++++-----
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 10 +++++++++-
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 582d461..8f76561 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -84,22 +84,35 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
         }
         FutureUtil.waitForAll(futures).get();
 
+        List<MessageId> acked = new ArrayList<>(50);
         for (int i = 0; i < messages; i++) {
+            Message<Integer> msg = consumer.receive();
             if (i % 2 == 0) {
-                consumer.acknowledge(consumer.receive());
+                consumer.acknowledge(msg);
+                acked.add(msg.getMessageId());
             } else {
                 consumer.negativeAcknowledge(consumer.receive());
             }
         }
 
-        List<Message<Integer>> received = new ArrayList<>(50);
+        List<MessageId> received = new ArrayList<>(50);
         for (int i = 0; i < 50; i++) {
-            received.add(consumer.receive());
+            received.add(consumer.receive().getMessageId());
         }
 
         Assert.assertEquals(received.size(), 50);
+        acked.retainAll(received);
+        Assert.assertEquals(acked.size(), 0);
 
-        Message<Integer> moreMessage = consumer.receive(1, TimeUnit.SECONDS);
+        for (MessageId messageId : received) {
+            consumer.acknowledge(messageId);
+        }
+
+        Thread.sleep(1000);
+
+        consumer.redeliverUnacknowledgedMessages();
+
+        Message<Integer> moreMessage = consumer.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(moreMessage);
 
         futures.clear();
@@ -109,7 +122,7 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
         FutureUtil.waitForAll(futures).get();
 
         for (int i = 0; i < 50; i++) {
-            received.add(consumer.receive());
+            received.add(consumer.receive().getMessageId());
         }
 
         // Ensure the flow permit is work well since the client skip the acked 
batch index,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 28995ad..f9546e9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -1304,6 +1305,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             possibleToDeadLetter = new ArrayList<>();
         }
         int skippedMessages = 0;
+        BitSetRecyclable ackBitSet = null;
+        if (ackSet != null && ackSet.size() > 0) {
+            ackBitSet = 
BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
+        }
         try {
             for (int i = 0; i < batchSize; ++i) {
                 if (log.isDebugEnabled()) {
@@ -1337,7 +1342,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     continue;
                 }
 
-                if (ackSet != null && 
BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)).get(i)) {
+                if (ackBitSet != null && !ackBitSet.get(i)) {
                     singleMessagePayload.release();
                     singleMessageMetadataBuilder.recycle();
                     ++skippedMessages;
@@ -1367,6 +1372,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 singleMessagePayload.release();
                 singleMessageMetadataBuilder.recycle();
             }
+            if (ackBitSet != null) {
+                ackBitSet.recycle();
+            }
         } catch (IOException e) {
             log.warn("[{}] [{}] unable to obtain message in batch", 
subscription, consumerName);
             discardCorruptedMessage(messageId, cnx, 
ValidationError.BatchDeSerializeError);

Reply via email to