This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new d90da78a419 [improve][client] Deduplicate getTopicsUnderNamespace in
BinaryProtoLookupService (#24962)
d90da78a419 is described below
commit d90da78a419b4ff48ce3ab034f4e4a6fdee6a7c9
Author: Vinkal <[email protected]>
AuthorDate: Tue Nov 11 21:06:54 2025 +0530
[improve][client] Deduplicate getTopicsUnderNamespace in
BinaryProtoLookupService (#24962)
Signed-off-by: Vinkal Chudgar <[email protected]>
(cherry picked from commit 190273590e609d446c0c77cc55ce7e5f35efd24f)
---
.../client/impl/BinaryProtoLookupService.java | 84 +++++++++++--
.../client/impl/BinaryProtoLookupServiceTest.java | 133 +++++++++++++++++++++
2 files changed, 206 insertions(+), 11 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 2c520e06d36..a8c8d1d1d42 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -73,6 +73,9 @@ public class BinaryProtoLookupService implements
LookupService {
private final ConcurrentHashMap<PartitionedTopicMetadataKey,
CompletableFuture<PartitionedTopicMetadata>>
partitionedMetadataInProgress = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<TopicsUnderNamespaceKey,
CompletableFuture<GetTopicsResult>>
+ topicsUnderNamespaceInProgress = new ConcurrentHashMap<>();
+
private final LatencyHistogram histoGetBroker;
private final LatencyHistogram histoGetTopicMetadata;
private final LatencyHistogram histoGetSchema;
@@ -398,17 +401,31 @@ public class BinaryProtoLookupService implements
LookupService {
Mode mode,
String topicsPattern,
String topicsHash) {
- CompletableFuture<GetTopicsResult> topicsFuture = new
CompletableFuture<>();
-
- AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
- .setMax(1, TimeUnit.MINUTES)
- .create();
- getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace,
backoff, opTimeoutMs, topicsFuture, mode,
- topicsPattern, topicsHash);
- return topicsFuture;
+ final MutableObject<CompletableFuture<GetTopicsResult>>
newFutureCreated = new MutableObject<>();
+ final TopicsUnderNamespaceKey key = new
TopicsUnderNamespaceKey(namespace, mode, topicsPattern, topicsHash);
+
+ try {
+ return topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> {
+ CompletableFuture<GetTopicsResult> topicsFuture = new
CompletableFuture<>();
+ AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(opTimeoutMs.get() * 2,
TimeUnit.MILLISECONDS)
+ .setMax(1, TimeUnit.MINUTES)
+ .create();
+
+ newFutureCreated.setValue(topicsFuture);
+ getTopicsUnderNamespace(serviceNameResolver.resolveHost(),
namespace, backoff, opTimeoutMs,
+ topicsFuture, mode, topicsPattern, topicsHash);
+ return topicsFuture;
+ });
+ } finally {
+ if (newFutureCreated.getValue() != null) {
+ newFutureCreated.getValue().whenComplete((v, ex) -> {
+ topicsUnderNamespaceInProgress.remove(key,
newFutureCreated.getValue());
+ });
+ }
+ }
}
private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
@@ -499,6 +516,51 @@ public class BinaryProtoLookupService implements
LookupService {
}
+ private static final class TopicsUnderNamespaceKey {
+ private final NamespaceName namespace;
+ private final Mode mode;
+ private final String topicsPattern;
+ private final String topicsHash;
+
+ TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode,
+ String topicsPattern, String topicsHash) {
+ this.namespace = namespace;
+ this.mode = mode;
+ this.topicsPattern = topicsPattern;
+ this.topicsHash = topicsHash;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o;
+ return Objects.equals(namespace, that.namespace)
+ && mode == that.mode
+ && Objects.equals(topicsPattern, that.topicsPattern)
+ && Objects.equals(topicsHash, that.topicsHash);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(namespace, mode, topicsPattern, topicsHash);
+ }
+
+ @Override
+ public String toString() {
+ return "TopicsUnderNamespaceKey{"
+ + "namespace=" + namespace
+ + ", mode=" + mode
+ + ", topicsPattern='" + topicsPattern + '\''
+ + ", topicsHash='" + topicsHash + '\''
+ + '}';
+ }
+ }
+
private static final class PartitionedTopicMetadataKey {
private final TopicName topicName;
private final boolean metadataAutoCreationEnabled;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index 3661492bfe3..0a121a26b30 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -50,6 +50,9 @@ import
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import org.apache.pulsar.common.lookup.GetTopicsResult;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
@@ -200,6 +203,136 @@ public class BinaryProtoLookupServiceTest {
return lookupResult;
}
+ /**
+ * Verifies that getTopicsUnderNamespace() deduplicates concurrent
requests and cleans up after completion.
+ *
+ * First, two concurrent calls with identical parameters should return the
same CompletableFuture
+ * and trigger only one connection pool request (deduplication).
+ *
+ * Second, after the future completes, the map entry should be removed so
a subsequent call
+ * with the same parameters creates a new future (cleanup).
+ *
+ * This test uses a never-completing connection future to isolate the
deduplication logic
+ * without executing the network request path.
+ */
+
+ @Test(timeOut = 60000)
+ public void testGetTopicsUnderNamespaceDeduplication() throws Exception {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool cnxPool = mock(ConnectionPool.class);
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setOperationTimeoutMs(30000);
+ when(client.getConfiguration()).thenReturn(conf);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+ when(client.getCnxPool()).thenReturn(cnxPool);
+
+ // Never-completing connection prevents the thenAcceptAsync callback
in getTopicsUnderNamespace from executing,
+ // isolating only the deduplication logic without network calls.
+ CompletableFuture<ClientCnx> neverCompletes = new
CompletableFuture<>();
+
when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(neverCompletes);
+
+ ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("lookup-test-sched"));
+
+ try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
+ client, "pulsar://broker:6650", null, false, scheduler,
/*lookupPinnedExecutor*/ null)) {
+
+ NamespaceName ns = NamespaceName.get("public", "default");
+ Mode mode = Mode.PERSISTENT;
+ String pattern = ".*";
+ String topicsHash = null;
+
+ CompletableFuture<GetTopicsResult> f1 =
lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
+ CompletableFuture<GetTopicsResult> f1b =
lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
+
+ assertSame(f1b, f1, "Concurrent requests with identical parameters
should return the same future");
+
+ verify(cnxPool,
times(1)).getConnection(any(InetSocketAddress.class));
+
+ GetTopicsResult payload = new
GetTopicsResult(java.util.Collections.emptyList(), null, false, true);
+
+ // Complete the future. This triggers the whenComplete callback
that removes the map entry.
+ f1.complete(payload);
+ assertTrue(f1.isDone());
+
+ // Verify cleanup: subsequent call with same parameters creates a
new future.
+ CompletableFuture<GetTopicsResult> f2 =
lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
+ assertNotSame(f2, f1,
+ "After completion, the deduplication map entry should be
removed and a new future created");
+ verify(cnxPool,
times(2)).getConnection(any(InetSocketAddress.class));
+ } finally {
+ scheduler.shutdownNow();
+ }
+ }
+
+ /**
+ * Verifies that getTopicsUnderNamespace() treats different topicsHash
values as distinct keys for deduplication.
+ *
+ * Requests with different topicsHash values should create separate
futures and trigger separate connection
+ * pool requests. Cleanup is per key. Completing one future does not
affect another in-flight entry.
+ *
+ * This test uses a never-completing connection future to isolate the
deduplication logic without executing
+ * the network request path.
+ */
+ @Test(timeOut = 60000)
+ public void testGetTopicsUnderNamespaceDeduplicationDifferentHash() throws
Exception {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool cnxPool = mock(ConnectionPool.class);
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setOperationTimeoutMs(30000);
+ when(client.getConfiguration()).thenReturn(conf);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+ when(client.getCnxPool()).thenReturn(cnxPool);
+
+ // Never-completing connection prevents the thenAcceptAsync callback
in getTopicsUnderNamespace from executing,
+ // isolating only the deduplication logic without network calls.
+ CompletableFuture<ClientCnx> neverCompletes = new
CompletableFuture<>();
+
when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(neverCompletes);
+
+ ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("lookup-test-sched"));
+
+ try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
+ client, "pulsar://broker:6650", null, false, scheduler, null))
{
+
+ NamespaceName ns = NamespaceName.get("public", "default");
+ Mode mode = Mode.PERSISTENT;
+ String pattern = ".*";
+
+ CompletableFuture<GetTopicsResult> futureHashA =
lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashA");
+ CompletableFuture<GetTopicsResult> futureHashB =
lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashB");
+
+ // Verify different hash values create separate futures.
+ assertNotSame(futureHashA, futureHashB,
+ "Requests with different topicsHash must not share the
same future");
+
+ // Verify connection pool called twice, once for each distinct
topicsHash.
+ verify(cnxPool,
times(2)).getConnection(any(InetSocketAddress.class));
+
+ GetTopicsResult payload = new
GetTopicsResult(java.util.Collections.emptyList(), null, false, true);
+
+ futureHashA.complete(payload);
+
+ // Verify cleanup for HashA: subsequent call creates a new future.
+ CompletableFuture<GetTopicsResult> futureHashA2 =
+ lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashA");
+ assertNotSame(futureHashA2, futureHashA,
+ "After completion, a call with the same topicsHash must
create a new future");
+ verify(cnxPool,
times(3)).getConnection(any(InetSocketAddress.class));
+
+ // Verify HashB still in-flight: subsequent call returns the
original future.
+ CompletableFuture<GetTopicsResult> futureHashB2 =
+ lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashB");
+ assertSame(futureHashB2, futureHashB,
+ "An in-flight request for the same topicsHash must return
the same future");
+ verify(cnxPool,
times(3)).getConnection(any(InetSocketAddress.class));
+ } finally {
+ scheduler.shutdownNow();
+ }
+ }
+
/**
* Verifies that getPartitionedTopicMetadata() deduplicates concurrent
requests and cleans up after completion.
*