This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit faefc6c5443ace8fa473c3e4a109df816f28f2f9 Author: Vinkal <[email protected]> AuthorDate: Tue Nov 11 21:06:54 2025 +0530 [improve][client] Deduplicate getTopicsUnderNamespace in BinaryProtoLookupService (#24962) Signed-off-by: Vinkal Chudgar <[email protected]> (cherry picked from commit 190273590e609d446c0c77cc55ce7e5f35efd24f) --- .../client/impl/BinaryProtoLookupService.java | 84 +++++++++++-- .../client/impl/BinaryProtoLookupServiceTest.java | 133 +++++++++++++++++++++ 2 files changed, 206 insertions(+), 11 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 2365ffdfa11..368d9c3809a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -73,6 +73,9 @@ public class BinaryProtoLookupService implements LookupService { private final ConcurrentHashMap<PartitionedTopicMetadataKey, CompletableFuture<PartitionedTopicMetadata>> partitionedMetadataInProgress = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<TopicsUnderNamespaceKey, CompletableFuture<GetTopicsResult>> + topicsUnderNamespaceInProgress = new ConcurrentHashMap<>(); + private final LatencyHistogram histoGetBroker; private final LatencyHistogram histoGetTopicMetadata; private final LatencyHistogram histoGetSchema; @@ -400,17 +403,31 @@ public class BinaryProtoLookupService implements LookupService { Mode mode, String topicsPattern, String topicsHash) { - CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>(); - - AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); - Backoff backoff = new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) - .setMax(1, 
TimeUnit.MINUTES) - .create(); - getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode, - topicsPattern, topicsHash); - return topicsFuture; + final MutableObject<CompletableFuture<GetTopicsResult>> newFutureCreated = new MutableObject<>(); + final TopicsUnderNamespaceKey key = new TopicsUnderNamespaceKey(namespace, mode, topicsPattern, topicsHash); + + try { + return topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> { + CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>(); + AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) + .setMax(1, TimeUnit.MINUTES) + .create(); + + newFutureCreated.setValue(topicsFuture); + getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode, + topicsPattern, topicsHash); + return topicsFuture; + }); + } finally { + if (newFutureCreated.getValue() != null) { + newFutureCreated.getValue().whenComplete((v, ex) -> { + topicsUnderNamespaceInProgress.remove(key, newFutureCreated.getValue()); + }); + } + } } private void getTopicsUnderNamespace( @@ -500,6 +517,51 @@ public class BinaryProtoLookupService implements LookupService { } + private static final class TopicsUnderNamespaceKey { + private final NamespaceName namespace; + private final Mode mode; + private final String topicsPattern; + private final String topicsHash; + + TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode, + String topicsPattern, String topicsHash) { + this.namespace = namespace; + this.mode = mode; + this.topicsPattern = topicsPattern; + this.topicsHash = topicsHash; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o; + return 
Objects.equals(namespace, that.namespace) + && mode == that.mode + && Objects.equals(topicsPattern, that.topicsPattern) + && Objects.equals(topicsHash, that.topicsHash); + } + + @Override + public int hashCode() { + return Objects.hash(namespace, mode, topicsPattern, topicsHash); + } + + @Override + public String toString() { + return "TopicsUnderNamespaceKey{" + + "namespace=" + namespace + + ", mode=" + mode + + ", topicsPattern='" + topicsPattern + '\'' + + ", topicsHash='" + topicsHash + '\'' + + '}'; + } + } + private static final class PartitionedTopicMetadataKey { private final TopicName topicName; private final boolean metadataAutoCreationEnabled; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 7dc5e169801..8bc78264ec7 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -50,6 +50,9 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.BaseCommand.Type; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.lookup.GetTopicsResult; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.protocol.Commands; @@ -201,6 +204,136 @@ public class BinaryProtoLookupServiceTest { return lookupResult; } + /** + * Verifies that getTopicsUnderNamespace() deduplicates concurrent requests and cleans up after completion. 
+ * + * First, two concurrent calls with identical parameters should return the same CompletableFuture + * and trigger only one connection pool request (deduplication). + * + * Second, after the future completes, the map entry should be removed so a subsequent call + * with the same parameters creates a new future (cleanup). + * + * This test uses a never-completing connection future to isolate the deduplication logic + * without executing the network request path. + */ + + @Test(timeOut = 60000) + public void testGetTopicsUnderNamespaceDeduplication() throws Exception { + PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool cnxPool = mock(ConnectionPool.class); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(30000); + when(client.getConfiguration()).thenReturn(conf); + when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); + when(client.getCnxPool()).thenReturn(cnxPool); + + // Never-completing connection prevents the thenAcceptAsync callback in getTopicsUnderNamespace from executing, + // isolating only the deduplication logic without network calls. 
+ CompletableFuture<ClientCnx> neverCompletes = new CompletableFuture<>(); + when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(neverCompletes); + + ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched")); + + try (BinaryProtoLookupService lookup = new BinaryProtoLookupService( + client, "pulsar://broker:6650", null, false, scheduler, /*lookupPinnedExecutor*/ null)) { + + NamespaceName ns = NamespaceName.get("public", "default"); + Mode mode = Mode.PERSISTENT; + String pattern = ".*"; + String topicsHash = null; + + CompletableFuture<GetTopicsResult> f1 = lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash); + CompletableFuture<GetTopicsResult> f1b = lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash); + + assertSame(f1b, f1, "Concurrent requests with identical parameters should return the same future"); + + verify(cnxPool, times(1)).getConnection(any(ServiceNameResolver.class)); + + GetTopicsResult payload = new GetTopicsResult(java.util.Collections.emptyList(), null, false, true); + + // Complete the future. This triggers the whenComplete callback that removes the map entry. + f1.complete(payload); + assertTrue(f1.isDone()); + + // Verify cleanup: subsequent call with same parameters creates a new future. + CompletableFuture<GetTopicsResult> f2 = lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash); + assertNotSame(f2, f1, + "After completion, the deduplication map entry should be removed and a new future created"); + verify(cnxPool, times(2)).getConnection(any(ServiceNameResolver.class)); + } finally { + scheduler.shutdownNow(); + } + } + + /** + * Verifies that getTopicsUnderNamespace() treats different topicsHash values as distinct keys for deduplication. + * + * Requests with different topicsHash values should create separate futures and trigger separate connection + * pool requests. Cleanup is per key. 
Completing one future does not affect another in-flight entry. + * + * This test uses a never-completing connection future to isolate the deduplication logic without executing + * the network request path. + */ + @Test(timeOut = 60000) + public void testGetTopicsUnderNamespaceDeduplicationDifferentHash() throws Exception { + PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool cnxPool = mock(ConnectionPool.class); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(30000); + when(client.getConfiguration()).thenReturn(conf); + when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); + when(client.getCnxPool()).thenReturn(cnxPool); + + // Never-completing connection prevents the thenAcceptAsync callback in getTopicsUnderNamespace from executing, + // isolating only the deduplication logic without network calls. + CompletableFuture<ClientCnx> neverCompletes = new CompletableFuture<>(); + when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(neverCompletes); + + ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched")); + + try (BinaryProtoLookupService lookup = new BinaryProtoLookupService( + client, "pulsar://broker:6650", null, false, scheduler, null)) { + + NamespaceName ns = NamespaceName.get("public", "default"); + Mode mode = Mode.PERSISTENT; + String pattern = ".*"; + + CompletableFuture<GetTopicsResult> futureHashA = lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashA"); + CompletableFuture<GetTopicsResult> futureHashB = lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashB"); + + // Verify different hash values create separate futures. + assertNotSame(futureHashA, futureHashB, + "Requests with different topicsHash must not share the same future"); + + // Verify connection pool called twice, once for each distinct topicsHash. 
+ verify(cnxPool, times(2)).getConnection(any(ServiceNameResolver.class)); + + GetTopicsResult payload = new GetTopicsResult(java.util.Collections.emptyList(), null, false, true); + + futureHashA.complete(payload); + + // Verify cleanup for HashA: subsequent call creates a new future. + CompletableFuture<GetTopicsResult> futureHashA2 = + lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashA"); + assertNotSame(futureHashA2, futureHashA, + "After completion, a call with the same topicsHash must create a new future"); + verify(cnxPool, times(3)).getConnection(any(ServiceNameResolver.class)); + + // Verify HashB still in-flight: subsequent call returns the original future. + CompletableFuture<GetTopicsResult> futureHashB2 = + lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashB"); + assertSame(futureHashB2, futureHashB, + "An in-flight request for the same topicsHash must return the same future"); + verify(cnxPool, times(3)).getConnection(any(ServiceNameResolver.class)); + } finally { + scheduler.shutdownNow(); + } + } + /** * Verifies that getPartitionedTopicMetadata() deduplicates concurrent requests and cleans up after completion. *
