This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eb77b522ba0931ed22b32be5999ea68ad6337f2f Author: congbo <[email protected]> AuthorDate: Sat Aug 27 12:38:08 2022 +0800 [fix][client] Fix reach redeliverCount client can't send messages to DLQ (#17287) (cherry picked from commit 4a28c087fe1308ea4eabc104b3d4889b47316afe) --- .../client/impl/TransactionEndToEndTest.java | 55 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 13 +++-- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 72eef04bad9..e2683afa2df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -67,6 +68,7 @@ import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.internal.DefaultImplementation; +import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -1169,4 +1171,57 @@ public class TransactionEndToEndTest extends TransactionTestBase { assertTrue(ex instanceof PulsarClientException.TimeoutException); } } + + @Test + public void testSendTxnAckMessageToDLQ() throws Exception { + String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ"; + String subName = "test"; + String value = "test"; + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic(topic) + .sendTimeout(1, TimeUnit.SECONDS) + .create(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Shared) + // consumer can't receive the same message three times + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer() + .topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, + topic, subName)) + .subscriptionType(SubscriptionType.Shared) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()) + .subscriptionName("test") + .subscribe(); + + producer.send(value.getBytes()); + Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES) + .build().get(); + + // consumer receive the message the first time, redeliverCount = 0 + consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get(); + + transaction.abort().get(); + + transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES) + .build().get(); + + // consumer receive the message the second time, redeliverCount = 1, also can be received + consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get(); + + transaction.abort().get(); + + // consumer receive the message the third time, redeliverCount = 2, + // the message will be sent to DLQ, can't receive + assertNull(consumer.receive(3, TimeUnit.SECONDS)); + + assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue())); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4cb334b38e4..809e7c674b9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1386,10 +1386,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle schema, redeliveryCount, consumerEpoch); uncompressedPayload.release(); - if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null - && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { - possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(), - Collections.singletonList(message)); + if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) { + if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(), + Collections.singletonList(message)); + if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) { + redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId())); + return; + } + } } executeNotifyCallback(message); } else {
