This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c528c4dc42  [improve][client] Add message key if exists to deadLetter 
messages (#16615)
8c528c4dc42 is described below

commit 8c528c4dc427c8456b3bdff8a35b273bba1d31c3
Author: gaozhangmin <[email protected]>
AuthorDate: Fri Jul 15 18:03:29 2022 +0800

     [improve][client] Add message key if exists to deadLetter messages (#16615)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 60 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 10 ++--
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2c780cc999c..cb9a4c3c104 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -66,6 +66,66 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Test
+    public void testDeadLetterTopicWithMessageKey() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 1;
+
+        final int sendMessages = 100;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
+                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.newMessage()
+                    .key("test-key")
+                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
+                    .send();
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            assertEquals(message.getKey(), "test-key");
+            log.info("dead letter consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+    }
+
+
     @Test(groups = "quarantine")
     public void testDeadLetterTopic() throws Exception {
         final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c9cf4a12b75..16eb49b1af7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2013,10 +2013,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 for (MessageImpl<T> message : finalDeadLetterMessages) {
                     String originMessageIdStr = getOriginMessageIdStr(message);
                     String originTopicNameStr = getOriginTopicNameStr(message);
-                    
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                    TypedMessageBuilder<byte[]> typedMessageBuilderNew =
+                            
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
                             .value(message.getData())
-                            .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr))
-                            .sendAsync()
+                            .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
+                    if (message.hasKey()) {
+                        typedMessageBuilderNew.key(message.getKey());
+                    }
+                    typedMessageBuilderNew.sendAsync()
                             .thenAccept(messageIdInDLQ -> {
                                 
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
                                 
acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {

Reply via email to