This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4a97b77f27e [fix][client] Fix repeat consume when using n-ack and 
batched messages (#21116)
4a97b77f27e is described below

commit 4a97b77f27e33a1fe78dd3d5e614eb3b20e910dc
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 5 10:28:17 2023 +0800

    [fix][client] Fix repeat consume when using n-ack and batched messages 
(#21116)
---
 .../BatchMessageWithBatchIndexLevelTest.java       | 85 ++++++++++++++++++++++
 .../PersistentAcknowledgmentsGroupingTracker.java  | 13 +++-
 2 files changed, 97 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 433f5e56d95..d04647e21c1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -27,19 +27,26 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker")
 public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
 
@@ -280,4 +287,82 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
         Awaitility.await().until(() -> 
getPulsar().getBrokerService().getTopic(topicName, false)
                 
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages()
 == 0);
     }
+
+    @Test
+    public void testNegativeAckAndLongAckDelayWillNotLeadRepeatConsume() 
throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp_");
+        final String subscriptionName = "s1";
+        final int redeliveryDelaySeconds = 2;
+
+        // Create producer and consumer.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxMessages(1000)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .negativeAckRedeliveryDelay(redeliveryDelaySeconds, 
TimeUnit.SECONDS)
+                .enableBatchIndexAcknowledgment(true)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .acknowledgmentGroupTime(1, TimeUnit.HOURS)
+                .subscribe();
+
+        // Send 10 messages in batch.
+        ArrayList<String> messagesSent = new ArrayList<>();
+        List<CompletableFuture<MessageId>> sendTasks = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            String msg = Integer.valueOf(i).toString();
+            sendTasks.add(producer.sendAsync(Integer.valueOf(i).toString()));
+            messagesSent.add(msg);
+        }
+        producer.flush();
+        FutureUtil.waitForAll(sendTasks).join();
+
+        // Receive messages.
+        ArrayList<String> messagesReceived = new ArrayList<>();
+        // NegativeAck "batchMessageIdIndex1" once.
+        boolean index1HasBeenNegativeAcked = false;
+        while (true) {
+            Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            if (index1HasBeenNegativeAcked) {
+                messagesReceived.add(message.getValue());
+                consumer.acknowledge(message);
+                continue;
+            }
+            if (((MessageIdAdv) message.getMessageId()).getBatchIndex() == 1) {
+                consumer.negativeAcknowledge(message);
+                index1HasBeenNegativeAcked = true;
+                continue;
+            }
+            messagesReceived.add(message.getValue());
+            consumer.acknowledge(message);
+        }
+
+        // Receive negative acked messages.
+        // Wait for the negatively acknowledged message to be redelivered.
+        int tripleRedeliveryDelaySeconds = redeliveryDelaySeconds * 3;
+        while (true) {
+            Message<String> message = 
consumer.receive(tripleRedeliveryDelaySeconds, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            messagesReceived.add(message.getValue());
+            consumer.acknowledge(message);
+        }
+
+        log.info("messagesSent: {}, messagesReceived: {}", messagesSent, 
messagesReceived);
+        Assert.assertEquals(messagesReceived.size(), messagesSent.size());
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 9086ccc4ef0..0cf776aea59 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -124,7 +124,18 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             // Already included in a cumulative ack
             return true;
         } else {
-            return 
pendingIndividualAcks.contains(MessageIdAdvUtils.discardBatch(messageIdAdv));
+            // If "batchIndexAckEnabled" is false, acknowledgments of batched 
messages are tracked by
+            // "pendingIndividualAcks". So no matter what type the message ID 
is, check "pendingIndividualAcks"
+            // first.
+            MessageIdAdv key = MessageIdAdvUtils.discardBatch(messageIdAdv);
+            if (pendingIndividualAcks.contains(key)) {
+                return true;
+            }
+            if (messageIdAdv.getBatchIndex() >= 0) {
+                ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.get(key);
+                return bitSet != null && 
!bitSet.get(messageIdAdv.getBatchIndex());
+            }
+            return false;
         }
     }
 

Reply via email to