This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 731897244c0e2ba81f8866202420e461c37e7eac Author: congbo <[email protected]> AuthorDate: Sun Aug 28 20:10:57 2022 +0800 [fix][client] Fix reach redeliverCount client can't send batch messages to DLQ (#17317) (cherry picked from commit 0909853873fc61395df7a68de13335d9f770383a) --- .../client/impl/TransactionEndToEndTest.java | 66 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 11 +++- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index e2683afa2df..1b114922c71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -1180,6 +1180,7 @@ public class TransactionEndToEndTest extends TransactionTestBase { @Cleanup ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() .topic(topic) + .enableBatching(false) .sendTimeout(1, TimeUnit.SECONDS) .create(); @@ -1224,4 +1225,69 @@ public class TransactionEndToEndTest extends TransactionTestBase { assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); } + + @Test + public void testSendTxnAckBatchMessageToDLQ() throws Exception { + String topic = NAMESPACE1 + "/testSendTxnAckBatchMessageToDLQ"; + String subName = "test"; + String value1 = "test1"; + String value2 = "test2"; + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic(topic) + .sendTimeout(1, TimeUnit.SECONDS) + .create(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + // consumer can't receive the same message three times + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer() + .topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, + topic, subName)) + .subscriptionType(SubscriptionType.Shared) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()) + .subscriptionName("test") + .subscribe(); + + producer.sendAsync(value1.getBytes()); + producer.sendAsync(value2.getBytes()); + Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES) + .build().get(); + + Message<byte[]> message = consumer.receive(); + assertEquals(value1, new String(message.getValue())); + // consumer receive the batch message one the first time, redeliverCount = 0 + consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); + + transaction.abort().get(); + + // consumer will receive the batch message two and then receive + // the message one and message two again, redeliverCount = 1 + for (int i = 0; i < 3; i ++) { + message = consumer.receive(); + } + + transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES) + .build().get(); + + assertEquals(value2, new String(message.getValue())); + // consumer receive the batch message two the second time, redeliverCount = 1, also can be received + consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); + + transaction.abort().get(); + + // consumer receive the batch message the third time, redeliverCount = 2, + // the message will be sent to DLQ, can't receive + assertNull(consumer.receive(3, TimeUnit.SECONDS)); + + assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); + assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 809e7c674b9..d8f85bcfbaa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1570,8 +1570,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError); } - if (possibleToDeadLetter != null && possibleSendToDeadLetterTopicMessages != null) { - possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter); + if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) { + if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + possibleSendToDeadLetterTopicMessages.put(batchMessage, + possibleToDeadLetter); + if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) { + redeliverUnacknowledgedMessages(Collections.singleton(batchMessage)); + return; + } + } } if (log.isDebugEnabled()) {
