This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4ff8f93b157 [fix][client] Fix negative acknowledgement by messageId (#23060)
4ff8f93b157 is described below

commit 4ff8f93b15730f474fb42d9a2abb6f958ab4ab2a
Author: Hideaki Oguni <22386882+izum...@users.noreply.github.com>
AuthorDate: Mon Jul 29 16:29:59 2024 +0900

    [fix][client] Fix negative acknowledgement by messageId (#23060)
    
    (cherry picked from commit d4bbf10f58771e2d43e576dc3422e502834b1de4)
---
 .../org/apache/pulsar/client/impl/NegativeAcksTest.java     | 13 ++++++++-----
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java    |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index a6b77a1c727..a41b7f05a8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -134,7 +134,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         Set<String> sentMessages = new HashSet<>();
 
         final int N = 10;
-        for (int i = 0; i < N; i++) {
+        for (int i = 0; i < N * 2; i++) {
             String value = "test-" + i;
             producer.sendAsync(value);
             sentMessages.add(value);
@@ -146,13 +146,18 @@ public class NegativeAcksTest extends ProducerConsumerBase {
             consumer.negativeAcknowledge(msg);
         }
 
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer.receive();
+            consumer.negativeAcknowledge(msg.getMessageId());
+        }
+
         assertTrue(consumer instanceof ConsumerBase<String>);
         assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);
 
         Set<String> receivedMessages = new HashSet<>();
 
         // All the messages should be received again
-        for (int i = 0; i < N; i++) {
+        for (int i = 0; i < N * 2; i++) {
             Message<String> msg = consumer.receive();
             receivedMessages.add(msg.getValue());
             consumer.acknowledge(msg);
@@ -310,9 +315,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
         // negative batch message id
-        unAckedMessageTracker.add(batchMessageId);
-        unAckedMessageTracker.add(batchMessageId2);
-        unAckedMessageTracker.add(batchMessageId3);
+        unAckedMessageTracker.add(messageId);
         consumer.negativeAcknowledge(batchMessageId);
         consumer.negativeAcknowledge(batchMessageId2);
         consumer.negativeAcknowledge(batchMessageId3);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6ddb0e1bc01..1806d13493b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -811,7 +811,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         negativeAcksTracker.add(messageId);
 
         // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
-        unAckedMessageTracker.remove(messageId);
+        unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId));
     }
 
     @Override

Reply via email to