This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52a4d5ee84f [improve][client]Reduce unnecessary 
getPartitionedTopicMetadata requests when using retry and DLQ topics. (#25172)
52a4d5ee84f is described below

commit 52a4d5ee84fad6af2736376a6fcdd1bc41e7c52f
Author: zhenJiangWang <[email protected]>
AuthorDate: Tue Jan 27 21:04:27 2026 +0800

    [improve][client]Reduce unnecessary getPartitionedTopicMetadata requests 
when using retry and DLQ topics. (#25172)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 90 ++++++++++++++++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 14 +++-
 2 files changed, 102 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index df7e42df80b..832e814e4ee 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -19,6 +19,15 @@
 package org.apache.pulsar.client.api;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -48,7 +57,11 @@ import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
@@ -1608,4 +1621,81 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         consumer.close();
         deadLetterConsumer.close();
     }
+
+    @Test
+    public void 
testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        when(client.getConfiguration()).thenReturn(clientConf);
+        when(client.getPartitionedTopicMetadata(anyString(), anyBoolean(), 
anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0)));
+        when(client.subscribeAsync(any(ConsumerConfigurationData.class), 
any(), any()))
+                
.thenReturn(CompletableFuture.completedFuture(mock(Consumer.class)));
+
+        // Case 1: DeadLetterPolicy is null
+        ConsumerBuilderImpl<byte[]> consumerBuilder1 = new 
ConsumerBuilderImpl<>(client, Schema.BYTES);
+        consumerBuilder1.topic("persistent://public/default/test");
+        consumerBuilder1.subscriptionName("sub");
+        consumerBuilder1.enableRetry(true);
+        consumerBuilder1.subscribeAsync();
+
+        verify(client, times(1)).getPartitionedTopicMetadata(
+                eq("persistent://public/default/sub-RETRY"), anyBoolean(), 
anyBoolean());
+        verify(client, times(1)).getPartitionedTopicMetadata(
+                eq("persistent://public/default/sub-DLQ"), anyBoolean(), 
anyBoolean());
+
+        clearInvocations(client);
+
+        // Case 2: DeadLetterPolicy with custom Retry topic
+        ConsumerBuilderImpl<byte[]> consumerBuilder2 = new 
ConsumerBuilderImpl<>(client, Schema.BYTES);
+        consumerBuilder2.topic("persistent://public/default/test");
+        consumerBuilder2.subscriptionName("sub");
+        consumerBuilder2.enableRetry(true);
+        consumerBuilder2.deadLetterPolicy(DeadLetterPolicy.builder()
+                .maxRedeliverCount(10)
+                .retryLetterTopic("persistent://public/default/topic-retry")
+                .build());
+        consumerBuilder2.subscribeAsync();
+
+        verify(client, times(0)).getPartitionedTopicMetadata(
+                eq("persistent://public/default/sub-RETRY"), anyBoolean(), 
anyBoolean());
+        verify(client, times(1)).getPartitionedTopicMetadata(
+                eq("persistent://public/default/sub-DLQ"), anyBoolean(), 
anyBoolean());
+
+        clearInvocations(client);
+
+        // Case 3: DeadLetterPolicy with custom DLQ topic
+        ConsumerBuilderImpl<byte[]> consumerBuilder3 = new 
ConsumerBuilderImpl<>(client, Schema.BYTES);
+        consumerBuilder3.topic("persistent://public/default/test");
+        consumerBuilder3.subscriptionName("sub");
+        consumerBuilder3.enableRetry(true);
+        consumerBuilder3.deadLetterPolicy(DeadLetterPolicy.builder()
+                .maxRedeliverCount(10)
+                .deadLetterTopic("persistent://public/default/topic-dlq")
+                .build());
+        consumerBuilder3.subscribeAsync();
+
+        verify(client, times(1)).getPartitionedTopicMetadata(
+                eq("persistent://public/default/sub-RETRY"), anyBoolean(), 
anyBoolean());
+        verify(client, times(0)).getPartitionedTopicMetadata(
+                eq("persistent://public/default/sub-DLQ"), anyBoolean(), 
anyBoolean());
+
+        clearInvocations(client);
+
+        // Case 4: DeadLetterPolicy with both custom topics
+        ConsumerBuilderImpl<byte[]> consumerBuilder4 = new 
ConsumerBuilderImpl<>(client, Schema.BYTES);
+        consumerBuilder4.topic("persistent://public/default/test");
+        consumerBuilder4.subscriptionName("sub");
+        consumerBuilder4.enableRetry(true);
+        consumerBuilder4.deadLetterPolicy(DeadLetterPolicy.builder()
+                .maxRedeliverCount(10)
+                .retryLetterTopic("custom-retry")
+                .deadLetterTopic("custom-dlq")
+                .build());
+        consumerBuilder4.subscribeAsync();
+
+        verify(client, times(0)).getPartitionedTopicMetadata(anyString(), 
anyBoolean(), anyBoolean());
+    }
+
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 11bf617e2fd..008a9b554e9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -175,8 +175,18 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
             DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
             if (deadLetterPolicy == null || 
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
                     || 
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
-                CompletableFuture<Boolean> retryLetterTopicMetadata = 
checkDlqAlreadyExists(oldRetryLetterTopic);
-                CompletableFuture<Boolean> deadLetterTopicMetadata = 
checkDlqAlreadyExists(oldDeadLetterTopic);
+                CompletableFuture<Boolean> retryLetterTopicMetadata;
+                if (deadLetterPolicy == null || 
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
+                    retryLetterTopicMetadata = 
checkDlqAlreadyExists(oldRetryLetterTopic);
+                } else {
+                    retryLetterTopicMetadata = 
CompletableFuture.completedFuture(false);
+                }
+                CompletableFuture<Boolean> deadLetterTopicMetadata;
+                if (deadLetterPolicy == null || 
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                    deadLetterTopicMetadata = 
checkDlqAlreadyExists(oldDeadLetterTopic);
+                } else {
+                    deadLetterTopicMetadata = 
CompletableFuture.completedFuture(false);
+                }
                 applyDLQConfig = 
CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
                         .thenAccept(__ -> {
                             String retryLetterTopic = 
RetryMessageUtil.getRetryTopic(topicFirst.toString(),

Reply via email to