This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b37b5b3fa01 [improve][client] Deduplicate in-progress lookup requests
also for HttpLookupService (#25017)
b37b5b3fa01 is described below
commit b37b5b3fa01494fd6541f4d460f3586443c221b6
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 2 17:33:44 2025 +0200
[improve][client] Deduplicate in-progress lookup requests also for
HttpLookupService (#25017)
---
.../pulsar/client/api/BrokerServiceLookupTest.java | 7 +-
.../apache/pulsar/client/api/SimpleSchemaTest.java | 6 +-
.../client/impl/BinaryProtoLookupService.java | 205 +++------------
.../pulsar/client/impl/HttpLookupService.java | 14 +-
...rogressDeduplicationDecoratorLookupService.java | 276 +++++++++++++++++++++
.../apache/pulsar/client/impl/LookupService.java | 26 +-
.../pulsar/client/impl/PulsarClientImpl.java | 22 +-
.../client/impl/BinaryProtoLookupServiceTest.java | 21 +-
.../proxy/server/ProxyLookupThrottlingTest.java | 3 +-
.../server/ProxyWithExtensibleLoadManagerTest.java | 13 +-
.../pulsar/websocket/LookupProtocolTest.java | 29 +--
11 files changed, 388 insertions(+), 234 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index ddbe6547213..e00e6d69ce8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -82,7 +82,6 @@ import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
-import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -957,7 +956,7 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase implements ITe
// Assert the lookup service is a "BinaryProtoLookupService".
final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl)
pulsarClient;
final LookupService lookupService = pulsarClientImpl.getLookup();
- assertTrue(lookupService instanceof BinaryProtoLookupService);
+ assertTrue(lookupService.isBinaryProtoLookupService());
final String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final int topicPartitions = 10;
@@ -981,7 +980,7 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase implements ITe
// Assert the lookup service is a "BinaryProtoLookupService".
final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl)
pulsarClient;
final LookupService lookupService = pulsarClientImpl.getLookup();
- assertTrue(lookupService instanceof BinaryProtoLookupService);
+ assertTrue(lookupService.isBinaryProtoLookupService());
final String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(tpName);
@@ -1234,7 +1233,7 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase implements ITe
Consumer<String> consumer =
pulsarClientImpl.newConsumer(Schema.STRING).topic(tpName)
.subscriptionName("s1").isAckReceiptEnabled(true).subscribe();
LookupService lookupService = pulsarClientImpl.getLookup();
- assertTrue(lookupService instanceof BinaryProtoLookupService);
+ assertTrue(lookupService.isBinaryProtoLookupService());
ClientCnx lookupConnection =
pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join();
var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 3a7f65f0b07..d581e0a6b32 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -49,10 +49,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import
org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import
org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -1262,8 +1260,8 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
LookupService httpLookupService = httpProtocolClient.getLookup();
LookupService binaryLookupService = binaryProtocolClient.getLookup();
- Assert.assertTrue(httpLookupService instanceof HttpLookupService);
- Assert.assertTrue(binaryLookupService instanceof
BinaryProtoLookupService);
+ Assert.assertTrue(!httpLookupService.isBinaryProtoLookupService());
+ Assert.assertTrue(binaryLookupService.isBinaryProtoLookupService());
Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic),
ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 368d9c3809a..674fb40793f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -26,17 +26,13 @@ import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
@@ -66,16 +62,6 @@ public class BinaryProtoLookupService implements
LookupService {
private final int maxLookupRedirects;
private final ExecutorService lookupPinnedExecutor;
private final boolean createdLookupPinnedExecutor;
-
- private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>,
CompletableFuture<LookupTopicResult>>
- lookupInProgress = new ConcurrentHashMap<>();
-
- private final ConcurrentHashMap<PartitionedTopicMetadataKey,
CompletableFuture<PartitionedTopicMetadata>>
- partitionedMetadataInProgress = new ConcurrentHashMap<>();
-
- private final ConcurrentHashMap<TopicsUnderNamespaceKey,
CompletableFuture<GetTopicsResult>>
- topicsUnderNamespaceInProgress = new ConcurrentHashMap<>();
-
private final LatencyHistogram histoGetBroker;
private final LatencyHistogram histoGetTopicMetadata;
private final LatencyHistogram histoGetSchema;
@@ -156,32 +142,20 @@ public class BinaryProtoLookupService implements
LookupService {
* topic-name
* @return broker-socket-address that serves given topic
*/
- public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName)
{
- long startTime = System.nanoTime();
- final MutableObject<CompletableFuture> newFutureCreated = new
MutableObject<>();
- final Pair<TopicName, Map<String, String>> key = Pair.of(topicName,
- client.getConfiguration().getLookupProperties());
- try {
- return lookupInProgress.computeIfAbsent(key, tpName -> {
- CompletableFuture<LookupTopicResult> newFuture =
findBroker(serviceNameResolver.resolveHost(), false,
- topicName, 0, key.getRight());
- newFutureCreated.setValue(newFuture);
-
- newFuture.thenRun(() -> {
- histoGetBroker.recordSuccess(System.nanoTime() -
startTime);
- }).exceptionally(x -> {
- histoGetBroker.recordFailure(System.nanoTime() -
startTime);
- return null;
- });
- return newFuture;
- });
- } finally {
- if (newFutureCreated.getValue() != null) {
- newFutureCreated.getValue().whenComplete((v, ex) -> {
- lookupInProgress.remove(key, newFutureCreated.getValue());
- });
- }
+ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName,
Map<String, String> lookupProperties) {
+ if (lookupProperties == null) {
+ lookupProperties = client.getConfiguration().getLookupProperties();
}
+ long startTime = System.nanoTime();
+ CompletableFuture<LookupTopicResult> newFuture =
findBroker(serviceNameResolver.resolveHost(), false,
+ topicName, 0, lookupProperties);
+ newFuture.thenRun(() -> {
+ histoGetBroker.recordSuccess(System.nanoTime() - startTime);
+ }).exceptionally(x -> {
+ histoGetBroker.recordFailure(System.nanoTime() - startTime);
+ return null;
+ });
+ return newFuture;
}
/**
@@ -191,24 +165,7 @@ public class BinaryProtoLookupService implements
LookupService {
@Override
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(
TopicName topicName, boolean metadataAutoCreationEnabled, boolean
useFallbackForNonPIP344Brokers) {
- final MutableObject<CompletableFuture> newFutureCreated = new
MutableObject<>();
- final PartitionedTopicMetadataKey key = new
PartitionedTopicMetadataKey(
- topicName, metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
- try {
- return partitionedMetadataInProgress.computeIfAbsent(key, k -> {
- CompletableFuture<PartitionedTopicMetadata> newFuture =
getPartitionedTopicMetadataAsync(
- topicName, metadataAutoCreationEnabled,
- useFallbackForNonPIP344Brokers);
- newFutureCreated.setValue(newFuture);
- return newFuture;
- });
- } finally {
- if (newFutureCreated.getValue() != null) {
- newFutureCreated.getValue().whenComplete((v, ex) -> {
- partitionedMetadataInProgress.remove(key,
newFutureCreated.getValue());
- });
- }
- }
+ return getPartitionedTopicMetadataAsync(topicName,
metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
}
private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress
socketAddress,
@@ -351,12 +308,6 @@ public class BinaryProtoLookupService implements
LookupService {
return partitionFuture;
}
- @Override
- public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName) {
- return getSchema(topicName, null);
- }
-
-
@Override
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName, byte[] version) {
long startTime = System.nanoTime();
@@ -403,31 +354,21 @@ public class BinaryProtoLookupService implements
LookupService {
Mode mode,
String topicsPattern,
String topicsHash) {
- final MutableObject<CompletableFuture<GetTopicsResult>>
newFutureCreated = new MutableObject<>();
- final TopicsUnderNamespaceKey key = new
TopicsUnderNamespaceKey(namespace, mode, topicsPattern, topicsHash);
-
- try {
- return topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> {
- CompletableFuture<GetTopicsResult> topicsFuture = new
CompletableFuture<>();
- AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMandatoryStop(opTimeoutMs.get() * 2,
TimeUnit.MILLISECONDS)
- .setMax(1, TimeUnit.MINUTES)
- .create();
-
- newFutureCreated.setValue(topicsFuture);
- getTopicsUnderNamespace(namespace, backoff, opTimeoutMs,
topicsFuture, mode,
- topicsPattern, topicsHash);
- return topicsFuture;
- });
- } finally {
- if (newFutureCreated.getValue() != null) {
- newFutureCreated.getValue().whenComplete((v, ex) -> {
- topicsUnderNamespaceInProgress.remove(key,
newFutureCreated.getValue());
- });
- }
- }
+ CompletableFuture<GetTopicsResult> topicsFuture = new
CompletableFuture<>();
+ AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+ .setMax(1, TimeUnit.MINUTES)
+ .create();
+ getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture,
mode,
+ topicsPattern, topicsHash);
+ return topicsFuture;
+ }
+
+ @Override
+ public boolean isBinaryProtoLookupService() {
+ return true;
}
private void getTopicsUnderNamespace(
@@ -517,93 +458,5 @@ public class BinaryProtoLookupService implements
LookupService {
}
- private static final class TopicsUnderNamespaceKey {
- private final NamespaceName namespace;
- private final Mode mode;
- private final String topicsPattern;
- private final String topicsHash;
-
- TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode,
- String topicsPattern, String topicsHash) {
- this.namespace = namespace;
- this.mode = mode;
- this.topicsPattern = topicsPattern;
- this.topicsHash = topicsHash;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o;
- return Objects.equals(namespace, that.namespace)
- && mode == that.mode
- && Objects.equals(topicsPattern, that.topicsPattern)
- && Objects.equals(topicsHash, that.topicsHash);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(namespace, mode, topicsPattern, topicsHash);
- }
-
- @Override
- public String toString() {
- return "TopicsUnderNamespaceKey{"
- + "namespace=" + namespace
- + ", mode=" + mode
- + ", topicsPattern='" + topicsPattern + '\''
- + ", topicsHash='" + topicsHash + '\''
- + '}';
- }
- }
-
- private static final class PartitionedTopicMetadataKey {
- private final TopicName topicName;
- private final boolean metadataAutoCreationEnabled;
- private final boolean useFallbackForNonPIP344Brokers;
-
- PartitionedTopicMetadataKey(TopicName topicName,
- boolean metadataAutoCreationEnabled,
- boolean useFallbackForNonPIP344Brokers) {
- this.topicName = topicName;
- this.metadataAutoCreationEnabled = metadataAutoCreationEnabled;
- this.useFallbackForNonPIP344Brokers =
useFallbackForNonPIP344Brokers;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o;
- return metadataAutoCreationEnabled ==
that.metadataAutoCreationEnabled
- && useFallbackForNonPIP344Brokers ==
that.useFallbackForNonPIP344Brokers
- && Objects.equals(topicName, that.topicName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(topicName, metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
- }
-
- @Override
- public String toString() {
- return "PartitionedTopicMetadataKey{"
- + "topicName=" + topicName
- + ", metadataAutoCreationEnabled=" +
metadataAutoCreationEnabled
- + ", useFallbackForNonPIP344Brokers=" +
useFallbackForNonPIP344Brokers
- + '}';
- }
- }
-
-
private static final Logger log =
LoggerFactory.getLogger(BinaryProtoLookupService.class);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 08c9956b5be..9044088f724 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -28,6 +28,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
@@ -105,7 +106,14 @@ public class HttpLookupService implements LookupService {
*/
@Override
@SuppressWarnings("deprecation")
- public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName)
{
+ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName,
Map<String, String> lookupProperties) {
+ if (lookupProperties == null) {
+ lookupProperties = httpClient.clientConf.getLookupProperties();
+ }
+ if (lookupProperties != null && !lookupProperties.isEmpty()) {
+ log.warn("Lookup properties aren't supported for http lookup
service. lookupProperties: {}",
+ lookupProperties);
+ }
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path +
"?listenerName=" + Codec.encode(listenerName);
@@ -211,8 +219,8 @@ public class HttpLookupService implements LookupService {
}
@Override
- public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName) {
- return getSchema(topicName, null);
+ public boolean isBinaryProtoLookupService() {
+ return false;
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java
new file mode 100644
index 00000000000..8cf95b62634
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import org.apache.pulsar.common.lookup.GetTopicsResult;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Decorator for {@link LookupService} that deduplicates in-progress lookups
for topics, schemas, partitioned topics
+ * and topic listings for namespace.
+ */
+public class InProgressDeduplicationDecoratorLookupService implements
LookupService {
+ private final LookupService delegate;
+ private final Supplier<Map<String, String>> lookupPropertiesSupplier;
+ private final InProgressHolder<LookupBrokerKey,
CompletableFuture<LookupTopicResult>> topicLookupsInProgress =
+ new InProgressHolder<>();
+ private final InProgressHolder<PartitionedTopicMetadataKey,
CompletableFuture<PartitionedTopicMetadata>>
+ partitionedTopicMetadataInProgress = new InProgressHolder<>();
+ private final InProgressHolder<LookupSchemaKey,
CompletableFuture<Optional<SchemaInfo>>> schemasInProgress =
+ new InProgressHolder<>();
+ private final InProgressHolder<TopicsUnderNamespaceKey,
CompletableFuture<GetTopicsResult>>
+ topicsUnderNamespaceInProgress = new InProgressHolder<>();
+
+ public InProgressDeduplicationDecoratorLookupService(LookupService
delegate,
+ Supplier<Map<String,
String>> lookupPropertiesSupplier) {
+ this.delegate = delegate;
+ this.lookupPropertiesSupplier = lookupPropertiesSupplier;
+ }
+
+ @Override
+ public void updateServiceUrl(String serviceUrl) throws
PulsarClientException {
+ delegate.updateServiceUrl(serviceUrl);
+ }
+
+ @Override
+ public String getServiceUrl() {
+ return delegate.getServiceUrl();
+ }
+
+ @Override
+ public InetSocketAddress resolveHost() {
+ return delegate.resolveHost();
+ }
+
+ @Override
+ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName,
Map<String, String> lookupProperties) {
+ Map<String, String> lookupPropertiesToUse =
+ lookupProperties != null ? lookupProperties :
lookupPropertiesSupplier.get();
+ return topicLookupsInProgress.getOrComputeIfAbsent(
+ new LookupBrokerKey(topicName.toString(),
lookupPropertiesToUse),
+ () -> delegate.getBroker(topicName, lookupPropertiesToUse));
+ }
+
+ @Override
+ public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(
+ TopicName topicName,
+ boolean metadataAutoCreationEnabled,
+ boolean useFallbackForNonPIP344Brokers) {
+ return partitionedTopicMetadataInProgress.getOrComputeIfAbsent(
+ new PartitionedTopicMetadataKey(topicName,
metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers),
+ () -> delegate.getPartitionedTopicMetadata(topicName,
metadataAutoCreationEnabled,
+ useFallbackForNonPIP344Brokers));
+ }
+
+ @Override
+ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName, byte[] version) {
+ // all partitions of a partitioned topic share the same schema
+ // therefore, perform the lookup with the partitioned topic name
+ String topicForSchemaLookup = topicName.getPartitionedTopicName();
+ return schemasInProgress.getOrComputeIfAbsent(new
LookupSchemaKey(topicForSchemaLookup, version),
+ () -> delegate.getSchema(TopicName.get(topicForSchemaLookup),
version));
+ }
+
+ @Override
+ public CompletableFuture<GetTopicsResult>
getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
+ String
topicPattern, String topicsHash) {
+ return topicsUnderNamespaceInProgress.getOrComputeIfAbsent(
+ new TopicsUnderNamespaceKey(namespace, mode, topicPattern,
topicsHash),
+ () -> delegate.getTopicsUnderNamespace(namespace, mode,
topicPattern, topicsHash));
+ }
+
+ @Override
+ public void close() throws Exception {
+ delegate.close();
+ }
+
+ @Override
+ public boolean isBinaryProtoLookupService() {
+ return delegate.isBinaryProtoLookupService();
+ }
+
+ private static class InProgressHolder<K, V extends CompletableFuture<?>> {
+ private final ConcurrentHashMap<K, V> inProgress = new
ConcurrentHashMap<>();
+
+ public V getOrComputeIfAbsent(K key, Supplier<V> supplier) {
+ final MutableObject<V> newFutureCreated = new MutableObject<>();
+ try {
+ return inProgress.computeIfAbsent(key, k -> {
+ V newFuture = supplier.get();
+ newFutureCreated.setValue(newFuture);
+ return newFuture;
+ });
+ } finally {
+ V newFutureCreatedValue = newFutureCreated.getValue();
+ if (newFutureCreatedValue != null) {
+ newFutureCreatedValue.whenComplete((v, ex) -> {
+ inProgress.remove(key, newFutureCreatedValue);
+ });
+ }
+ }
+ }
+ }
+
+ private static final class LookupBrokerKey {
+ private final String topic;
+ private final Map<String, String> properties;
+
+ private LookupBrokerKey(String topic, Map<String, String> properties) {
+ this.topic = topic;
+ this.properties = properties.isEmpty() ? Collections.emptyMap() :
new HashMap<>(properties);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ LookupBrokerKey lookupBrokerKey = (LookupBrokerKey) o;
+ return Objects.equals(topic, lookupBrokerKey.topic) &&
properties.equals(lookupBrokerKey.properties);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hashCode(topic);
+ result = 31 * result + properties.hashCode();
+ return result;
+ }
+ }
+
+ private static final class LookupSchemaKey {
+ private final String topic;
+ private final byte[] version;
+
+ private LookupSchemaKey(String topic, byte[] version) {
+ this.topic = topic;
+ this.version = version != null ? version.clone() : null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ LookupSchemaKey that = (LookupSchemaKey) o;
+ return Objects.equals(topic, that.topic) && Arrays.equals(version,
that.version);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hashCode(topic);
+ result = 31 * result + Arrays.hashCode(version);
+ return result;
+ }
+ }
+
+ private static final class TopicsUnderNamespaceKey {
+ private final NamespaceName namespace;
+ private final Mode mode;
+ private final String topicsPattern;
+ private final String topicsHash;
+
+ TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode, String
topicsPattern, String topicsHash) {
+ this.namespace = namespace;
+ this.mode = mode;
+ this.topicsPattern = topicsPattern;
+ this.topicsHash = topicsHash;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o;
+ return Objects.equals(namespace, that.namespace) && mode ==
that.mode && Objects.equals(topicsPattern,
+ that.topicsPattern) && Objects.equals(topicsHash,
that.topicsHash);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(namespace, mode, topicsPattern, topicsHash);
+ }
+
+ @Override
+ public String toString() {
+ return "TopicsUnderNamespaceKey{" + "namespace=" + namespace + ",
mode=" + mode + ", topicsPattern='"
+ + topicsPattern + '\'' + ", topicsHash='" + topicsHash +
'\'' + '}';
+ }
+ }
+
+ private static final class PartitionedTopicMetadataKey {
+ private final TopicName topicName;
+ private final boolean metadataAutoCreationEnabled;
+ private final boolean useFallbackForNonPIP344Brokers;
+
+ PartitionedTopicMetadataKey(TopicName topicName, boolean
metadataAutoCreationEnabled,
+ boolean useFallbackForNonPIP344Brokers) {
+ this.topicName = topicName;
+ this.metadataAutoCreationEnabled = metadataAutoCreationEnabled;
+ this.useFallbackForNonPIP344Brokers =
useFallbackForNonPIP344Brokers;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o;
+ return metadataAutoCreationEnabled ==
that.metadataAutoCreationEnabled
+ && useFallbackForNonPIP344Brokers ==
that.useFallbackForNonPIP344Brokers && Objects.equals(
+ topicName, that.topicName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicName, metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionedTopicMetadataKey{" + "topicName=" + topicName +
", metadataAutoCreationEnabled="
+ + metadataAutoCreationEnabled + ",
useFallbackForNonPIP344Brokers=" + useFallbackForNonPIP344Brokers
+ + '}';
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 3367ae99cb1..67cfe449ec9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import java.net.InetSocketAddress;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -56,7 +57,21 @@ public interface LookupService extends AutoCloseable {
* @return a {@link LookupTopicResult} representing the logical and
physical address of the broker that serves the
* given topic, as well as proxying information.
*/
- CompletableFuture<LookupTopicResult> getBroker(TopicName topicName);
+ default CompletableFuture<LookupTopicResult> getBroker(TopicName
topicName) {
+ return getBroker(topicName, null);
+ }
+
+ /**
+ * Calls broker lookup-api to get broker {@link InetSocketAddress} which
serves namespace bundle that contains given
+ * topic. This lookup is made with the given lookup properties. When null
is passed, the
+ * default lookup properties specified in the client configuration are
used.
+ *
+ * @param topicName
+ * topic-name
+ * @return a {@link LookupTopicResult} representing the logical and
physical address of the broker that serves the
+ * given topic, as well as proxying information.
+ */
+ CompletableFuture<LookupTopicResult> getBroker(TopicName topicName,
Map<String, String> lookupProperties);
/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
@@ -104,7 +119,9 @@ public interface LookupService extends AutoCloseable {
* @param topicName topic-name
* @return SchemaInfo
*/
- CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName);
+ default CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName) {
+ return getSchema(topicName, null);
+ }
/**
* Returns specific version SchemaInfo {@link SchemaInfo} for a given
topic.
@@ -144,4 +161,9 @@ public interface LookupService extends AutoCloseable {
*/
CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName
namespace, Mode mode,
String
topicPattern, String topicsHash);
+
+ /**
+ * Returns true if the lookup service is a binary protocol lookup service.
+ */
+ boolean isBinaryProtoLookupService();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1d8ecb47278..3a2ff97f51e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -270,14 +270,7 @@ public class PulsarClientImpl implements PulsarClient {
} else {
this.timer = timer;
}
- if (conf.getServiceUrl().startsWith("http")) {
- lookup = new HttpLookupService(instrumentProvider, conf,
this.eventLoopGroup, this.timer,
- getNameResolver());
- } else {
- lookup = new BinaryProtoLookupService(this,
conf.getServiceUrl(), conf.getListenerName(),
- conf.isUseTls(),
this.scheduledExecutorProvider.getExecutor(),
- this.lookupExecutorProvider.getExecutor());
- }
+ lookup = createLookup(conf.getServiceUrl());
if (conf.getServiceUrlProvider() != null) {
conf.getServiceUrlProvider().initialize(this);
@@ -1146,7 +1139,7 @@ public class PulsarClientImpl implements PulsarClient {
}
public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
- if (!(lookup instanceof BinaryProtoLookupService)) {
+ if (!lookup.isBinaryProtoLookupService()) {
return FutureUtil.failedFuture(new
PulsarClientException.InvalidServiceURL(
"Can't get client connection to HTTP service URL", null));
}
@@ -1156,7 +1149,7 @@ public class PulsarClientImpl implements PulsarClient {
public CompletableFuture<ClientCnx> getProxyConnection(final
InetSocketAddress logicalAddress,
final int
randomKeyForSelectConnection) {
- if (!(lookup instanceof BinaryProtoLookupService)) {
+ if (!lookup.isBinaryProtoLookupService()) {
return FutureUtil.failedFuture(new
PulsarClientException.InvalidServiceURL(
"Cannot proxy connection through HTTP service URL", null));
}
@@ -1225,12 +1218,15 @@ public class PulsarClientImpl implements PulsarClient {
}
public LookupService createLookup(String url) throws PulsarClientException
{
+ LookupService lookupService;
if (url.startsWith("http")) {
- return new HttpLookupService(instrumentProvider, conf,
eventLoopGroup, timer, getNameResolver());
+ lookupService = new HttpLookupService(instrumentProvider, conf,
eventLoopGroup, timer, getNameResolver());
} else {
- return new BinaryProtoLookupService(this, url,
conf.getListenerName(), conf.isUseTls(),
- externalExecutorProvider.getExecutor());
+ lookupService = new BinaryProtoLookupService(this, url,
conf.getListenerName(), conf.isUseTls(),
+ this.scheduledExecutorProvider.getExecutor(),
this.lookupExecutorProvider.getExecutor());
}
+ return new InProgressDeduplicationDecoratorLookupService(lookupService,
+ () -> getConfiguration().getLookupProperties());
}
/**
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index 8bc78264ec7..7b9d7b37e5e 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -37,6 +37,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -204,6 +205,10 @@ public class BinaryProtoLookupServiceTest {
return lookupResult;
}
+ private LookupService decoratedLookupService(LookupService lookupService) {
+ return new
InProgressDeduplicationDecoratorLookupService(lookupService, () ->
Collections.emptyMap());
+ }
+
/**
* Verifies that getTopicsUnderNamespace() deduplicates concurrent
requests and cleans up after completion.
*
@@ -236,8 +241,8 @@ public class BinaryProtoLookupServiceTest {
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("lookup-test-sched"));
- try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
- client, "pulsar://broker:6650", null, false, scheduler,
/*lookupPinnedExecutor*/ null)) {
+ try (LookupService lookup = decoratedLookupService(new
BinaryProtoLookupService(
+ client, "pulsar://broker:6650", null, false, scheduler,
/*lookupPinnedExecutor*/ null))) {
NamespaceName ns = NamespaceName.get("public", "default");
Mode mode = Mode.PERSISTENT;
@@ -295,8 +300,8 @@ public class BinaryProtoLookupServiceTest {
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("lookup-test-sched"));
- try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
- client, "pulsar://broker:6650", null, false, scheduler, null))
{
+ try (LookupService lookup = decoratedLookupService(new
BinaryProtoLookupService(
+ client, "pulsar://broker:6650", null, false, scheduler,
null))) {
NamespaceName ns = NamespaceName.get("public", "default");
Mode mode = Mode.PERSISTENT;
@@ -366,8 +371,8 @@ public class BinaryProtoLookupServiceTest {
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("lookup-test-sched"));
- try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
- client, "pulsar://broker:6650", null, false, scheduler, null))
{
+ try (LookupService lookup = decoratedLookupService(new
BinaryProtoLookupService(
+ client, "pulsar://broker:6650", null, false, scheduler,
null))) {
TopicName topic = TopicName.get("persistent://public/default/t1");
boolean metadataAutoCreationEnabled = true;
@@ -426,8 +431,8 @@ public class BinaryProtoLookupServiceTest {
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("lookup-test-sched"));
- try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
- client, "pulsar://broker:6650", null, false, scheduler, null))
{
+ try (LookupService lookup = decoratedLookupService(new
BinaryProtoLookupService(
+ client, "pulsar://broker:6650", null, false, scheduler,
null))) {
TopicName topic = TopicName.get("persistent://public/default/t1");
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 00abe6c55c4..83321d0f776 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -135,7 +134,7 @@ public class ProxyLookupThrottlingTest extends
MockedPulsarServiceBaseTest {
.serviceUrl(proxyService.getServiceUrl()).build();
String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
LookupService lookupService = client.getLookup();
- assertTrue(lookupService instanceof BinaryProtoLookupService);
+ assertTrue(lookupService.isBinaryProtoLookupService());
ClientCnx lookupConnection =
client.getCnxPool().getConnection(lookupService.resolveHost()).join();
// Make no permits to lookup.
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index f0f8a3bbcae..2529c929a6f 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.proxy.server;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -190,7 +191,7 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
var producerClient = producerClientFuture.get();
@Cleanup
var producer =
producerClient.newProducer(Schema.INT32).topic(topicName.toString()).create();
- LookupService producerLookupServiceSpy = spyField(producerClient,
"lookup");
+ LookupService producerLookupServiceSpy =
spyField(producerClient.getLookup(), "delegate");
@Cleanup
var consumerClient = consumerClientFuture.get();
@@ -200,7 +201,7 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
subscriptionName(BrokerTestUtil.newUniqueName("my-sub")).
ackTimeout(1000, TimeUnit.MILLISECONDS).
subscribe();
- LookupService consumerLookupServiceSpy = spyField(consumerClient,
"lookup");
+ LookupService consumerLookupServiceSpy =
spyField(consumerClient.getLookup(), "delegate");
var bundleRange = admin.lookups().getBundleRange(topicName.toString());
@@ -268,7 +269,7 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
@Cleanup
var producer = (ProducerImpl<Integer>)
producerClient.newProducer(Schema.INT32).topic(topicName.toString()).
create();
- LookupService producerLookupServiceSpy = spyField(producerClient,
"lookup");
+ LookupService producerLookupServiceSpy =
spyField(producerClient.getLookup(), "delegate");
when(((ServiceNameResolver) spyField(producerLookupServiceSpy,
"serviceNameResolver")).resolveHost()).
thenCallRealMethod().then(invocation ->
getSourceBrokerInetAddress(topicName));
@@ -280,7 +281,7 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
subscriptionName(BrokerTestUtil.newUniqueName("my-sub")).
ackTimeout(1000, TimeUnit.MILLISECONDS).
subscribe();
- LookupService consumerLookupServiceSpy = spyField(consumerClient,
"lookup");
+ LookupService consumerLookupServiceSpy =
spyField(consumerClient.getLookup(), "delegate");
when(((ServiceNameResolver) spyField(consumerLookupServiceSpy,
"serviceNameResolver")).resolveHost()).
thenCallRealMethod().then(invocation ->
getSourceBrokerInetAddress(topicName));
@@ -341,9 +342,9 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
assertEquals(FieldUtils.readDeclaredField(consumer.getConnectionHandler(),
"useProxy", true), Boolean.FALSE);
verify(producerClient, times(1)).getProxyConnection(any(), anyInt());
- verify(producerLookupServiceSpy, times(1)).getBroker(topicName);
+ verify(producerLookupServiceSpy, times(1)).getBroker(eq(topicName),
any());
verify(consumerClient, times(1)).getProxyConnection(any(), anyInt());
- verify(consumerLookupServiceSpy, times(1)).getBroker(topicName);
+ verify(consumerLookupServiceSpy, times(1)).getBroker(eq(topicName),
any());
}
}
diff --git
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
index 67dff01e1cc..da7c2892936 100644
---
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
+++
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.websocket;
import io.netty.channel.epoll.Epoll;
+import java.io.IOException;
import java.lang.reflect.Field;
+import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.testng.Assert;
@@ -38,12 +40,17 @@ public class LookupProtocolTest {
conf.setServiceUrl("http://localhost:8080");
conf.setServiceUrlTls("https://localhost:8443");
WebSocketService service = new WebSocketService(conf);
+ assertLookupIsBinaryProtoLookup(service, false);
+ service.close();
+ }
+
+ private static void assertLookupIsBinaryProtoLookup(WebSocketService
service, boolean expected)
+ throws IOException, NoSuchFieldException, IllegalAccessException {
PulsarClientImpl testClient = (PulsarClientImpl)
service.getPulsarClient();
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
lookupField.setAccessible(true);
- Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
- "org.apache.pulsar.client.impl.HttpLookupService");
- service.close();
+ LookupService lookupService = (LookupService)
lookupField.get(testClient);
+ Assert.assertEquals(expected,
lookupService.isBinaryProtoLookupService());
}
@Test(timeOut = 10000)
@@ -55,10 +62,7 @@ public class LookupProtocolTest {
conf.setBrokerClientTlsEnabled(true);
WebSocketService service = new WebSocketService(conf);
PulsarClientImpl testClient = (PulsarClientImpl)
service.getPulsarClient();
- Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
- lookupField.setAccessible(true);
- Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
- "org.apache.pulsar.client.impl.HttpLookupService");
+ assertLookupIsBinaryProtoLookup(service, false);
Assert.assertTrue(testClient.getConfiguration().isUseTls());
service.close();
}
@@ -71,11 +75,7 @@ public class LookupProtocolTest {
conf.setBrokerServiceUrl("pulsar://localhost:6650");
conf.setBrokerServiceUrlTls("pulsar+ssl://localhost:6651");
WebSocketService service = new WebSocketService(conf);
- PulsarClientImpl testClient = (PulsarClientImpl)
service.getPulsarClient();
- Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
- lookupField.setAccessible(true);
- Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
- "org.apache.pulsar.client.impl.BinaryProtoLookupService");
+ assertLookupIsBinaryProtoLookup(service, true);
service.close();
}
@@ -89,10 +89,7 @@ public class LookupProtocolTest {
conf.setBrokerClientTlsEnabled(true);
WebSocketService service = new WebSocketService(conf);
PulsarClientImpl testClient = (PulsarClientImpl)
service.getPulsarClient();
- Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
- lookupField.setAccessible(true);
- Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
- "org.apache.pulsar.client.impl.BinaryProtoLookupService");
+ assertLookupIsBinaryProtoLookup(service, true);
Assert.assertTrue(testClient.getConfiguration().isUseTls());
service.close();
}