This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 52a4d5ee84f [improve][client]Reduce unnecessary
getPartitionedTopicMetadata requests when using retry and DLQ topics. (#25172)
52a4d5ee84f is described below
commit 52a4d5ee84fad6af2736376a6fcdd1bc41e7c52f
Author: zhenJiangWang <[email protected]>
AuthorDate: Tue Jan 27 21:04:27 2026 +0800
[improve][client]Reduce unnecessary getPartitionedTopicMetadata requests
when using retry and DLQ topics. (#25172)
---
.../pulsar/client/api/DeadLetterTopicTest.java | 90 ++++++++++++++++++++++
.../pulsar/client/impl/ConsumerBuilderImpl.java | 14 +++-
2 files changed, 102 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index df7e42df80b..832e814e4ee 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -19,6 +19,15 @@
package org.apache.pulsar.client.api;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -48,7 +57,11 @@ import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
@@ -1608,4 +1621,81 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
consumer.close();
deadLetterConsumer.close();
}
+
+ @Test
+ public void
testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ when(client.getConfiguration()).thenReturn(clientConf);
+ when(client.getPartitionedTopicMetadata(anyString(), anyBoolean(),
anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0)));
+ when(client.subscribeAsync(any(ConsumerConfigurationData.class),
any(), any()))
+
.thenReturn(CompletableFuture.completedFuture(mock(Consumer.class)));
+
+ // Case 1: DeadLetterPolicy is null
+ ConsumerBuilderImpl<byte[]> consumerBuilder1 = new
ConsumerBuilderImpl<>(client, Schema.BYTES);
+ consumerBuilder1.topic("persistent://public/default/test");
+ consumerBuilder1.subscriptionName("sub");
+ consumerBuilder1.enableRetry(true);
+ consumerBuilder1.subscribeAsync();
+
+ verify(client, times(1)).getPartitionedTopicMetadata(
+ eq("persistent://public/default/sub-RETRY"), anyBoolean(),
anyBoolean());
+ verify(client, times(1)).getPartitionedTopicMetadata(
+ eq("persistent://public/default/sub-DLQ"), anyBoolean(),
anyBoolean());
+
+ clearInvocations(client);
+
+ // Case 2: DeadLetterPolicy with custom Retry topic
+ ConsumerBuilderImpl<byte[]> consumerBuilder2 = new
ConsumerBuilderImpl<>(client, Schema.BYTES);
+ consumerBuilder2.topic("persistent://public/default/test");
+ consumerBuilder2.subscriptionName("sub");
+ consumerBuilder2.enableRetry(true);
+ consumerBuilder2.deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(10)
+ .retryLetterTopic("persistent://public/default/topic-retry")
+ .build());
+ consumerBuilder2.subscribeAsync();
+
+ verify(client, times(0)).getPartitionedTopicMetadata(
+ eq("persistent://public/default/sub-RETRY"), anyBoolean(),
anyBoolean());
+ verify(client, times(1)).getPartitionedTopicMetadata(
+ eq("persistent://public/default/sub-DLQ"), anyBoolean(),
anyBoolean());
+
+ clearInvocations(client);
+
+ // Case 3: DeadLetterPolicy with custom DLQ topic
+ ConsumerBuilderImpl<byte[]> consumerBuilder3 = new
ConsumerBuilderImpl<>(client, Schema.BYTES);
+ consumerBuilder3.topic("persistent://public/default/test");
+ consumerBuilder3.subscriptionName("sub");
+ consumerBuilder3.enableRetry(true);
+ consumerBuilder3.deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(10)
+ .deadLetterTopic("persistent://public/default/topic-dlq")
+ .build());
+ consumerBuilder3.subscribeAsync();
+
+ verify(client, times(1)).getPartitionedTopicMetadata(
+ eq("persistent://public/default/sub-RETRY"), anyBoolean(),
anyBoolean());
+ verify(client, times(0)).getPartitionedTopicMetadata(
+ eq("persistent://public/default/sub-DLQ"), anyBoolean(),
anyBoolean());
+
+ clearInvocations(client);
+
+ // Case 4: DeadLetterPolicy with both custom topics
+ ConsumerBuilderImpl<byte[]> consumerBuilder4 = new
ConsumerBuilderImpl<>(client, Schema.BYTES);
+ consumerBuilder4.topic("persistent://public/default/test");
+ consumerBuilder4.subscriptionName("sub");
+ consumerBuilder4.enableRetry(true);
+ consumerBuilder4.deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(10)
+ .retryLetterTopic("custom-retry")
+ .deadLetterTopic("custom-dlq")
+ .build());
+ consumerBuilder4.subscribeAsync();
+
+ verify(client, times(0)).getPartitionedTopicMetadata(anyString(),
anyBoolean(), anyBoolean());
+ }
+
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 11bf617e2fd..008a9b554e9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -175,8 +175,18 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
if (deadLetterPolicy == null ||
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
||
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
- CompletableFuture<Boolean> retryLetterTopicMetadata =
checkDlqAlreadyExists(oldRetryLetterTopic);
- CompletableFuture<Boolean> deadLetterTopicMetadata =
checkDlqAlreadyExists(oldDeadLetterTopic);
+ CompletableFuture<Boolean> retryLetterTopicMetadata;
+ if (deadLetterPolicy == null ||
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
+ retryLetterTopicMetadata =
checkDlqAlreadyExists(oldRetryLetterTopic);
+ } else {
+ retryLetterTopicMetadata =
CompletableFuture.completedFuture(false);
+ }
+ CompletableFuture<Boolean> deadLetterTopicMetadata;
+ if (deadLetterPolicy == null ||
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+ deadLetterTopicMetadata =
checkDlqAlreadyExists(oldDeadLetterTopic);
+ } else {
+ deadLetterTopicMetadata =
CompletableFuture.completedFuture(false);
+ }
applyDLQConfig =
CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
.thenAccept(__ -> {
String retryLetterTopic =
RetryMessageUtil.getRetryTopic(topicFirst.toString(),