This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4a97b77f27e [fix][client] Fix repeat consume when using n-ack and
batched messages (#21116)
4a97b77f27e is described below
commit 4a97b77f27e33a1fe78dd3d5e614eb3b20e910dc
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 5 10:28:17 2023 +0800
[fix][client] Fix repeat consume when using n-ack and batched messages
(#21116)
---
.../BatchMessageWithBatchIndexLevelTest.java | 85 ++++++++++++++++++++++
.../PersistentAcknowledgmentsGroupingTracker.java | 13 +++-
2 files changed, 97 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 433f5e56d95..d04647e21c1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -27,19 +27,26 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+@Slf4j
@Test(groups = "broker")
public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
@@ -280,4 +287,82 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
Awaitility.await().until(() ->
getPulsar().getBrokerService().getTopic(topicName, false)
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages()
== 0);
}
+
+ @Test
+ public void testNegativeAckAndLongAckDelayWillNotLeadRepeatConsume()
throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp_");
+ final String subscriptionName = "s1";
+ final int redeliveryDelaySeconds = 2;
+
+ // Create producer and consumer.
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxMessages(1000)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .create();
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>)
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .negativeAckRedeliveryDelay(redeliveryDelaySeconds,
TimeUnit.SECONDS)
+ .enableBatchIndexAcknowledgment(true)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .acknowledgmentGroupTime(1, TimeUnit.HOURS)
+ .subscribe();
+
+ // Send 10 messages in batch.
+ ArrayList<String> messagesSent = new ArrayList<>();
+ List<CompletableFuture<MessageId>> sendTasks = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String msg = Integer.valueOf(i).toString();
+ sendTasks.add(producer.sendAsync(Integer.valueOf(i).toString()));
+ messagesSent.add(msg);
+ }
+ producer.flush();
+ FutureUtil.waitForAll(sendTasks).join();
+
+ // Receive messages.
+ ArrayList<String> messagesReceived = new ArrayList<>();
+ // NegativeAck "batchMessageIdIndex1" once.
+ boolean index1HasBeenNegativeAcked = false;
+ while (true) {
+ Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ if (index1HasBeenNegativeAcked) {
+ messagesReceived.add(message.getValue());
+ consumer.acknowledge(message);
+ continue;
+ }
+ if (((MessageIdAdv) message.getMessageId()).getBatchIndex() == 1) {
+ consumer.negativeAcknowledge(message);
+ index1HasBeenNegativeAcked = true;
+ continue;
+ }
+ messagesReceived.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ // Receive negative acked messages.
+ // Wait the message negative acknowledgment finished.
+ int tripleRedeliveryDelaySeconds = redeliveryDelaySeconds * 3;
+ while (true) {
+ Message<String> message =
consumer.receive(tripleRedeliveryDelaySeconds, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ messagesReceived.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ log.info("messagesSent: {}, messagesReceived: {}", messagesSent,
messagesReceived);
+ Assert.assertEquals(messagesReceived.size(), messagesSent.size());
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topicName);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 9086ccc4ef0..0cf776aea59 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -124,7 +124,18 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
// Already included in a cumulative ack
return true;
} else {
- return
pendingIndividualAcks.contains(MessageIdAdvUtils.discardBatch(messageIdAdv));
+ // If "batchIndexAckEnabled" is false, the batched messages
acknowledgment will be traced by
+ // pendingIndividualAcks. So no matter what type the message ID
is, check with "pendingIndividualAcks"
+ // first.
+ MessageIdAdv key = MessageIdAdvUtils.discardBatch(messageIdAdv);
+ if (pendingIndividualAcks.contains(key)) {
+ return true;
+ }
+ if (messageIdAdv.getBatchIndex() >= 0) {
+ ConcurrentBitSetRecyclable bitSet =
pendingIndividualBatchIndexAcks.get(key);
+ return bitSet != null &&
!bitSet.get(messageIdAdv.getBatchIndex());
+ }
+ return false;
}
}