This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f30a4b7fdc5fb6800003a1f95e4665f77688209d
Author: Penghui Li <[email protected]>
AuthorDate: Tue Jul 29 09:40:26 2025 -0700

    [improve][test] Add test for dead letter topic with max unacked messages 
blocking (#24535)
    
    (cherry picked from commit 16a43c679ee9e12ba9900da63569c8c7614d2faf)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 94 ++++++++++++++++++++++
 1 file changed, 94 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 5499b6b129b..a1bd7f3197c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -1174,4 +1174,98 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                         dlqTopic)
                 .isEqualTo(0);
     }
+
+    @Test
+    public void testDeadLetterTopicWithMaxUnackedMessagesBlocking() throws 
Exception {
+        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic-unacked-blocking";
+        final String dlq = 
"persistent://my-property/my-ns/dead-letter-topic-unacked-blocking-my-subscription-DLQ";
+        final int maxRedeliveryCount = 3;
+        final int maxUnackedMessages = 100;
+        final int sendMessages = 1000;
+
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, 
maxUnackedMessages);
+        admin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, 
maxUnackedMessages);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .deadLetterTopic(dlq)
+                        .build())
+                .receiverQueueSize(200)
+                .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer = 
pulsarClient.newConsumer(Schema.BYTES)
+                .topic(dlq)
+                .subscriptionName("my-subscription")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.newMessage()
+                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
+                    .send();
+        }
+        producer.close();
+
+        Set<String> receivedMessages = new HashSet<>();
+        int totalReceived = 0;
+
+        while (totalReceived < sendMessages * (maxRedeliveryCount + 1)) {
+            try {
+                Message<byte[]> message = consumer.receive();
+                if (message != null) {
+                    String messageContent = new String(message.getData());
+                    receivedMessages.add(messageContent);
+                    totalReceived++;
+                    log.info("Received message: {} (total: {}), redelivery 
count: {}", messageContent,
+                            totalReceived, message.getRedeliveryCount());
+                    consumer.negativeAcknowledge(message);
+                }
+            } catch (Exception e) {
+                log.warn("Exception while receiving message", e);
+                break;
+            }
+        }
+
+        log.info("Total messages received: {}, Expected: {}", totalReceived, 
sendMessages);
+        log.info("Unique messages received: {}", receivedMessages.size());
+
+        int totalInDeadLetter = 0;
+
+        while (totalInDeadLetter < sendMessages) {
+            try {
+                Message<byte[]> message = deadLetterConsumer.receive();
+                if (message != null) {
+                    String messageContent = new String(message.getData());
+                    log.info("Dead letter message received: {}", 
messageContent);
+                    deadLetterConsumer.acknowledge(message);
+                    totalInDeadLetter++;
+                } else {
+                    log.warn("No more messages in DLQ");
+                    break;
+                }
+            } catch (Exception e) {
+                log.warn("Exception while receiving from DLQ", e);
+                break;
+            }
+        }
+
+        log.info("Total messages in dead letter queue: {}, Expected: {}", 
totalInDeadLetter, sendMessages);
+        assertEquals(totalInDeadLetter, sendMessages,
+                "All messages should eventually reach DLQ, but flow control 
may prevent this");
+
+        deadLetterConsumer.close();
+        consumer.close();
+
+    }
 }

Reply via email to