This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new c1aab487f19 [fix][client] Fix negative acknowledgement by messageId (#23060) c1aab487f19 is described below commit c1aab487f199bdc2cc671694272fd733b441e829 Author: Hideaki Oguni <22386882+izum...@users.noreply.github.com> AuthorDate: Mon Jul 29 16:29:59 2024 +0900 [fix][client] Fix negative acknowledgement by messageId (#23060) (cherry picked from commit d4bbf10f58771e2d43e576dc3422e502834b1de4) --- .../org/apache/pulsar/client/impl/NegativeAcksTest.java | 13 ++++++++----- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index a6b77a1c727..a41b7f05a8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -134,7 +134,7 @@ public class NegativeAcksTest extends ProducerConsumerBase { Set<String> sentMessages = new HashSet<>(); final int N = 10; - for (int i = 0; i < N; i++) { + for (int i = 0; i < N * 2; i++) { String value = "test-" + i; producer.sendAsync(value); sentMessages.add(value); @@ -146,13 +146,18 @@ public class NegativeAcksTest extends ProducerConsumerBase { consumer.negativeAcknowledge(msg); } + for (int i = 0; i < N; i++) { + Message<String> msg = consumer.receive(); + consumer.negativeAcknowledge(msg.getMessageId()); + } + assertTrue(consumer instanceof ConsumerBase<String>); assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0); Set<String> receivedMessages = new HashSet<>(); // All the messages should be received again - for (int i = 0; i < N; i++) { + for (int i = 0; i < N * 2; i++) { Message<String> msg = consumer.receive(); receivedMessages.add(msg.getValue()); consumer.acknowledge(msg); @@ -310,9 +315,7 @@ public class NegativeAcksTest extends ProducerConsumerBase { assertEquals(unAckedMessageTracker.size(), 0); negativeAcksTracker.close(); // negative batch message id - unAckedMessageTracker.add(batchMessageId); - unAckedMessageTracker.add(batchMessageId2); - unAckedMessageTracker.add(batchMessageId3); + unAckedMessageTracker.add(messageId); consumer.negativeAcknowledge(batchMessageId); consumer.negativeAcknowledge(batchMessageId2); consumer.negativeAcknowledge(batchMessageId3); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6744a65d556..4b21d0908bc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -765,7 +765,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle negativeAcksTracker.add(messageId); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" - unAckedMessageTracker.remove(messageId); + unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId)); } @Override