This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b37b5b3fa01 [improve][client] Deduplicate in-progress lookup requests also for HttpLookupService (#25017)
b37b5b3fa01 is described below

commit b37b5b3fa01494fd6541f4d460f3586443c221b6
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 2 17:33:44 2025 +0200

    [improve][client] Deduplicate in-progress lookup requests also for HttpLookupService (#25017)
---
 .../pulsar/client/api/BrokerServiceLookupTest.java |   7 +-
 .../apache/pulsar/client/api/SimpleSchemaTest.java |   6 +-
 .../client/impl/BinaryProtoLookupService.java      | 205 +++------------
 .../pulsar/client/impl/HttpLookupService.java      |  14 +-
 ...rogressDeduplicationDecoratorLookupService.java | 276 +++++++++++++++++++++
 .../apache/pulsar/client/impl/LookupService.java   |  26 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  22 +-
 .../client/impl/BinaryProtoLookupServiceTest.java  |  21 +-
 .../proxy/server/ProxyLookupThrottlingTest.java    |   3 +-
 .../server/ProxyWithExtensibleLoadManagerTest.java |  13 +-
 .../pulsar/websocket/LookupProtocolTest.java       |  29 +--
 11 files changed, 388 insertions(+), 234 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index ddbe6547213..e00e6d69ce8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -82,7 +82,6 @@ import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
-import org.apache.pulsar.client.impl.BinaryProtoLookupService;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -957,7 +956,7 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase implements ITe
         // Assert the lookup service is a "BinaryProtoLookupService".
         final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) 
pulsarClient;
         final LookupService lookupService = pulsarClientImpl.getLookup();
-        assertTrue(lookupService instanceof BinaryProtoLookupService);
+        assertTrue(lookupService.isBinaryProtoLookupService());
 
         final String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         final int topicPartitions = 10;
@@ -981,7 +980,7 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase implements ITe
         // Assert the lookup service is a "BinaryProtoLookupService".
         final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) 
pulsarClient;
         final LookupService lookupService = pulsarClientImpl.getLookup();
-        assertTrue(lookupService instanceof BinaryProtoLookupService);
+        assertTrue(lookupService.isBinaryProtoLookupService());
 
         final String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         admin.topics().createNonPartitionedTopic(tpName);
@@ -1234,7 +1233,7 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase implements ITe
         Consumer<String> consumer = 
pulsarClientImpl.newConsumer(Schema.STRING).topic(tpName)
                 .subscriptionName("s1").isAckReceiptEnabled(true).subscribe();
         LookupService lookupService = pulsarClientImpl.getLookup();
-        assertTrue(lookupService instanceof BinaryProtoLookupService);
+        assertTrue(lookupService.isBinaryProtoLookupService());
         ClientCnx lookupConnection = 
pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join();
 
         var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 3a7f65f0b07..d581e0a6b32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -49,10 +49,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import 
org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
 import 
org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.BinaryProtoLookupService;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.HttpLookupService;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -1262,8 +1260,8 @@ public class SimpleSchemaTest extends 
ProducerConsumerBase {
 
         LookupService httpLookupService = httpProtocolClient.getLookup();
         LookupService binaryLookupService = binaryProtocolClient.getLookup();
-        Assert.assertTrue(httpLookupService instanceof HttpLookupService);
-        Assert.assertTrue(binaryLookupService instanceof 
BinaryProtoLookupService);
+        Assert.assertTrue(!httpLookupService.isBinaryProtoLookupService());
+        Assert.assertTrue(binaryLookupService.isBinaryProtoLookupService());
         Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
         Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic),
                 ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 368d9c3809a..674fb40793f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -26,17 +26,13 @@ import io.opentelemetry.api.common.Attributes;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
