This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8c528c4dc42 [improve][client] Add message key if exists to deadLetter
messages (#16615)
8c528c4dc42 is described below
commit 8c528c4dc427c8456b3bdff8a35b273bba1d31c3
Author: gaozhangmin <[email protected]>
AuthorDate: Fri Jul 15 18:03:29 2022 +0800
[improve][client] Add message key if exists to deadLetter messages (#16615)
---
.../pulsar/client/api/DeadLetterTopicTest.java | 60 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 10 ++--
2 files changed, 67 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2c780cc999c..cb9a4c3c104 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -66,6 +66,66 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @Test
+ public void testDeadLetterTopicWithMessageKey() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+
+ final int maxRedeliveryCount = 1;
+
+ final int sendMessages = 100;
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .receiverQueueSize(100)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
+ Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
+
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .subscriptionName("my-subscription")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+
+ for (int i = 0; i < sendMessages; i++) {
+ producer.newMessage()
+ .key("test-key")
+ .value(String.format("Hello Pulsar [%d]", i).getBytes())
+ .send();
+ }
+
+ producer.close();
+
+ int totalReceived = 0;
+ do {
+ Message<byte[]> message = consumer.receive();
+ log.info("consumer received message : {} {}",
message.getMessageId(), new String(message.getData()));
+ totalReceived++;
+ } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+ int totalInDeadLetter = 0;
+ do {
+ Message message = deadLetterConsumer.receive();
+ assertEquals(message.getKey(), "test-key");
+ log.info("dead letter consumer received message : {} {}",
message.getMessageId(), new String(message.getData()));
+ deadLetterConsumer.acknowledge(message);
+ totalInDeadLetter++;
+ } while (totalInDeadLetter < sendMessages);
+
+ deadLetterConsumer.close();
+ consumer.close();
+ }
+
+
@Test(groups = "quarantine")
public void testDeadLetterTopic() throws Exception {
final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c9cf4a12b75..16eb49b1af7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2013,10 +2013,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
for (MessageImpl<T> message : finalDeadLetterMessages) {
String originMessageIdStr = getOriginMessageIdStr(message);
String originTopicNameStr = getOriginTopicNameStr(message);
-
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ TypedMessageBuilder<byte[]> typedMessageBuilderNew =
+
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
- .properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr))
- .sendAsync()
+ .properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
+ if (message.hasKey()) {
+ typedMessageBuilderNew.key(message.getKey());
+ }
+ typedMessageBuilderNew.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {