This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5025b8f52a25dd658d1386020184d9fcd35e53cf Author: Penghui Li <[email protected]> AuthorDate: Tue Jul 29 09:40:26 2025 -0700 [improve][test] Add test for dead letter topic with max unacked messages blocking (#24535) (cherry picked from commit 16a43c679ee9e12ba9900da63569c8c7614d2faf) --- .../pulsar/client/api/DeadLetterTopicTest.java | 94 ++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index f624b010534..dd25c82155f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -1322,4 +1322,98 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { dlqTopic) .isEqualTo(0); } + + /** Publishes sendMessages (1000) messages to a topic whose max-unacked-messages policies, per consumer and per subscription, are both set to maxUnackedMessages (100). A Shared consumer configured with a dead letter policy (maxRedeliverCount = 3, explicit DLQ topic) negatively acknowledges every delivery it receives. A second consumer then drains the DLQ topic, and the test asserts that the number of messages read from the DLQ equals sendMessages. */ @Test + public void testDeadLetterTopicWithMaxUnackedMessagesBlocking() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic-unacked-blocking"; + final String dlq = "persistent://my-property/my-ns/dead-letter-topic-unacked-blocking-my-subscription-DLQ"; + final int maxRedeliveryCount = 3; + final int maxUnackedMessages = 100; + final int sendMessages = 1000; + + /* Create the topic first, then apply the same unacked-message limit at both consumer and subscription level. */ admin.topics().createNonPartitionedTopic(topic); + admin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, maxUnackedMessages); + admin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, maxUnackedMessages); + + /* The receiver queue (200) is configured larger than the unacked limit (100), and the nack redelivery delay is 1 ms. */ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .deadLetterTopic(dlq) + .build()) + .receiverQueueSize(200) + .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); 
+ + /* Subscribe to the DLQ topic before anything is produced, reading from the earliest position. */ Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(dlq) + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + /* NOTE(review): getBytes() here and new String(byte[]) below are called without an explicit charset. */ for (int i = 0; i < sendMessages; i++) { + producer.newMessage() + .value(String.format("Hello Pulsar [%d]", i).getBytes()) + .send(); + } + producer.close(); + + /* Phase 1: receive and negatively acknowledge until sendMessages * (maxRedeliveryCount + 1) deliveries have been counted. receivedMessages only collects distinct payloads for the log line below. NOTE(review): receive() is called without a timeout, so this loop presumably waits indefinitely if fewer deliveries arrive; confirm against the Consumer API. */ Set<String> receivedMessages = new HashSet<>(); + int totalReceived = 0; + + while (totalReceived < sendMessages * (maxRedeliveryCount + 1)) { + try { + Message<byte[]> message = consumer.receive(); + if (message != null) { + String messageContent = new String(message.getData()); + receivedMessages.add(messageContent); + totalReceived++; + log.info("Received message: {} (total: {}), redelivery count: {}", messageContent, + totalReceived, message.getRedeliveryCount()); + consumer.negativeAcknowledge(message); + } + } catch (Exception e) { + log.warn("Exception while receiving message", e); + break; + } + } + + /* NOTE(review): the Expected value logged here is sendMessages, while the loop above counts up to sendMessages * (maxRedeliveryCount + 1). */ log.info("Total messages received: {}, Expected: {}", totalReceived, sendMessages); + log.info("Unique messages received: {}", receivedMessages.size()); + + /* Phase 2: read and acknowledge DLQ messages until sendMessages of them have been counted, or until an exception or a null message ends the loop. */ int totalInDeadLetter = 0; + + while (totalInDeadLetter < sendMessages) { + try { + Message<byte[]> message = deadLetterConsumer.receive(); + if (message != null) { + String messageContent = new String(message.getData()); + log.info("Dead letter message received: {}", messageContent); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } else { /* NOTE(review): presumably unreachable because receive() is called without a timeout; verify. */ + log.warn("No more messages in DLQ"); + break; + } + } catch (Exception e) { + log.warn("Exception while receiving from DLQ", e); + break; + } + } + + log.info("Total messages in dead letter queue: {}, Expected: {}", totalInDeadLetter, sendMessages); + /* NOTE(review): both consumers are closed only after this assertion, so a failing assertion skips the close() calls. */ assertEquals(totalInDeadLetter, sendMessages, + "All messages should eventually reach DLQ, but flow control may prevent 
this"); + + deadLetterConsumer.close(); + consumer.close(); + + } }