@@ -66,16 +62,6 @@ public class BinaryProtoLookupService implements 
LookupService {
     private final int maxLookupRedirects;
     private final ExecutorService lookupPinnedExecutor;
     private final boolean createdLookupPinnedExecutor;
-
-    private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, 
CompletableFuture<LookupTopicResult>>
-            lookupInProgress = new ConcurrentHashMap<>();
-
-    private final ConcurrentHashMap<PartitionedTopicMetadataKey, 
CompletableFuture<PartitionedTopicMetadata>>
-            partitionedMetadataInProgress = new ConcurrentHashMap<>();
-
-    private final ConcurrentHashMap<TopicsUnderNamespaceKey, 
CompletableFuture<GetTopicsResult>>
-            topicsUnderNamespaceInProgress = new ConcurrentHashMap<>();
-
     private final LatencyHistogram histoGetBroker;
     private final LatencyHistogram histoGetTopicMetadata;
     private final LatencyHistogram histoGetSchema;
@@ -156,32 +142,20 @@ public class BinaryProtoLookupService implements 
LookupService {
      *            topic-name
      * @return broker-socket-address that serves given topic
      */
-    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) 
{
-        long startTime = System.nanoTime();
-        final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
-        final Pair<TopicName, Map<String, String>> key = Pair.of(topicName,
-                client.getConfiguration().getLookupProperties());
-        try {
-            return lookupInProgress.computeIfAbsent(key, tpName -> {
-                CompletableFuture<LookupTopicResult> newFuture = 
findBroker(serviceNameResolver.resolveHost(), false,
-                        topicName, 0, key.getRight());
-                newFutureCreated.setValue(newFuture);
-
-                newFuture.thenRun(() -> {
-                    histoGetBroker.recordSuccess(System.nanoTime() - 
startTime);
-                }).exceptionally(x -> {
-                    histoGetBroker.recordFailure(System.nanoTime() - 
startTime);
-                    return null;
-                });
-                return newFuture;
-            });
-        } finally {
-            if (newFutureCreated.getValue() != null) {
-                newFutureCreated.getValue().whenComplete((v, ex) -> {
-                    lookupInProgress.remove(key, newFutureCreated.getValue());
-                });
-            }
+    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName, 
Map<String, String> lookupProperties) {
+        if (lookupProperties == null) {
+            lookupProperties = client.getConfiguration().getLookupProperties();
         }
+        long startTime = System.nanoTime();
+        CompletableFuture<LookupTopicResult> newFuture = 
findBroker(serviceNameResolver.resolveHost(), false,
+                topicName, 0, lookupProperties);
+        newFuture.thenRun(() -> {
+            histoGetBroker.recordSuccess(System.nanoTime() - startTime);
+        }).exceptionally(x -> {
+            histoGetBroker.recordFailure(System.nanoTime() - startTime);
+            return null;
+        });
+        return newFuture;
     }
 
     /**
@@ -191,24 +165,7 @@ public class BinaryProtoLookupService implements 
LookupService {
     @Override
     public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
             TopicName topicName, boolean metadataAutoCreationEnabled, boolean 
useFallbackForNonPIP344Brokers) {
-        final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
-        final PartitionedTopicMetadataKey key = new 
PartitionedTopicMetadataKey(
-                topicName, metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
-        try {
-            return partitionedMetadataInProgress.computeIfAbsent(key, k -> {
-                CompletableFuture<PartitionedTopicMetadata> newFuture = 
getPartitionedTopicMetadataAsync(
-                       topicName, metadataAutoCreationEnabled,
-                        useFallbackForNonPIP344Brokers);
-                newFutureCreated.setValue(newFuture);
-                return newFuture;
-            });
-        } finally {
-            if (newFutureCreated.getValue() != null) {
-                newFutureCreated.getValue().whenComplete((v, ex) -> {
-                    partitionedMetadataInProgress.remove(key, 
newFutureCreated.getValue());
-                });
-            }
-        }
+        return getPartitionedTopicMetadataAsync(topicName, 
metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
     }
 
     private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress 
socketAddress,
@@ -351,12 +308,6 @@ public class BinaryProtoLookupService implements 
LookupService {
         return partitionFuture;
     }
 
-    @Override
-    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName) {
-        return getSchema(topicName, null);
-    }
-
-
     @Override
     public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName, byte[] version) {
         long startTime = System.nanoTime();
@@ -403,31 +354,21 @@ public class BinaryProtoLookupService implements 
LookupService {
                                                                                
   Mode mode,
                                                                                
   String topicsPattern,
                                                                                
   String topicsHash) {
-        final MutableObject<CompletableFuture<GetTopicsResult>> 
newFutureCreated = new MutableObject<>();
-        final TopicsUnderNamespaceKey key = new 
TopicsUnderNamespaceKey(namespace, mode, topicsPattern, topicsHash);
-
-        try {
-            return topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> {
-                CompletableFuture<GetTopicsResult> topicsFuture = new 
CompletableFuture<>();
-                AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
-                Backoff backoff = new BackoffBuilder()
-                        .setInitialTime(100, TimeUnit.MILLISECONDS)
-                        .setMandatoryStop(opTimeoutMs.get() * 2, 
TimeUnit.MILLISECONDS)
-                        .setMax(1, TimeUnit.MINUTES)
-                        .create();
-
-                newFutureCreated.setValue(topicsFuture);
-                getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, 
topicsFuture, mode,
-                        topicsPattern, topicsHash);
-                return topicsFuture;
-            });
-        } finally {
-            if (newFutureCreated.getValue() != null) {
-                newFutureCreated.getValue().whenComplete((v, ex) -> {
-                    topicsUnderNamespaceInProgress.remove(key, 
newFutureCreated.getValue());
-                });
-            }
-        }
+        CompletableFuture<GetTopicsResult> topicsFuture = new 
CompletableFuture<>();
+        AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMax(1, TimeUnit.MINUTES)
+                .create();
+        getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, 
mode,
+                topicsPattern, topicsHash);
+        return topicsFuture;
+    }
+
+    @Override
+    public boolean isBinaryProtoLookupService() {
+        return true;
     }
 
     private void getTopicsUnderNamespace(
@@ -517,93 +458,5 @@ public class BinaryProtoLookupService implements 
LookupService {
 
     }
 
-    private static final class TopicsUnderNamespaceKey {
-        private final NamespaceName namespace;
-        private final Mode mode;
-        private final String topicsPattern;
-        private final String topicsHash;
-
-        TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode,
-                                String topicsPattern, String topicsHash) {
-            this.namespace = namespace;
-            this.mode = mode;
-            this.topicsPattern = topicsPattern;
-            this.topicsHash = topicsHash;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o;
-            return Objects.equals(namespace, that.namespace)
-                    && mode == that.mode
-                    && Objects.equals(topicsPattern, that.topicsPattern)
-                    && Objects.equals(topicsHash, that.topicsHash);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(namespace, mode, topicsPattern, topicsHash);
-        }
-
-        @Override
-        public String toString() {
-            return "TopicsUnderNamespaceKey{"
-                    + "namespace=" + namespace
-                    + ", mode=" + mode
-                    + ", topicsPattern='" + topicsPattern + '\''
-                    + ", topicsHash='" + topicsHash + '\''
-                    + '}';
-        }
-    }
-
-    private static final class PartitionedTopicMetadataKey {
-        private final TopicName topicName;
-        private final boolean metadataAutoCreationEnabled;
-        private final boolean useFallbackForNonPIP344Brokers;
-
-        PartitionedTopicMetadataKey(TopicName topicName,
-                               boolean metadataAutoCreationEnabled,
-                               boolean useFallbackForNonPIP344Brokers) {
-            this.topicName = topicName;
-            this.metadataAutoCreationEnabled = metadataAutoCreationEnabled;
-            this.useFallbackForNonPIP344Brokers = 
useFallbackForNonPIP344Brokers;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o;
-            return metadataAutoCreationEnabled == 
that.metadataAutoCreationEnabled
-                    && useFallbackForNonPIP344Brokers == 
that.useFallbackForNonPIP344Brokers
-                    && Objects.equals(topicName, that.topicName);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(topicName, metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
-        }
-
-        @Override
-        public String toString() {
-            return "PartitionedTopicMetadataKey{"
-                    + "topicName=" + topicName
-                    + ", metadataAutoCreationEnabled=" + 
metadataAutoCreationEnabled
-                    + ", useFallbackForNonPIP344Brokers=" + 
useFallbackForNonPIP344Brokers
-                    + '}';
-        }
-    }
-
-
     private static final Logger log = 
LoggerFactory.getLogger(BinaryProtoLookupService.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 08c9956b5be..9044088f724 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -28,6 +28,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
@@ -105,7 +106,14 @@ public class HttpLookupService implements LookupService {
      */
     @Override
     @SuppressWarnings("deprecation")
-    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) 
{
+    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName, 
Map<String, String> lookupProperties) {
+        if (lookupProperties == null) {
+            lookupProperties = httpClient.clientConf.getLookupProperties();
+        }
+        if (lookupProperties != null && !lookupProperties.isEmpty()) {
+            log.warn("Lookup properties aren't supported for http lookup 
service. lookupProperties: {}",
+                    lookupProperties);
+        }
         String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
         String path = basePath + topicName.getLookupName();
         path = StringUtils.isBlank(listenerName) ? path : path + 
"?listenerName=" + Codec.encode(listenerName);
@@ -211,8 +219,8 @@ public class HttpLookupService implements LookupService {
     }
 
     @Override
-    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName) {
-        return getSchema(topicName, null);
+    public boolean isBinaryProtoLookupService() {
+        return false;
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java
new file mode 100644
index 00000000000..8cf95b62634
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import org.apache.pulsar.common.lookup.GetTopicsResult;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Decorator for {@link LookupService} that deduplicates in-progress lookups 
for topics, schemas, partitioned topics
+ * and topic listings for namespace.
+ */
+public class InProgressDeduplicationDecoratorLookupService implements 
LookupService {
+    private final LookupService delegate;
+    private final Supplier<Map<String, String>> lookupPropertiesSupplier;
+    private final InProgressHolder<LookupBrokerKey, 
CompletableFuture<LookupTopicResult>> topicLookupsInProgress =
+            new InProgressHolder<>();
+    private final InProgressHolder<PartitionedTopicMetadataKey, 
CompletableFuture<PartitionedTopicMetadata>>
+            partitionedTopicMetadataInProgress = new InProgressHolder<>();
+    private final InProgressHolder<LookupSchemaKey, 
CompletableFuture<Optional<SchemaInfo>>> schemasInProgress =
+            new InProgressHolder<>();
+    private final InProgressHolder<TopicsUnderNamespaceKey, 
CompletableFuture<GetTopicsResult>>
+            topicsUnderNamespaceInProgress = new InProgressHolder<>();
+
+    public InProgressDeduplicationDecoratorLookupService(LookupService 
delegate,
+                                                         Supplier<Map<String, 
String>> lookupPropertiesSupplier) {
+        this.delegate = delegate;
+        this.lookupPropertiesSupplier = lookupPropertiesSupplier;
+    }
+
+    @Override
+    public void updateServiceUrl(String serviceUrl) throws 
PulsarClientException {
+        delegate.updateServiceUrl(serviceUrl);
+    }
+
+    @Override
+    public String getServiceUrl() {
+        return delegate.getServiceUrl();
+    }
+
+    @Override
+    public InetSocketAddress resolveHost() {
+        return delegate.resolveHost();
+    }
+
+    @Override
+    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName, 
Map<String, String> lookupProperties) {
+        Map<String, String> lookupPropertiesToUse =
+                lookupProperties != null ? lookupProperties : 
lookupPropertiesSupplier.get();
+        return topicLookupsInProgress.getOrComputeIfAbsent(
+                new LookupBrokerKey(topicName.toString(), 
lookupPropertiesToUse),
+                () -> delegate.getBroker(topicName, lookupPropertiesToUse));
+    }
+
+    @Override
+    public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
+            TopicName topicName,
+            boolean metadataAutoCreationEnabled,
+            boolean useFallbackForNonPIP344Brokers) {
+        return partitionedTopicMetadataInProgress.getOrComputeIfAbsent(
+                new PartitionedTopicMetadataKey(topicName, 
metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers),
+                () -> delegate.getPartitionedTopicMetadata(topicName, 
metadataAutoCreationEnabled,
+                        useFallbackForNonPIP344Brokers));
+    }
+
+    @Override
+    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName, byte[] version) {
+        // all partitions of a partitioned topic share the same schema
+        // therefore, perform the lookup with the partitioned topic name
+        String topicForSchemaLookup = topicName.getPartitionedTopicName();
+        return schemasInProgress.getOrComputeIfAbsent(new 
LookupSchemaKey(topicForSchemaLookup, version),
+                () -> delegate.getSchema(TopicName.get(topicForSchemaLookup), 
version));
+    }
+
+    @Override
+    public CompletableFuture<GetTopicsResult> 
getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
+                                                                      String 
topicPattern, String topicsHash) {
+        return topicsUnderNamespaceInProgress.getOrComputeIfAbsent(
+                new TopicsUnderNamespaceKey(namespace, mode, topicPattern, 
topicsHash),
+                () -> delegate.getTopicsUnderNamespace(namespace, mode, 
topicPattern, topicsHash));
+    }
+
+    @Override
+    public void close() throws Exception {
+        delegate.close();
+    }
+
+    @Override
+    public boolean isBinaryProtoLookupService() {
+        return delegate.isBinaryProtoLookupService();
+    }
+
+    private static class InProgressHolder<K, V extends CompletableFuture<?>> {
+        private final ConcurrentHashMap<K, V> inProgress = new 
ConcurrentHashMap<>();
+
+        public V getOrComputeIfAbsent(K key, Supplier<V> supplier) {
+            final MutableObject<V> newFutureCreated = new MutableObject<>();
+            try {
+                return inProgress.computeIfAbsent(key, k -> {
+                    V newFuture = supplier.get();
+                    newFutureCreated.setValue(newFuture);
+                    return newFuture;
+                });
+            } finally {
+                V newFutureCreatedValue = newFutureCreated.getValue();
+                if (newFutureCreatedValue != null) {
+                    newFutureCreatedValue.whenComplete((v, ex) -> {
+                        inProgress.remove(key, newFutureCreatedValue);
+                    });
+                }
+            }
+        }
+    }
+
+    private static final class LookupBrokerKey {
+        private final String topic;
+        private final Map<String, String> properties;
+
+        private LookupBrokerKey(String topic, Map<String, String> properties) {
+            this.topic = topic;
+            this.properties = properties.isEmpty() ? Collections.emptyMap() : 
new HashMap<>(properties);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            LookupBrokerKey lookupBrokerKey = (LookupBrokerKey) o;
+            return Objects.equals(topic, lookupBrokerKey.topic) && 
properties.equals(lookupBrokerKey.properties);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hashCode(topic);
+            result = 31 * result + properties.hashCode();
+            return result;
+        }
+    }
+
+    private static final class LookupSchemaKey {
+        private final String topic;
+        private final byte[] version;
+
+        private LookupSchemaKey(String topic, byte[] version) {
+            this.topic = topic;
+            this.version = version != null ? version.clone() : null;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            LookupSchemaKey that = (LookupSchemaKey) o;
+            return Objects.equals(topic, that.topic) && Arrays.equals(version, 
that.version);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hashCode(topic);
+            result = 31 * result + Arrays.hashCode(version);
+            return result;
+        }
+    }
+
+    private static final class TopicsUnderNamespaceKey {
+        private final NamespaceName namespace;
+        private final Mode mode;
+        private final String topicsPattern;
+        private final String topicsHash;
+
+        TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode, String 
topicsPattern, String topicsHash) {
+            this.namespace = namespace;
+            this.mode = mode;
+            this.topicsPattern = topicsPattern;
+            this.topicsHash = topicsHash;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o;
+            return Objects.equals(namespace, that.namespace) && mode == 
that.mode && Objects.equals(topicsPattern,
+                    that.topicsPattern) && Objects.equals(topicsHash, 
that.topicsHash);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(namespace, mode, topicsPattern, topicsHash);
+        }
+
+        @Override
+        public String toString() {
+            return "TopicsUnderNamespaceKey{" + "namespace=" + namespace + ", 
mode=" + mode + ", topicsPattern='"
+                    + topicsPattern + '\'' + ", topicsHash='" + topicsHash + 
'\'' + '}';
+        }
+    }
+
+    private static final class PartitionedTopicMetadataKey {
+        private final TopicName topicName;
+        private final boolean metadataAutoCreationEnabled;
+        private final boolean useFallbackForNonPIP344Brokers;
+
+        PartitionedTopicMetadataKey(TopicName topicName, boolean 
metadataAutoCreationEnabled,
+                                    boolean useFallbackForNonPIP344Brokers) {
+            this.topicName = topicName;
+            this.metadataAutoCreationEnabled = metadataAutoCreationEnabled;
+            this.useFallbackForNonPIP344Brokers = 
useFallbackForNonPIP344Brokers;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o;
+            return metadataAutoCreationEnabled == 
that.metadataAutoCreationEnabled
+                    && useFallbackForNonPIP344Brokers == 
that.useFallbackForNonPIP344Brokers && Objects.equals(
+                    topicName, that.topicName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topicName, metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionedTopicMetadataKey{" + "topicName=" + topicName + 
", metadataAutoCreationEnabled="
+                    + metadataAutoCreationEnabled + ", 
useFallbackForNonPIP344Brokers=" + useFallbackForNonPIP344Brokers
+                    + '}';
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 3367ae99cb1..67cfe449ec9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -56,7 +57,21 @@ public interface LookupService extends AutoCloseable {
      * @return a {@link LookupTopicResult} representing the logical and 
physical address of the broker that serves the
      *         given topic, as well as proxying information.
      */
-    CompletableFuture<LookupTopicResult> getBroker(TopicName topicName);
+    default CompletableFuture<LookupTopicResult> getBroker(TopicName 
topicName) {
+        return getBroker(topicName, null);
+    }
+
+    /**
+     * Calls broker lookup-api to get broker {@link InetSocketAddress} which 
serves namespace bundle that contains given
+     * topic. This lookup is made with the given lookup properties. When null 
is passed, the
+     * default lookup properties specified in the client configuration are 
used.
+     *
+     * @param topicName
+     *            topic-name
+     * @return a {@link LookupTopicResult} representing the logical and 
physical address of the broker that serves the
+     *         given topic, as well as proxying information.
+     */
+    CompletableFuture<LookupTopicResult> getBroker(TopicName topicName, 
Map<String, String> lookupProperties);
 
     /**
      * Returns {@link PartitionedTopicMetadata} for a given topic.
@@ -104,7 +119,9 @@ public interface LookupService extends AutoCloseable {
      * @param topicName topic-name
      * @return SchemaInfo
      */
-    CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName);
+    default CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName) {
+        return getSchema(topicName, null);
+    }
 
     /**
      * Returns specific version SchemaInfo {@link SchemaInfo} for a given 
topic.
@@ -144,4 +161,9 @@ public interface LookupService extends AutoCloseable {
      */
     CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName 
namespace, Mode mode,
                                                                String 
topicPattern, String topicsHash);
+
+    /**
+     * Returns true if the lookup service is a binary protocol lookup service.
+     */
+    boolean isBinaryProtoLookupService();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1d8ecb47278..3a2ff97f51e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -270,14 +270,7 @@ public class PulsarClientImpl implements PulsarClient {
             } else {
                 this.timer = timer;
             }
-            if (conf.getServiceUrl().startsWith("http")) {
-                lookup = new HttpLookupService(instrumentProvider, conf, 
this.eventLoopGroup, this.timer,
-                        getNameResolver());
-            } else {
-                lookup = new BinaryProtoLookupService(this, 
conf.getServiceUrl(), conf.getListenerName(),
-                        conf.isUseTls(), 
this.scheduledExecutorProvider.getExecutor(),
-                        this.lookupExecutorProvider.getExecutor());
-            }
+            lookup = createLookup(conf.getServiceUrl());
 
             if (conf.getServiceUrlProvider() != null) {
                 conf.getServiceUrlProvider().initialize(this);
@@ -1146,7 +1139,7 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
-        if (!(lookup instanceof BinaryProtoLookupService)) {
+        if (!lookup.isBinaryProtoLookupService()) {
             return FutureUtil.failedFuture(new 
PulsarClientException.InvalidServiceURL(
                     "Can't get client connection to HTTP service URL", null));
         }
@@ -1156,7 +1149,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     public CompletableFuture<ClientCnx> getProxyConnection(final 
InetSocketAddress logicalAddress,
                                                            final int 
randomKeyForSelectConnection) {
-        if (!(lookup instanceof BinaryProtoLookupService)) {
+        if (!lookup.isBinaryProtoLookupService()) {
             return FutureUtil.failedFuture(new 
PulsarClientException.InvalidServiceURL(
                     "Cannot proxy connection through HTTP service URL", null));
         }
@@ -1225,12 +1218,15 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public LookupService createLookup(String url) throws PulsarClientException 
{
+        LookupService lookupService;
         if (url.startsWith("http")) {
-            return new HttpLookupService(instrumentProvider, conf, 
eventLoopGroup, timer, getNameResolver());
+            lookupService = new HttpLookupService(instrumentProvider, conf, 
eventLoopGroup, timer, getNameResolver());
         } else {
-            return new BinaryProtoLookupService(this, url, 
conf.getListenerName(), conf.isUseTls(),
-                    externalExecutorProvider.getExecutor());
+            lookupService = new BinaryProtoLookupService(this, url, 
conf.getListenerName(), conf.isUseTls(),
+                    this.scheduledExecutorProvider.getExecutor(), 
this.lookupExecutorProvider.getExecutor());
         }
+        return new InProgressDeduplicationDecoratorLookupService(lookupService,
+                () -> getConfiguration().getLookupProperties());
     }
 
     /**
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index 8bc78264ec7..7b9d7b37e5e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -37,6 +37,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -204,6 +205,10 @@ public class BinaryProtoLookupServiceTest {
         return lookupResult;
     }
 
+    private LookupService decoratedLookupService(LookupService lookupService) {
+        return new 
InProgressDeduplicationDecoratorLookupService(lookupService, () -> 
Collections.emptyMap());
+    }
+
     /**
      * Verifies that getTopicsUnderNamespace() deduplicates concurrent 
requests and cleans up after completion.
      *
@@ -236,8 +241,8 @@ public class BinaryProtoLookupServiceTest {
         ScheduledExecutorService scheduler =
                 Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("lookup-test-sched"));
 
-        try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
-                client, "pulsar://broker:6650", null, false, scheduler, 
/*lookupPinnedExecutor*/ null)) {
+        try (LookupService lookup = decoratedLookupService(new 
BinaryProtoLookupService(
+                client, "pulsar://broker:6650", null, false, scheduler, 
/*lookupPinnedExecutor*/ null))) {
 
             NamespaceName ns = NamespaceName.get("public", "default");
             Mode mode = Mode.PERSISTENT;
@@ -295,8 +300,8 @@ public class BinaryProtoLookupServiceTest {
         ScheduledExecutorService scheduler =
                 Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("lookup-test-sched"));
 
-        try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
-                client, "pulsar://broker:6650", null, false, scheduler, null)) 
{
+        try (LookupService lookup = decoratedLookupService(new 
BinaryProtoLookupService(
+                client, "pulsar://broker:6650", null, false, scheduler, 
null))) {
 
             NamespaceName ns = NamespaceName.get("public", "default");
             Mode mode = Mode.PERSISTENT;
@@ -366,8 +371,8 @@ public class BinaryProtoLookupServiceTest {
         ScheduledExecutorService scheduler =
                 Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("lookup-test-sched"));
 
-        try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
-                client, "pulsar://broker:6650", null, false, scheduler, null)) 
{
+        try (LookupService lookup = decoratedLookupService(new 
BinaryProtoLookupService(
+                client, "pulsar://broker:6650", null, false, scheduler, 
null))) {
 
             TopicName topic = TopicName.get("persistent://public/default/t1");
             boolean metadataAutoCreationEnabled = true;
@@ -426,8 +431,8 @@ public class BinaryProtoLookupServiceTest {
         ScheduledExecutorService scheduler =
                 Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("lookup-test-sched"));
 
-        try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
-                client, "pulsar://broker:6650", null, false, scheduler, null)) 
{
+        try (LookupService lookup = decoratedLookupService(new 
BinaryProtoLookupService(
+                client, "pulsar://broker:6650", null, false, scheduler, 
null))) {
 
             TopicName topic = TopicName.get("persistent://public/default/t1");
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 00abe6c55c4..83321d0f776 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.BinaryProtoLookupService;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -135,7 +134,7 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
                 .serviceUrl(proxyService.getServiceUrl()).build();
         String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         LookupService lookupService = client.getLookup();
-        assertTrue(lookupService instanceof BinaryProtoLookupService);
+        assertTrue(lookupService.isBinaryProtoLookupService());
         ClientCnx lookupConnection = 
client.getCnxPool().getConnection(lookupService.resolveHost()).join();
 
         // Make no permits to lookup.
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index f0f8a3bbcae..2529c929a6f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.proxy.server;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -190,7 +191,7 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
         var producerClient = producerClientFuture.get();
         @Cleanup
         var producer = 
producerClient.newProducer(Schema.INT32).topic(topicName.toString()).create();
-        LookupService producerLookupServiceSpy = spyField(producerClient, 
"lookup");
+        LookupService producerLookupServiceSpy = 
spyField(producerClient.getLookup(), "delegate");
 
         @Cleanup
         var consumerClient = consumerClientFuture.get();
@@ -200,7 +201,7 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
                 subscriptionName(BrokerTestUtil.newUniqueName("my-sub")).
                 ackTimeout(1000, TimeUnit.MILLISECONDS).
                 subscribe();
-        LookupService consumerLookupServiceSpy = spyField(consumerClient, 
"lookup");
+        LookupService consumerLookupServiceSpy = 
spyField(consumerClient.getLookup(), "delegate");
 
         var bundleRange = admin.lookups().getBundleRange(topicName.toString());
 
@@ -268,7 +269,7 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
         @Cleanup
         var producer = (ProducerImpl<Integer>) 
producerClient.newProducer(Schema.INT32).topic(topicName.toString()).
                 create();
-        LookupService producerLookupServiceSpy = spyField(producerClient, 
"lookup");
+        LookupService producerLookupServiceSpy = 
spyField(producerClient.getLookup(), "delegate");
         when(((ServiceNameResolver) spyField(producerLookupServiceSpy, 
"serviceNameResolver")).resolveHost()).
                 thenCallRealMethod().then(invocation -> 
getSourceBrokerInetAddress(topicName));
 
@@ -280,7 +281,7 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
                 subscriptionName(BrokerTestUtil.newUniqueName("my-sub")).
                 ackTimeout(1000, TimeUnit.MILLISECONDS).
                 subscribe();
-        LookupService consumerLookupServiceSpy = spyField(consumerClient, 
"lookup");
+        LookupService consumerLookupServiceSpy = 
spyField(consumerClient.getLookup(), "delegate");
         when(((ServiceNameResolver) spyField(consumerLookupServiceSpy, 
"serviceNameResolver")).resolveHost()).
                 thenCallRealMethod().then(invocation -> 
getSourceBrokerInetAddress(topicName));
 
@@ -341,9 +342,9 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
         
assertEquals(FieldUtils.readDeclaredField(consumer.getConnectionHandler(), 
"useProxy", true), Boolean.FALSE);
 
         verify(producerClient, times(1)).getProxyConnection(any(), anyInt());
-        verify(producerLookupServiceSpy, times(1)).getBroker(topicName);
+        verify(producerLookupServiceSpy, times(1)).getBroker(eq(topicName), 
any());
 
         verify(consumerClient, times(1)).getProxyConnection(any(), anyInt());
-        verify(consumerLookupServiceSpy, times(1)).getBroker(topicName);
+        verify(consumerLookupServiceSpy, times(1)).getBroker(eq(topicName), 
any());
     }
 }
diff --git 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
index 67dff01e1cc..da7c2892936 100644
--- 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
+++ 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.websocket;
 
 import io.netty.channel.epoll.Epoll;
+import java.io.IOException;
 import java.lang.reflect.Field;
+import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
 import org.testng.Assert;
@@ -38,12 +40,17 @@ public class LookupProtocolTest {
         conf.setServiceUrl("http://localhost:8080");
         conf.setServiceUrlTls("https://localhost:8443");
         WebSocketService service  = new WebSocketService(conf);
+        assertLookupIsBinaryProtoLookup(service, false);
+        service.close();
+    }
+
+    private static void assertLookupIsBinaryProtoLookup(WebSocketService 
service, boolean expected)
+            throws IOException, NoSuchFieldException, IllegalAccessException {
         PulsarClientImpl testClient = (PulsarClientImpl) 
service.getPulsarClient();
         Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
         lookupField.setAccessible(true);
-        Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
-                "org.apache.pulsar.client.impl.HttpLookupService");
-        service.close();
+        LookupService lookupService = (LookupService) 
lookupField.get(testClient);
+        Assert.assertEquals(expected, 
lookupService.isBinaryProtoLookupService());
     }
 
     @Test(timeOut = 10000)
@@ -55,10 +62,7 @@ public class LookupProtocolTest {
         conf.setBrokerClientTlsEnabled(true);
         WebSocketService service  = new WebSocketService(conf);
         PulsarClientImpl testClient = (PulsarClientImpl) 
service.getPulsarClient();
-        Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
-        lookupField.setAccessible(true);
-        Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
-                "org.apache.pulsar.client.impl.HttpLookupService");
+        assertLookupIsBinaryProtoLookup(service, false);
         Assert.assertTrue(testClient.getConfiguration().isUseTls());
         service.close();
     }
@@ -71,11 +75,7 @@ public class LookupProtocolTest {
         conf.setBrokerServiceUrl("pulsar://localhost:6650");
         conf.setBrokerServiceUrlTls("pulsar+ssl://localhost:6651");
         WebSocketService service  = new WebSocketService(conf);
-        PulsarClientImpl testClient = (PulsarClientImpl) 
service.getPulsarClient();
-        Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
-        lookupField.setAccessible(true);
-        Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
-                "org.apache.pulsar.client.impl.BinaryProtoLookupService");
+        assertLookupIsBinaryProtoLookup(service, true);
         service.close();
     }
 
@@ -89,10 +89,7 @@ public class LookupProtocolTest {
         conf.setBrokerClientTlsEnabled(true);
         WebSocketService service  = new WebSocketService(conf);
         PulsarClientImpl testClient = (PulsarClientImpl) 
service.getPulsarClient();
-        Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
-        lookupField.setAccessible(true);
-        Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
-                "org.apache.pulsar.client.impl.BinaryProtoLookupService");
+        assertLookupIsBinaryProtoLookup(service, true);
         Assert.assertTrue(testClient.getConfiguration().isUseTls());
         service.close();
     }

Reply via email to