This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 02f3ecc1d20 [fix][client] Fix negative acknowledgement by messageId (#23060) 02f3ecc1d20 is described below commit 02f3ecc1d20cda17edd4c308f401b3c15463753c Author: Hideaki Oguni <22386882+izum...@users.noreply.github.com> AuthorDate: Mon Jul 29 16:29:59 2024 +0900 [fix][client] Fix negative acknowledgement by messageId (#23060) (cherry picked from commit d4bbf10f58771e2d43e576dc3422e502834b1de4) --- .../org/apache/pulsar/client/impl/NegativeAcksTest.java | 13 ++++++++----- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 7812844bdc2..d4ea058f50f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -135,7 +135,7 @@ public class NegativeAcksTest extends ProducerConsumerBase { Set<String> sentMessages = new HashSet<>(); final int N = 10; - for (int i = 0; i < N; i++) { + for (int i = 0; i < N * 2; i++) { String value = "test-" + i; producer.sendAsync(value); sentMessages.add(value); @@ -146,11 +146,16 @@ public class NegativeAcksTest extends ProducerConsumerBase { Message<String> msg = consumer.receive(); consumer.negativeAcknowledge(msg); } + for (int i = 0; i < N; i++) { + Message<String> msg = consumer.receive(); + consumer.negativeAcknowledge(msg.getMessageId()); + } + Set<String> receivedMessages = new HashSet<>(); // All the messages should be received again - for (int i = 0; i < N; i++) { + for (int i = 0; i < N * 2; i++) { Message<String> msg = consumer.receive(); receivedMessages.add(msg.getValue()); consumer.acknowledge(msg); @@ -308,9 +313,7 @@ public class NegativeAcksTest extends ProducerConsumerBase { assertEquals(unAckedMessageTracker.size(), 0); negativeAcksTracker.close(); // negative batch message id - unAckedMessageTracker.add(batchMessageId); - unAckedMessageTracker.add(batchMessageId2); - unAckedMessageTracker.add(batchMessageId3); + unAckedMessageTracker.add(messageId); consumer.negativeAcknowledge(batchMessageId); consumer.negativeAcknowledge(batchMessageId2); consumer.negativeAcknowledge(batchMessageId3); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ca5d0f34676..bf703ce047c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -754,7 +754,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle negativeAcksTracker.add(messageId); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" - unAckedMessageTracker.remove(messageId); + unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId)); } @Override