This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eb343fda471244dc22f0ae5cb764484d855a6954 Author: lipenghui <peng...@apache.org> AuthorDate: Tue Jul 7 00:40:33 2020 +0800 Fix batch ackset recycled multiple times. (#7409) * Fix batch ackset recycled multiple times. * Apply comments. * Update pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java (cherry picked from commit 25a690734f278299bfdaae118a2c02ecc25c125e) --- .../client/impl/BatchMessageIndexAckTest.java | 31 ++++++++++++++++++++++ .../PersistentAcknowledgmentsGroupingTracker.java | 1 + .../apache/pulsar/common/protocol/Commands.java | 3 --- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java index 114b9ae..3150f10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java @@ -181,4 +181,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { // broker also need to handle the available permits. Assert.assertEquals(received.size(), 100); } + + @Test + public void testDoNotRecycleAckSetMultipleTimes() throws Exception { + final String topic = "persistent://my-property/my-ns/testSafeAckSetRecycle"; + + Producer<byte[]> producer = pulsarClient.newProducer() + .batchingMaxMessages(10) + .blockIfQueueFull(true).topic(topic) + .create(); + + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS) + .topic(topic) + .subscriptionName("test") + .subscribe(); + + final int messages = 100; + for (int i = 0; i < messages; i++) { + producer.sendAsync("Hello Pulsar".getBytes()); + } + + // Should not throw an exception. + for (int i = 0; i < messages; i++) { + consumer.acknowledgeCumulative(consumer.receive()); + // make sure the group ack flushed. + Thread.sleep(2); + } + + producer.close(); + consumer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 937f005..6908979 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -205,6 +205,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties); + bitSet.recycle(); cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); return true; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 0222f3c..7ae21e7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -961,9 +961,6 @@ public class Commands { ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack)); ack.recycle(); - if (ackSet != null) { - ackSet.recycle(); - } ackBuilder.recycle(); messageIdDataBuilder.recycle(); messageIdData.recycle();