This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 64598b40db3 [fix] [client] call redeliver 1 msg but did 2 msgs (#23943)
64598b40db3 is described below

commit 64598b40db3c60ae0d548c5742ed28df2edec28d
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 10 09:44:07 2025 +0800

    [fix] [client] call redeliver 1 msg but did 2 msgs (#23943)
    
    (cherry picked from commit 7a79c78f8e6f4b52f13be1c6441f4b007d9a00fe)
---
 .../BatchMessageWithBatchIndexLevelTest.java       | 62 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 31 ++++-------
 .../collections/GrowableArrayBlockingQueue.java    |  8 +++
 3 files changed, 81 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 8e902d5d1e7..52147f74f4a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -59,6 +59,7 @@ import 
org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -137,6 +138,67 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
         });
     }
 
+    @DataProvider
+    public Object[][] enabledBatchSend() {
+        return new Object[][] {
+                {false},
+                {true}
+        };
+    }
+
+    @Test(dataProvider = "enabledBatchSend")
+    @SneakyThrows
+    public void testBatchMessageNAck(boolean enabledBatchSend) {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+        final String subscriptionName = "s1";
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName(subscriptionName)
+                .receiverQueueSize(21)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                .subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .batchingMaxMessages(20)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .enableBatching(enabledBatchSend)
+                .create();
+        final PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        final PersistentDispatcherMultipleConsumers dispatcher =
+                (PersistentDispatcherMultipleConsumers) 
topic.getSubscription(subscriptionName).getDispatcher();
+
+        // Send messages: 20 * 2.
+        for (int i = 0; i < 40; i++) {
+            byte[] message = ("batch-message-" + i).getBytes();
+            if (i == 19 || i == 39) {
+                producer.newMessage().value(message).send();
+            } else {
+                producer.newMessage().value(message).sendAsync();
+            }
+        }
+        Awaitility.await().untilAsserted(() -> {
+            if (enabledBatchSend) {
+                assertEquals(consumer.numMessagesInQueue(), 40);
+            } else {
+                assertEquals(consumer.numMessagesInQueue(), 21);
+            }
+        });
+
+        // Negative ack and verify result/
+        Message<byte[]> receive1 = consumer.receive();
+        consumer.pause();
+        consumer.negativeAcknowledge(receive1);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(consumer.numMessagesInQueue(), 20);
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
+        });
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
     @Test
     public void testBatchMessageMultiNegtiveAck() throws Exception{
         final String topicName = 
"persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d5bf2619b7f..7186bfd3fb1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2812,27 +2812,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         int messagesFromQueue = 0;
-        Message<T> peek = incomingMessages.peek();
-        if (peek != null) {
-            MessageIdAdv messageId = 
MessageIdAdvUtils.discardBatch(peek.getMessageId());
-            if (!messageIds.contains(messageId)) {
-                // first message is not expired, then no message is expired in 
queue.
-                return 0;
-            }
-
-            // try not to remove elements that are added while we remove
-            Message<T> message = incomingMessages.poll();
-            while (message != null) {
-                decreaseIncomingMessageSize(message);
-                messagesFromQueue++;
-                MessageIdAdv id = 
MessageIdAdvUtils.discardBatch(message.getMessageId());
-                if (!messageIds.contains(id)) {
-                    messageIds.add(id);
-                    break;
-                }
-                message.release();
-                message = incomingMessages.poll();
+        Message<T> message;
+        while (true) {
+            message = incomingMessages.pollIf(msg -> {
+                MessageId idPolled = 
MessageIdAdvUtils.discardBatch(msg.getMessageId());
+                return messageIds.contains(idPolled);
+            });
+            if (message == null) {
+                break;
             }
+            decreaseIncomingMessageSize(message);
+            messagesFromQueue++;
+            message.release();
         }
         return messagesFromQueue;
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
index 467a455ed8b..94bfad1fbd2 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 
 /**
@@ -83,10 +84,17 @@ public class GrowableArrayBlockingQueue<T> extends 
AbstractQueue<T> implements B
 
     @Override
     public T poll() {
+        return pollIf(v -> true);
+    }
+
+    public T pollIf(Predicate<T> predicate) {
         headLock.lock();
         try {
             if (SIZE_UPDATER.get(this) > 0) {
                 T item = data[headIndex.value];
+                if (!predicate.test(item)) {
+                    return null;
+                }
                 data[headIndex.value] = null;
                 headIndex.value = (headIndex.value + 1) & (data.length - 1);
                 SIZE_UPDATER.decrementAndGet(this);

Reply via email to