This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 02f3ecc1d20 [fix][client] Fix negative acknowledgement by messageId (#23060)
02f3ecc1d20 is described below

commit 02f3ecc1d20cda17edd4c308f401b3c15463753c
Author: Hideaki Oguni <22386882+izum...@users.noreply.github.com>
AuthorDate: Mon Jul 29 16:29:59 2024 +0900

    [fix][client] Fix negative acknowledgement by messageId (#23060)
    
    (cherry picked from commit d4bbf10f58771e2d43e576dc3422e502834b1de4)
---
 .../org/apache/pulsar/client/impl/NegativeAcksTest.java     | 13 ++++++++-----
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java    |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 7812844bdc2..d4ea058f50f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -135,7 +135,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         Set<String> sentMessages = new HashSet<>();
 
         final int N = 10;
-        for (int i = 0; i < N; i++) {
+        for (int i = 0; i < N * 2; i++) {
             String value = "test-" + i;
             producer.sendAsync(value);
             sentMessages.add(value);
@@ -146,11 +146,16 @@ public class NegativeAcksTest extends ProducerConsumerBase {
             Message<String> msg = consumer.receive();
             consumer.negativeAcknowledge(msg);
         }
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer.receive();
+            consumer.negativeAcknowledge(msg.getMessageId());
+        }
+
 
         Set<String> receivedMessages = new HashSet<>();
 
         // All the messages should be received again
-        for (int i = 0; i < N; i++) {
+        for (int i = 0; i < N * 2; i++) {
             Message<String> msg = consumer.receive();
             receivedMessages.add(msg.getValue());
             consumer.acknowledge(msg);
@@ -308,9 +313,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
         // negative batch message id
-        unAckedMessageTracker.add(batchMessageId);
-        unAckedMessageTracker.add(batchMessageId2);
-        unAckedMessageTracker.add(batchMessageId3);
+        unAckedMessageTracker.add(messageId);
         consumer.negativeAcknowledge(batchMessageId);
         consumer.negativeAcknowledge(batchMessageId2);
         consumer.negativeAcknowledge(batchMessageId3);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ca5d0f34676..bf703ce047c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -754,7 +754,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         negativeAcksTracker.add(messageId);
 
         // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
-        unAckedMessageTracker.remove(messageId);
+        
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId));
     }
 
     @Override

Reply via email to