This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 71a3994755063298368f738ea47882aba3f92f6a Author: zhenJiangWang <[email protected]> AuthorDate: Tue Jan 27 21:04:27 2026 +0800 [improve][client]Reduce unnecessary getPartitionedTopicMetadata requests when using retry and DLQ topics. (#25172) (cherry picked from commit 52a4d5ee84fad6af2736376a6fcdd1bc41e7c52f) --- .../pulsar/client/api/DeadLetterTopicTest.java | 90 ++++++++++++++++++++++ .../pulsar/client/impl/ConsumerBuilderImpl.java | 14 +++- 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index f5bf31369d8..001edb4de4f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -19,6 +19,15 @@ package org.apache.pulsar.client.api; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -48,7 +57,11 @@ import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -1608,4 +1621,81 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { consumer.close(); deadLetterConsumer.close(); } + + @Test + public void testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + ClientConfigurationData clientConf = new ClientConfigurationData(); + when(client.getConfiguration()).thenReturn(clientConf); + when(client.getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); + when(client.subscribeAsync(any(ConsumerConfigurationData.class), any(), any())) + .thenReturn(CompletableFuture.completedFuture(mock(Consumer.class))); + + // Case 1: DeadLetterPolicy is null + ConsumerBuilderImpl<byte[]> consumerBuilder1 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder1.topic("persistent://public/default/test"); + consumerBuilder1.subscriptionName("sub"); + consumerBuilder1.enableRetry(true); + consumerBuilder1.subscribeAsync(); + + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 2: DeadLetterPolicy with custom Retry topic + ConsumerBuilderImpl<byte[]> consumerBuilder2 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder2.topic("persistent://public/default/test"); + consumerBuilder2.subscriptionName("sub"); + consumerBuilder2.enableRetry(true); + consumerBuilder2.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .retryLetterTopic("persistent://public/default/topic-retry") + .build()); + consumerBuilder2.subscribeAsync(); + + verify(client, times(0)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 3: DeadLetterPolicy with custom DLQ topic + ConsumerBuilderImpl<byte[]> consumerBuilder3 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder3.topic("persistent://public/default/test"); + consumerBuilder3.subscriptionName("sub"); + consumerBuilder3.enableRetry(true); + consumerBuilder3.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .deadLetterTopic("persistent://public/default/topic-dlq") + .build()); + consumerBuilder3.subscribeAsync(); + + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(0)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 4: DeadLetterPolicy with both custom topics + ConsumerBuilderImpl<byte[]> consumerBuilder4 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder4.topic("persistent://public/default/test"); + consumerBuilder4.subscriptionName("sub"); + consumerBuilder4.enableRetry(true); + consumerBuilder4.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .retryLetterTopic("custom-retry") + .deadLetterTopic("custom-dlq") + .build()); + consumerBuilder4.subscribeAsync(); + + verify(client, times(0)).getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean()); + } + + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index ab64119cb8f..50addea57d1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -163,8 +163,18 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { - CompletableFuture<Boolean> retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic); - CompletableFuture<Boolean> deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic); + CompletableFuture<Boolean> retryLetterTopicMetadata; + if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) { + retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic); + } else { + retryLetterTopicMetadata = CompletableFuture.completedFuture(false); + } + CompletableFuture<Boolean> deadLetterTopicMetadata; + if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { + deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic); + } else { + deadLetterTopicMetadata = CompletableFuture.completedFuture(false); + } applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata) .thenAccept(__ -> { String retryLetterTopic = RetryMessageUtil.getRetryTopic(topicFirst.toString(),
