This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb343fda471244dc22f0ae5cb764484d855a6954
Author: lipenghui <peng...@apache.org>
AuthorDate: Tue Jul 7 00:40:33 2020 +0800

    Fix batch ackset recycled multiple times. (#7409)
    
    * Fix batch ackset recycled multiple times.
    
    * Apply comments.
    
    * Update pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
    
    (cherry picked from commit 25a690734f278299bfdaae118a2c02ecc25c125e)
---
 .../client/impl/BatchMessageIndexAckTest.java      | 31 ++++++++++++++++++++++
 .../PersistentAcknowledgmentsGroupingTracker.java  |  1 +
 .../apache/pulsar/common/protocol/Commands.java    |  3 ---
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 114b9ae..3150f10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -181,4 +181,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
         // broker also need to handle the available permits.
         Assert.assertEquals(received.size(), 100);
     }
+
+    @Test
+    public void testDoNotRecycleAckSetMultipleTimes() throws Exception  {
+        final String topic = "persistent://my-property/my-ns/testSafeAckSetRecycle";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .batchingMaxMessages(10)
+                .blockIfQueueFull(true).topic(topic)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            producer.sendAsync("Hello Pulsar".getBytes());
+        }
+
+        // Should not throw an exception.
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledgeCumulative(consumer.receive());
+            // make sure the group ack flushed.
+            Thread.sleep(2);
+        }
+
+        producer.close();
+        consumer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 937f005..6908979 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -205,6 +205,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         }
 
         final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties);
+        bitSet.recycle();
         cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
         return true;
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 0222f3c..7ae21e7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -961,9 +961,6 @@ public class Commands {
 
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack));
         ack.recycle();
-        if (ackSet != null) {
-            ackSet.recycle();
-        }
         ackBuilder.recycle();
         messageIdDataBuilder.recycle();
         messageIdData.recycle();

Reply via email to