This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d6071c71098 [improve][client]PIP-425:Support connecting with next
available endpoint for multi-endpoint serviceUrls (#24387)
d6071c71098 is described below
commit d6071c71098a5e55db03ce4af83753a3d18c9a34
Author: Aurora Twinkle <[email protected]>
AuthorDate: Thu Jul 17 12:21:47 2025 +0800
[improve][client]PIP-425:Support connecting with next available endpoint
for multi-endpoint serviceUrls (#24387)
Co-authored-by: duanlinlin <[email protected]>
Co-authored-by: Zixuan Liu <[email protected]>
---
.../client/impl/ServiceUrlQuarantineTest.java | 283 +++++++++++++++++++++
.../apache/pulsar/client/api/ClientBuilder.java | 38 +++
.../client/impl/BinaryProtoLookupService.java | 25 +-
.../pulsar/client/impl/ClientBuilderImpl.java | 18 ++
.../apache/pulsar/client/impl/ConnectionPool.java | 8 +
...erviceNameResolver.java => EndpointStatus.java} | 55 +---
.../org/apache/pulsar/client/impl/HttpClient.java | 11 +-
.../client/impl/PulsarServiceNameResolver.java | 173 ++++++++++++-
.../pulsar/client/impl/ServiceNameResolver.java | 9 +
.../client/impl/conf/ClientConfigurationData.java | 14 +
.../client/impl/BinaryProtoLookupServiceTest.java | 1 +
.../pulsar/client/impl/ClientTestFixtures.java | 6 +-
.../client/impl/PulsarServiceNameResolverTest.java | 63 ++++-
13 files changed, 640 insertions(+), 64 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
new file mode 100644
index 00000000000..14da7bce4c1
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.common.net.ServiceURI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ServiceUrlQuarantineTest extends ProducerConsumerBase {
+ private static final Logger log =
LoggerFactory.getLogger(ServiceUrlQuarantineTest.class);
+ private String binaryServiceUrlWithUnavailableNodes;
+ private String httpServiceUrlWithUnavailableNodes;
+ private PulsarClientImpl pulsarClientWithBinaryServiceUrl;
+ private PulsarClientImpl pulsarClientWithBinaryServiceUrlDisableQuarantine;
+ private PulsarClientImpl pulsarClientWithHttpServiceUrl;
+ private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine;
+ private static final int BROKER_SERVICE_PORT = 6666;
+ private static final int WEB_SERVICE_PORT = 8888;
+ private static final int UNAVAILABLE_NODES = 20;
+ private static final int TIMEOUT_SECONDS = 1;
+
+ /**
+  * Boots the test broker, then builds a binary and an HTTP service URL that each start with the live broker
+  * address followed by {@code UNAVAILABLE_NODES} loopback endpoints on randomly chosen ports in [100, 1000).
+  * Two clients are created per URL: one through {@code newPulsarClient} and one whose quarantine init/max
+  * durations are both set to zero.
+  */
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+     super.internalSetup();
+     super.producerBaseSetup();
+
+     // Start from the real broker addresses and append endpoints that play the role of unavailable nodes.
+     final StringBuilder binaryUrl = new StringBuilder(pulsar.getBrokerServiceUrl());
+     final StringBuilder httpUrl = new StringBuilder(pulsar.getWebServiceAddress());
+     for (int node = 0; node < UNAVAILABLE_NODES; node++) {
+         binaryUrl.append(",127.0.0.1:").append(ThreadLocalRandom.current().nextInt(100, 1000));
+         httpUrl.append(",127.0.0.1:").append(ThreadLocalRandom.current().nextInt(100, 1000));
+     }
+     this.binaryServiceUrlWithUnavailableNodes = binaryUrl.toString();
+     this.httpServiceUrlWithUnavailableNodes = httpUrl.toString();
+
+     this.pulsarClientWithBinaryServiceUrl =
+             (PulsarClientImpl) newPulsarClient(binaryServiceUrlWithUnavailableNodes, 0);
+     this.pulsarClientWithHttpServiceUrl =
+             (PulsarClientImpl) newPulsarClient(httpServiceUrlWithUnavailableNodes, 0);
+
+     // A zero init/max quarantine duration means failing endpoints are never quarantined.
+     final PulsarClient binaryWithoutQuarantine = PulsarClient.builder()
+             .serviceUrl(binaryServiceUrlWithUnavailableNodes)
+             .serviceUrlQuarantineInitDuration(0, TimeUnit.MILLISECONDS)
+             .serviceUrlQuarantineMaxDuration(0, TimeUnit.MILLISECONDS)
+             .operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+             .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+             .build();
+     this.pulsarClientWithBinaryServiceUrlDisableQuarantine = (PulsarClientImpl) binaryWithoutQuarantine;
+
+     final PulsarClient httpWithoutQuarantine = PulsarClient.builder()
+             .serviceUrl(httpServiceUrlWithUnavailableNodes)
+             .serviceUrlQuarantineInitDuration(0, TimeUnit.MILLISECONDS)
+             .serviceUrlQuarantineMaxDuration(0, TimeUnit.MILLISECONDS)
+             .operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+             .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+             .build();
+     this.pulsarClientWithHttpServiceUrlDisableQuarantine = (PulsarClientImpl) httpWithoutQuarantine;
+ }
+
+ /**
+  * Applies the base configuration and then pins the broker and web service ports to the fixed values that
+  * the multi-host service URLs in this test refer to.
+  */
+ @Override
+ protected void doInitConf() throws Exception {
+     super.doInitConf();
+     final Optional<Integer> brokerPort = Optional.of(BROKER_SERVICE_PORT);
+     final Optional<Integer> webPort = Optional.of(WEB_SERVICE_PORT);
+     this.conf.setBrokerServicePort(brokerPort);
+     this.conf.setWebServicePort(webPort);
+ }
+
+ /**
+  * Shortens the operation and lookup timeouts to {@code TIMEOUT_SECONDS} on every client builder this hook
+  * is handed.
+  */
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+     // The builder setters are fluent; configuring in two statements is equivalent to chaining them.
+     clientBuilder.operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+     clientBuilder.lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+
+ /**
+  * Runs the base-class cleanup and then closes each of the four clients created in {@code setup()} that was
+  * actually instantiated.
+  */
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+     super.internalCleanup();
+     // Same closing order as the fields were checked originally: binary, binary (no quarantine), http,
+     // http (no quarantine).
+     final PulsarClientImpl[] clients = {
+             pulsarClientWithBinaryServiceUrl,
+             pulsarClientWithBinaryServiceUrlDisableQuarantine,
+             pulsarClientWithHttpServiceUrl,
+             pulsarClientWithHttpServiceUrlDisableQuarantine,
+     };
+     for (PulsarClientImpl client : clients) {
+         if (client != null) {
+             client.close();
+         }
+     }
+ }
+
+ /**
+  * Creates consumers and producers through service URLs that contain many unavailable endpoints.
+  *
+  * <p>For the clients built via {@code newPulsarClient}, the first batch is expected to hit some failures and
+  * the second batch is expected to succeed completely. For the clients built with zero quarantine durations,
+  * both batches are expected to keep hitting failures. The same sequence is run for the binary and the HTTP
+  * service URL.
+  */
+ @Test
+ public void testCreateConsumerProducerWithUnavailableBrokerNodes() throws Exception {
+     pulsarClientWithBinaryServiceUrl.updateServiceUrl(binaryServiceUrlWithUnavailableNodes);
+     pulsarClientWithHttpServiceUrl.updateServiceUrl(httpServiceUrlWithUnavailableNodes);
+     String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
+     admin.topics().createNonPartitionedTopic(topic);
+     int createCount = 20;
+     // 1. test binary service url
+     // trigger unhealthy address removal by creating consumers and producers
+     int successCount = createConsumerAndProducers(pulsarClientWithBinaryServiceUrl, createCount, topic);
+     assertTrue(successCount < createCount,
+             "Expected some creations to fail due to unavailable nodes, but all succeeded.");
+     // all unavailable nodes should have been removed
+     successCount = createConsumerAndProducers(pulsarClientWithBinaryServiceUrl, createCount, topic);
+     assertEquals(successCount, createCount,
+             "Expected all subscription creations to succeed, but only " + successCount + " succeeded.");
+
+     // test binary service url with disable quarantine
+     successCount =
+             createConsumerAndProducers(pulsarClientWithBinaryServiceUrlDisableQuarantine, createCount, topic);
+     assertTrue(successCount < createCount,
+             "Expected some creations to fail due to unavailable nodes, but all succeeded.");
+     // no unavailable nodes should be removed since quarantine is disabled
+     successCount =
+             createConsumerAndProducers(pulsarClientWithBinaryServiceUrlDisableQuarantine, createCount, topic);
+     // The message now matches the asserted condition (failures are expected here).
+     assertTrue(successCount < createCount,
+             "Expected some creations to keep failing because quarantine is disabled, but all succeeded.");
+
+     // 2. test http service url
+     // trigger unhealthy address removal by creating consumers and producers
+     successCount = createConsumerAndProducers(pulsarClientWithHttpServiceUrl, createCount, topic);
+     assertTrue(successCount < createCount,
+             "Expected some creations to fail due to unavailable nodes, but all succeeded.");
+     // all unavailable nodes should have been removed
+     successCount = createConsumerAndProducers(pulsarClientWithHttpServiceUrl, createCount, topic);
+     // The message now matches the asserted condition (full success is expected here).
+     assertEquals(successCount, createCount,
+             "Expected all subscription creations to succeed, but only " + successCount + " succeeded.");
+
+     // test http service url with disable quarantine
+     successCount =
+             createConsumerAndProducers(pulsarClientWithHttpServiceUrlDisableQuarantine, createCount, topic);
+     assertTrue(successCount < createCount,
+             "Expected some creations to fail due to unavailable nodes, but all succeeded.");
+     // no unavailable nodes should be removed since quarantine is disabled
+     successCount =
+             createConsumerAndProducers(pulsarClientWithHttpServiceUrlDisableQuarantine, createCount, topic);
+     assertTrue(successCount < createCount,
+             "Expected some creations to keep failing because quarantine is disabled, but all succeeded.");
+ }
+
+ /**
+  * Runs {@code createCount} rounds against {@code topic}; each round subscribes a consumer under a fresh
+  * subscription name, closes it, then creates and closes a producer. A round that throws is logged and
+  * skipped.
+  *
+  * @param pulsarClient client used to create the consumers and producers
+  * @param createCount number of rounds to attempt
+  * @param topic topic the consumers and producers attach to
+  * @return the number of rounds that completed without an exception
+  */
+ private int createConsumerAndProducers(PulsarClientImpl pulsarClient, int createCount, String topic) {
+     int completedRounds = 0;
+     for (int round = 0; round < createCount; round++) {
+         final String subscription = "my-sub" + UUID.randomUUID();
+         try {
+             // Consumer first: it is opened and closed before the producer is attempted.
+             pulsarClient.newConsumer()
+                     .subscriptionMode(SubscriptionMode.Durable)
+                     .topic(topic).receiverQueueSize(1).subscriptionName(subscription)
+                     .subscribe()
+                     .close();
+             pulsarClient.newProducer()
+                     .topic(topic)
+                     .create()
+                     .close();
+             completedRounds++;
+         } catch (Exception e) {
+             log.warn("Failed to create consumer and producer {} for topic {}: {}",
+                     subscription, topic, e.getMessage());
+         }
+     }
+     return completedRounds;
+ }
+
+ /**
+  * Runs {@code doTestServiceUrlResolve} for all four clients, each time with a three-host service URL whose
+  * last host is the broker started by this test.
+  */
+ @Test
+ public void testServiceUrlHealthCheck() throws Exception {
+     final String binaryUrl = "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + BROKER_SERVICE_PORT;
+     final String httpUrl = "http://host1:6651,host2:6651,127.0.0.1:" + WEB_SERVICE_PORT;
+     final InetSocketAddress liveBinaryAddress =
+             InetSocketAddress.createUnresolved("127.0.0.1", BROKER_SERVICE_PORT);
+     final InetSocketAddress liveHttpAddress =
+             InetSocketAddress.createUnresolved("127.0.0.1", WEB_SERVICE_PORT);
+
+     // Clients built through newPulsarClient are checked with enableQuarantine = true.
+     doTestServiceUrlResolve(pulsarClientWithBinaryServiceUrl, binaryUrl, liveBinaryAddress, true);
+     doTestServiceUrlResolve(pulsarClientWithHttpServiceUrl, httpUrl, liveHttpAddress, true);
+
+     // Clients built with zero quarantine durations are checked with enableQuarantine = false.
+     doTestServiceUrlResolve(pulsarClientWithBinaryServiceUrlDisableQuarantine, binaryUrl,
+             liveBinaryAddress, false);
+     doTestServiceUrlResolve(pulsarClientWithHttpServiceUrlDisableQuarantine, httpUrl,
+             liveHttpAddress, false);
+ }
+
+ /**
+  * Installs {@code serviceUrl} on the client's lookup service, fires one asynchronous subscribe per host to
+  * exercise the endpoints, waits, and then checks which addresses the lookup service resolves.
+  *
+  * @param pulsarClient client whose lookup service is exercised
+  * @param serviceUrl multi-host service URL to install
+  * @param healthyAddress the only address expected to be resolved afterwards when quarantine is enabled
+  * @param enableQuarantine {@code true} to expect only {@code healthyAddress} to be resolved,
+  *                         {@code false} to expect every host of {@code serviceUrl} to still be resolved
+  * @throws Exception if the service URL is invalid or topic creation fails
+  */
+ private void doTestServiceUrlResolve(PulsarClientImpl pulsarClient, String serviceUrl,
+         InetSocketAddress healthyAddress, boolean enableQuarantine) throws Exception {
+     LookupService resolver = pulsarClient.getLookup();
+     resolver.updateServiceUrl(serviceUrl);
+     // TestNG's assertEquals takes (actual, expected); the original call had them swapped.
+     assertEquals(resolver.getServiceUrl(), serviceUrl);
+
+     ServiceURI uri;
+     try {
+         uri = ServiceURI.create(serviceUrl);
+     } catch (IllegalArgumentException iae) {
+         log.error("Invalid service-url {} provided {}", serviceUrl, iae.getMessage(), iae);
+         throw new PulsarClientException.InvalidServiceURL(iae);
+     }
+     // Rebuild the full set of unresolved addresses the service URL describes.
+     String[] hosts = uri.getServiceHosts();
+     Set<InetSocketAddress> originAllAddresses = new HashSet<>(hosts.length);
+     for (String host : hosts) {
+         String hostUrl = uri.getServiceScheme() + "://" + host;
+         try {
+             URI hostUri = new URI(hostUrl);
+             originAllAddresses.add(InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()));
+         } catch (URISyntaxException e) {
+             log.error("Invalid host provided {}", hostUrl, e);
+             throw new PulsarClientException.InvalidServiceURL(e);
+         }
+     }
+
+     // Before any failure is recorded, every resolved host must come from the configured set.
+     for (int i = 0; i < 10; i++) {
+         assertTrue(originAllAddresses.contains(resolver.resolveHost()));
+     }
+     String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
+     admin.topics().createNonPartitionedTopic(topic);
+     // Create consumers to trigger unhealthy address removal
+     for (int i = 0; i < originAllAddresses.size(); i++) {
+         String subName = "my-sub" + UUID.randomUUID();
+         pulsarClient.newConsumer()
+                 .subscriptionMode(SubscriptionMode.Durable)
+                 .topic(topic).receiverQueueSize(1).subscriptionName(subName)
+                 .subscribeAsync()
+                 .thenAccept(consumer -> {
+                     try {
+                         consumer.close();
+                     } catch (PulsarClientException closeFailure) {
+                         log.warn("Failed to close consumer {} for topic {}: {}", subName, topic,
+                                 closeFailure.getMessage());
+                     }
+                 });
+     }
+     // The subscribe futures are not awaited; give the attempts time to finish (or time out).
+     Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+
+     // check if the unhealthy address is removed
+     Set<InetSocketAddress> resolvedAddresses = new HashSet<>();
+     for (int i = 0; i < hosts.length; i++) {
+         InetSocketAddress resolved = resolver.resolveHost();
+         if (enableQuarantine) {
+             assertEquals(resolved, healthyAddress,
+                     "Expected only the healthy address to be resolved while quarantine is enabled");
+         } else {
+             resolvedAddresses.add(resolved);
+         }
+     }
+
+     if (!enableQuarantine) {
+         assertEquals(resolvedAddresses, originAllAddresses,
+                 "Expected all addresses to still be resolved with quarantine disabled, but found: "
+                         + resolvedAddresses);
+     }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 74b7c084d5a..c1ef9ed819b 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -120,6 +120,44 @@ public interface ClientBuilder extends Serializable,
Cloneable {
*/
ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
+ /**
+ * Configure the initial quarantine duration for service URL endpoints.
+ * For a single-host serviceUrl, this setting has no effect.
+ *
+ * <p>When the client is unable to connect to an endpoint from a serviceUrl with multiple hosts, that
+ * endpoint will be quarantined for a specific duration that is determined in a certain exponential way.
+ * The initial value of a single quarantine duration is set by {@code serviceUrlQuarantineInitDuration}.
+ * A successful usage of the endpoint will reset the duration to the initial value and move the endpoint
+ * back to the available addresses pool.
+ *
+ * <p>A value of 0 means don't quarantine any endpoints even if they fail.
+ *
+ * @param serviceUrlQuarantineInitDuration the initial quarantine duration
+ * for unavailable endpoint. Defaults to 60 seconds.
+ * @param unit the time unit for the quarantine duration
+ * @return the client builder instance
+ */
+ ClientBuilder serviceUrlQuarantineInitDuration(long
serviceUrlQuarantineInitDuration, TimeUnit unit);
+
+ /**
+ * Configure the maximum quarantine duration for service URL endpoints.
+ * For a single-host serviceUrl, this setting has no effect.
+ *
+ * <p>When the client is unable to connect to an endpoint from a serviceUrl with multiple hosts, that
+ * endpoint will be quarantined for a specific duration that is determined in a certain exponential way.
+ * The maximum value of a single quarantine duration is set by {@code serviceUrlQuarantineMaxDuration}.
+ * A successful usage of the endpoint will reset the duration to the initial value and move the endpoint
+ * back to the available addresses pool.
+ *
+ * <p>A value of 0 means don't quarantine any endpoints even if they fail.
+ *
+ * @param serviceUrlQuarantineMaxDuration the maximum quarantine duration for
+ * unavailable endpoint. Defaults to 1 day.
+ * @param unit the time unit for the quarantine duration
+ * @return the client builder instance
+ */
+ ClientBuilder serviceUrlQuarantineMaxDuration(long
serviceUrlQuarantineMaxDuration, TimeUnit unit);
+
/**
* Configure the listenerName that the broker will return the
corresponding `advertisedListener`.
*
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 795cdc6d693..4709a22e1c4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -115,7 +115,9 @@ public class BinaryProtoLookupService implements
LookupService {
this.useTls = useTls;
this.scheduleExecutor = scheduleExecutor;
this.maxLookupRedirects =
client.getConfiguration().getMaxLookupRedirects();
- this.serviceNameResolver = new PulsarServiceNameResolver();
+ this.serviceNameResolver =
+ new
PulsarServiceNameResolver(client.getConfiguration().getServiceUrlQuarantineInitDurationMs(),
+
client.getConfiguration().getServiceUrlQuarantineMaxDurationMs());
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
@@ -188,8 +190,8 @@ public class BinaryProtoLookupService implements
LookupService {
final MutableObject<CompletableFuture> newFutureCreated = new
MutableObject<>();
try {
return partitionedMetadataInProgress.computeIfAbsent(topicName,
tpName -> {
- CompletableFuture<PartitionedTopicMetadata> newFuture =
getPartitionedTopicMetadata(
- serviceNameResolver.resolveHost(), topicName,
metadataAutoCreationEnabled,
+ CompletableFuture<PartitionedTopicMetadata> newFuture =
getPartitionedTopicMetadataAsync(
+ topicName, metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
newFutureCreated.setValue(newFuture);
return newFuture;
@@ -281,19 +283,20 @@ public class BinaryProtoLookupService implements
LookupService {
client.getCnxPool().releaseConnection(clientCnx);
});
}, lookupPinnedExecutor).exceptionally(connectionException -> {
+ serviceNameResolver.markHostAvailability(socketAddress, false);
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
return addressFuture;
}
- private CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(InetSocketAddress socketAddress,
+ private CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadataAsync(
TopicName topicName, boolean metadataAutoCreationEnabled, boolean
useFallbackForNonPIP344Brokers) {
long startTime = System.nanoTime();
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new
CompletableFuture<>();
-
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
+
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx
-> {
boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
if (!metadataAutoCreationEnabled &&
!clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
if (useFallbackForNonPIP344Brokers) {
@@ -356,8 +359,7 @@ public class BinaryProtoLookupService implements
LookupService {
schemaFuture.completeExceptionally(new
SchemaSerializationException("Empty schema version"));
return schemaFuture;
}
- InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
-
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
+
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx
-> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId,
topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
@@ -403,12 +405,12 @@ public class BinaryProtoLookupService implements
LookupService {
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMax(1, TimeUnit.MINUTES)
.create();
- getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace,
backoff, opTimeoutMs, topicsFuture, mode,
+ getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture,
mode,
topicsPattern, topicsHash);
return topicsFuture;
}
- private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
+ private void getTopicsUnderNamespace(
NamespaceName namespace,
Backoff backoff,
AtomicLong remainingTime,
@@ -418,7 +420,7 @@ public class BinaryProtoLookupService implements
LookupService {
String topicsHash) {
long startTime = System.nanoTime();
-
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
+
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx
-> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern,
topicsHash);
@@ -451,14 +453,13 @@ public class BinaryProtoLookupService implements
LookupService {
log.warn("[namespace: {}] Could not get connection while
getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
- getTopicsUnderNamespace(socketAddress, namespace, backoff,
remainingTime, getTopicsResultFuture,
+ getTopicsUnderNamespace(namespace, backoff, remainingTime,
getTopicsResultFuture,
mode, topicsPattern, topicsHash);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
});
}
-
@Override
public void close() throws Exception {
if (createdLookupPinnedExecutor && lookupPinnedExecutor != null &&
!lookupPinnedExecutor.isShutdown()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 02140fca8ab..09c668f753e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -100,6 +100,24 @@ public class ClientBuilderImpl implements ClientBuilder {
return this;
}
+ /**
+  * Stores the initial quarantine duration, converted to milliseconds, in the client configuration.
+  * Negative durations are rejected; zero is accepted.
+  */
+ @Override
+ public ClientBuilder serviceUrlQuarantineInitDuration(long serviceUrlQuarantineInitDuration, TimeUnit unit) {
+     checkArgument(serviceUrlQuarantineInitDuration >= 0,
+             "serviceUrlQuarantineInitDuration needs to be >= 0");
+     final long initDurationMs = unit.toMillis(serviceUrlQuarantineInitDuration);
+     conf.setServiceUrlQuarantineInitDurationMs(initDurationMs);
+     return this;
+ }
+
+ /**
+  * Stores the maximum quarantine duration, converted to milliseconds, in the client configuration.
+  * Negative durations are rejected; zero is accepted.
+  */
+ @Override
+ public ClientBuilder serviceUrlQuarantineMaxDuration(long serviceUrlQuarantineMaxDuration, TimeUnit unit) {
+     checkArgument(serviceUrlQuarantineMaxDuration >= 0,
+             "serviceUrlQuarantineMaxDuration needs to be >= 0");
+     final long maxDurationMs = unit.toMillis(serviceUrlQuarantineMaxDuration);
+     conf.setServiceUrlQuarantineMaxDurationMs(maxDurationMs);
+     return this;
+ }
+
@Override
public ClientBuilder listenerName(String listenerName) {
checkArgument(StringUtils.isNotBlank(listenerName), "Param
listenerName must not be blank.");
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 1eb603e28f5..d86f9ef1ea0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -216,6 +216,14 @@ public class ConnectionPool implements AutoCloseable {
return signSafeMod(random.nextInt(), maxConnectionsPerHosts);
}
+ /**
+  * Resolves a host from {@code serviceNameResolver}, requests a connection to it, and reports the outcome
+  * back to the resolver once the connection future completes.
+  *
+  * @param serviceNameResolver resolver that supplies the target host and receives the availability result
+  * @return the future of the connection to the resolved host
+  */
+ public CompletableFuture<ClientCnx> getConnection(final ServiceNameResolver serviceNameResolver) {
+     final InetSocketAddress resolvedHost = serviceNameResolver.resolveHost();
+     final CompletableFuture<ClientCnx> cnxFuture = getConnection(resolvedHost);
+     // The host is marked available exactly when the future completed without an exception.
+     cnxFuture.whenComplete((cnx, failure) -> {
+         final boolean connected = failure == null;
+         serviceNameResolver.markHostAvailability(resolvedHost, connected);
+     });
+     return cnxFuture;
+ }
+
public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress
address) {
if (maxConnectionsPerHosts == 0) {
return getConnection(address, address, -1);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EndpointStatus.java
similarity index 50%
copy from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
copy to
pulsar-client/src/main/java/org/apache/pulsar/client/impl/EndpointStatus.java
index 95f81d45bd7..38d9e22396e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EndpointStatus.java
@@ -19,47 +19,16 @@
package org.apache.pulsar.client.impl;
import java.net.InetSocketAddress;
-import java.net.URI;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
-import org.apache.pulsar.common.net.ServiceURI;
-
-/**
- * A service name resolver to resolve real socket address.
- */
-public interface ServiceNameResolver {
-
- /**
- * Resolve pulsar service url.
- *
- * @return resolve the service url to return a socket address
- */
- InetSocketAddress resolveHost();
-
- /**
- * Resolve pulsar service url.
- * @return
- */
- URI resolveHostUri();
-
- /**
- * Get service url.
- *
- * @return service url
- */
- String getServiceUrl();
-
- /**
- * Get service uri.
- *
- * @return service uri
- */
- ServiceURI getServiceUri();
-
- /**
- * Update service url.
- *
- * @param serviceUrl service url
- */
- void updateServiceUrl(String serviceUrl) throws InvalidServiceURL;
-
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.pulsar.common.util.Backoff;
+
+/**
+ * Mutable per-endpoint record; {@link PulsarServiceNameResolver} keeps one per address in its
+ * {@code hostAvailabilityMap}.
+ *
+ * <p>Lombok's {@code @Data} generates the getters, setters, {@code equals}, {@code hashCode} and
+ * {@code toString}; {@code @AllArgsConstructor} generates a constructor taking the fields in declaration order.
+ */
+@Data
+@AllArgsConstructor
+class EndpointStatus {
+ // Address of the endpoint this record describes.
+ private InetSocketAddress socketAddress;
+ // NOTE(review): presumably the backoff that yields successive quarantine durations - confirm against the resolver.
+ private Backoff quarantineBackoff;
+ // NOTE(review): presumably the wall-clock time (ms) of the last status change - confirm against the resolver.
+ private long lastUpdateTimeStampMs;
+ // NOTE(review): presumably how long (ms) after the last update the endpoint may be retried - confirm.
+ private long nextDelayMsToRecover;
+ // Whether the endpoint is currently considered available.
+ private boolean isAvailable;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 8e448d801fa..a86e820af6b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -70,7 +70,8 @@ public class HttpClient implements Closeable {
protected HttpClient(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) throws PulsarClientException {
this.authentication = conf.getAuthentication();
- this.serviceNameResolver = new PulsarServiceNameResolver();
+ this.serviceNameResolver = new
PulsarServiceNameResolver(conf.getServiceUrlQuarantineInitDurationMs(),
+ conf.getServiceUrlQuarantineMaxDurationMs());
this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
@@ -114,7 +115,6 @@ public class HttpClient implements Closeable {
confBuilder.setSslEngineFactory(sslEngineFactory);
-
confBuilder.setUseInsecureTrustManager(conf.isTlsAllowInsecureConnection());
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
} catch (Exception e) {
@@ -168,6 +168,8 @@ public class HttpClient implements Closeable {
// auth complete, do real request
authFuture.whenComplete((respHeaders, ex) -> {
if (ex != null) {
+ serviceNameResolver.markHostAvailability(
+
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()),
false);
log.warn("[{}] Failed to perform http request at
authentication stage: {}",
requestUrl, ex.getMessage());
future.completeExceptionally(new
PulsarClientException(ex));
@@ -194,10 +196,14 @@ public class HttpClient implements Closeable {
builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
if (t != null) {
+ serviceNameResolver.markHostAvailability(
+
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()),
false);
log.warn("[{}] Failed to perform http request: {}",
requestUrl, t.getMessage());
future.completeExceptionally(new
PulsarClientException(t));
return;
}
+ serviceNameResolver.markHostAvailability(
+
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), true);
// request not success
if (response2.getStatusCode() !=
HttpURLConnection.HTTP_OK) {
@@ -266,4 +272,5 @@ public class HttpClient implements Closeable {
log.error("Failed to refresh SSL context", e);
}
}
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index e47750be462..1b12a7c0267 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -19,15 +19,25 @@
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.VisibleForTesting;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.common.net.ServiceURI;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
/**
* The default implementation of {@link ServiceNameResolver}.
@@ -40,11 +50,38 @@ public class PulsarServiceNameResolver implements
ServiceNameResolver {
private static final AtomicIntegerFieldUpdater<PulsarServiceNameResolver>
CURRENT_INDEX_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PulsarServiceNameResolver.class,
"currentIndex");
private volatile int currentIndex;
- private volatile List<InetSocketAddress> addressList;
+ private volatile List<InetSocketAddress> allAddressList;
+ private volatile Set<InetSocketAddress> allAddressSet;
+ private volatile List<InetSocketAddress> availableAddressList;
+ private final Map<InetSocketAddress, EndpointStatus> hostAvailabilityMap =
new ConcurrentHashMap<>();
+ private final long serviceUrlQuarantineInitDurationMs;
+ private final long serviceUrlQuarantineMaxDurationMs;
+ private final boolean enableServiceUrlQuarantine;
+
+ /**
+  * Creates a resolver with both quarantine durations set to zero, which leaves endpoint quarantine disabled.
+  */
+ public PulsarServiceNameResolver() {
+     this(0L, 0L);
+ }
+
+ /**
+  * Creates a resolver with the given quarantine bounds. Quarantine is enabled only when both durations are
+  * strictly positive.
+  *
+  * @param serviceUrlQuarantineInitDurationMs initial quarantine duration in milliseconds
+  * @param serviceUrlQuarantineMaxDurationMs maximum quarantine duration in milliseconds
+  */
+ public PulsarServiceNameResolver(long serviceUrlQuarantineInitDurationMs,
+         long serviceUrlQuarantineMaxDurationMs) {
+     final boolean initIsPositive = serviceUrlQuarantineInitDurationMs > 0;
+     final boolean maxIsPositive = serviceUrlQuarantineMaxDurationMs > 0;
+     this.enableServiceUrlQuarantine = initIsPositive && maxIsPositive;
+     this.serviceUrlQuarantineInitDurationMs = serviceUrlQuarantineInitDurationMs;
+     this.serviceUrlQuarantineMaxDurationMs = serviceUrlQuarantineMaxDurationMs;
+ }
@Override
public InetSocketAddress resolveHost() {
- List<InetSocketAddress> list = addressList;
+ final List<InetSocketAddress> list;
+ List<InetSocketAddress> availableAddresses = availableAddressList;
+ if (availableAddresses != null && !availableAddresses.isEmpty()) {
+ list = availableAddresses;
+ } else {
+ // if no available address, use the original address list
+ list = allAddressList;
+ if (availableAddressList != null) {
+ log.warn("No available hosts found for service url: {}",
serviceUrl);
+ }
+ }
checkState(
list != null, "No service url is provided yet");
checkState(
@@ -75,7 +112,7 @@ public class PulsarServiceNameResolver implements
ServiceNameResolver {
}
@Override
- public void updateServiceUrl(String serviceUrl) throws InvalidServiceURL {
+ public synchronized void updateServiceUrl(String serviceUrl) throws
InvalidServiceURL {
ServiceURI uri;
try {
uri = ServiceURI.create(serviceUrl);
@@ -96,10 +133,17 @@ public class PulsarServiceNameResolver implements
ServiceNameResolver {
throw new InvalidServiceURL(e);
}
}
- this.addressList = addresses;
+ this.allAddressList = addresses;
+ this.allAddressSet = Set.copyOf(addresses);
this.serviceUrl = serviceUrl;
this.serviceUri = uri;
this.currentIndex = randomIndex(addresses.size());
+ if (enableServiceUrlQuarantine) {
+ hostAvailabilityMap.keySet().retainAll(allAddressSet);
+ allAddressSet.forEach(
+ address -> hostAvailabilityMap.putIfAbsent(address,
createEndpointStatus(true, address)));
+ availableAddressList = new
ArrayList<>(hostAvailabilityMap.keySet());
+ }
}
private static int randomIndex(int numAddresses) {
@@ -107,4 +151,125 @@ public class PulsarServiceNameResolver implements
ServiceNameResolver {
?
0 :
io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses);
}
+
+ /**
+ * This method runs while holding the instance lock, so it must not execute
code that may block, such as network I/O.
+ * @param address the host address to mark availability for
+ * @param isAvailable true if the host is available, false otherwise
+ */
+ @Override
+ public synchronized void markHostAvailability(InetSocketAddress address,
boolean isAvailable) {
+ if (!enableServiceUrlQuarantine) {
+ return;
+ }
+
+ if (!allAddressSet.contains(address)) {
+ // If the address is not part of the original service URL, we
ignore it.
+ log.debug("Address {} is not part of the original service URL,
ignoring availability update", address);
+ return;
+ }
+
+ AtomicBoolean availableHostsChanged = new AtomicBoolean(false);
+ hostAvailabilityMap.compute(address, (key, oldStatus) -> {
+ if (oldStatus == null) {
+ EndpointStatus endpointStatus =
createEndpointStatus(isAvailable, key);
+ availableHostsChanged.set(true);
+ return endpointStatus;
+ }
+ if (oldStatus.isAvailable() != isAvailable) {
+ availableHostsChanged.set(true);
+ }
+ computeEndpointStatus(isAvailable, oldStatus);
+ return oldStatus;
+ });
+
+ hostAvailabilityMap.forEach((__, endpointStatus) -> {
+ if (!endpointStatus.isAvailable()) {
+ computeEndpointStatus(false, endpointStatus);
+ if (!availableHostsChanged.get() &&
endpointStatus.isAvailable()) {
+ availableHostsChanged.set(true);
+ }
+ }
+ });
+
+ if (availableHostsChanged.get()) {
+ availableAddressList = hostAvailabilityMap.entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().isAvailable() &&
allAddressSet.contains(entry.getKey()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ log.info("service name resolver available hosts changed, current
available hosts: {}",
+ availableAddressList);
+ }
+ }
+
+ @VisibleForTesting
+ List<InetSocketAddress> getAvailableAddressList() {
+ return availableAddressList;
+ }
+
+ /**
+ * Create an {@link EndpointStatus} for the given address.
+ * @param isAvailable the availability status of the endpoint
+ * @param inetSocketAddress the address of the endpoint
+ * @return a new {@link EndpointStatus} instance
+ */
+ private EndpointStatus createEndpointStatus(boolean isAvailable,
InetSocketAddress inetSocketAddress) {
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(serviceUrlQuarantineInitDurationMs,
TimeUnit.MILLISECONDS)
+ .setMax(serviceUrlQuarantineMaxDurationMs,
TimeUnit.MILLISECONDS)
+ .create();
+ EndpointStatus endpointStatus =
+ new EndpointStatus(inetSocketAddress, backoff,
System.currentTimeMillis(), 0,
+ isAvailable);
+ if (!isAvailable) {
+ computeEndpointStatus(false, endpointStatus);
+ }
+ return endpointStatus;
+ }
+
+ /**
+ * Updates the endpoint's availability status based on the given input
flag and internal quarantine logic.
+ *
+ * <p>This method applies the input flag directly, and includes a
time-based self-healing mechanism: if the
+ * endpoint has been marked unavailable for a sufficient cooldown period
(quarantine), it automatically transitions
+ * back to available even when {@code newIsAvailable} is {@code false}.
+ *
+ * <p>This allows the system to retry endpoints that were previously
marked as unavailable.
+ * If the endpoint fails again after recovery, it is marked unavailable
and re-enters quarantine
+ * with an exponentially increased delay before the next recovery attempt.
+ *
+ * <p>The backoff is only reset when a successful update occurs after
recovery
+ * (i.e., when {@code newIsAvailable} is {@code true} and the endpoint was
previously marked unavailable).
+ *
+ * @param newIsAvailable the new availability status of the endpoint
+ * @param status the current status of the endpoint
+ */
+ private void computeEndpointStatus(boolean newIsAvailable, EndpointStatus
status) {
+ if (!newIsAvailable) {
+ if (!status.isAvailable()) {
+ // still unavailable: check whether enough time has elapsed to try to
recover
+ long elapsedTimeMsSinceLast = System.currentTimeMillis() -
status.getLastUpdateTimeStampMs();
+ boolean needTryRecover = elapsedTimeMsSinceLast >=
status.getNextDelayMsToRecover();
+ if (needTryRecover) {
+ log.info("service name resolver try to recover host {}
after {}", status.getSocketAddress(),
+ Duration.ofMillis(elapsedTimeMsSinceLast));
+ status.setAvailable(true);
+
status.setLastUpdateTimeStampMs(System.currentTimeMillis());
+
status.setNextDelayMsToRecover(status.getQuarantineBackoff().next());
+ }
+ } else {
+ // from available to unavailable
+ status.setAvailable(false);
+ status.setLastUpdateTimeStampMs(System.currentTimeMillis());
+
status.setNextDelayMsToRecover(status.getQuarantineBackoff().next());
+ }
+ } else if (!status.isAvailable()) {
+ // from unavailable to available
+ status.setAvailable(true);
+ status.setLastUpdateTimeStampMs(System.currentTimeMillis());
+ status.setNextDelayMsToRecover(0);
+ status.getQuarantineBackoff().reset();
+ }
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
index 95f81d45bd7..3fa410905f6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
@@ -62,4 +62,13 @@ public interface ServiceNameResolver {
*/
void updateServiceUrl(String serviceUrl) throws InvalidServiceURL;
+ /**
+ * Mark the availability of a host.
+ * @param address the host address to mark availability for
+ * @param isAvailable true if the host is available, false otherwise
+ */
+ default void markHostAvailability(InetSocketAddress address, boolean
isAvailable){
+ // The default implementation does nothing.
+ // Implementing classes can override this method to add host availability
tracking.
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index c4fbf1e1744..72b9cf0c191 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -70,6 +70,20 @@ public class ClientConfigurationData implements
Serializable, Cloneable {
@JsonIgnore
private transient ServiceUrlProvider serviceUrlProvider;
+ @ApiModelProperty(
+ name = "serviceUrlQuarantineInitDurationMs",
+ value = "The initial duration (in milliseconds) to quarantine
endpoints that fail to connect."
+ + "A value of 0 means don't quarantine any endpoints even
if they fail."
+ )
+ private long serviceUrlQuarantineInitDurationMs = 60000;
+
+ @ApiModelProperty(
+ name = "serviceUrlQuarantineMaxDurationMs",
+ value = "The max duration (in milliseconds) to quarantine
endpoints that fail to connect."
+ + "A value of 0 means don't quarantine any endpoints even
if they fail."
+ )
+ private long serviceUrlQuarantineMaxDurationMs = TimeUnit.DAYS.toMillis(1);
+
@ApiModelProperty(
name = "authentication",
value = "Authentication settings of the client."
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index de3266b40e0..abd17903bc4 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -88,6 +88,7 @@ public class BinaryProtoLookupServiceTest {
ConnectionPool cnxPool = mock(ConnectionPool.class);
when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(connectionFuture);
+
when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(connectionFuture);
ClientConfigurationData clientConfig =
mock(ClientConfigurationData.class);
doReturn(0).when(clientConfig).getMaxLookupRedirects();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index bcb3791be85..e966b00b4a2 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -38,6 +38,7 @@ import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.SucceededFuture;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -94,7 +95,10 @@ class ClientTestFixtures {
.thenReturn(CompletableFuture.completedFuture(clientCnxMock));
ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
when(clientMock.getCnxPool()).thenReturn(connectionPoolMock);
-
when(connectionPoolMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+
when(connectionPoolMock.getConnection(any(InetSocketAddress.class))).thenReturn(
+ CompletableFuture.completedFuture(clientCnxMock));
+
when(connectionPoolMock.getConnection(any(ServiceNameResolver.class))).thenReturn(
+ CompletableFuture.completedFuture(clientCnxMock));
when(connectionPoolMock.getConnection(any(), any(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(clientCnxMock));
return clientMock;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
index f4ee2c0191d..4f62d6bc2ea 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
@@ -19,9 +19,11 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashSet;
@@ -35,12 +37,13 @@ import org.testng.annotations.Test;
* Unit test {@link PulsarServiceNameResolver}.
*/
public class PulsarServiceNameResolverTest {
-
+ private static final int INIT_QUARANTINE_TIME_MS = 1000;
+ private static final int MAX_QUARANTINE_TIME_MS = 10000;
private PulsarServiceNameResolver resolver;
@BeforeMethod
public void setup() {
- this.resolver = new PulsarServiceNameResolver();
+ this.resolver = new PulsarServiceNameResolver(INIT_QUARANTINE_TIME_MS,
MAX_QUARANTINE_TIME_MS);
assertNull(resolver.getServiceUrl());
assertNull(resolver.getServiceUri());
}
@@ -128,4 +131,60 @@ public class PulsarServiceNameResolverTest {
assertTrue(expectedHostUrls.contains(resolver.resolveHostUri()));
}
}
+
+ @Test
+ public void testRemoveUnavailableHost() throws InvalidServiceURL {
+ String serviceUrl = "pulsar+ssl://host1:6651,host2:6651,host3:6651";
+ resolver.updateServiceUrl(serviceUrl);
+ assertEquals(resolver.getServiceUrl(), serviceUrl);
+ assertEquals(resolver.getServiceUri(), ServiceURI.create(serviceUrl));
+
+ Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+ Set<URI> expectedHostUrls = new HashSet<>();
+ expectedAddresses.add(InetSocketAddress.createUnresolved("host2",
6651));
+ expectedAddresses.add(InetSocketAddress.createUnresolved("host3",
6651));
+ expectedHostUrls.add(URI.create("pulsar+ssl://host2:6651"));
+ expectedHostUrls.add(URI.create("pulsar+ssl://host3:6651"));
+ Set<InetSocketAddress> allOriginAddresses = new
HashSet<>(expectedAddresses);
+ allOriginAddresses.add(InetSocketAddress.createUnresolved("host1",
6651));
+
+ // Mark host1 as unavailable
+
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host1",
6651), false);
+ // Now host1 should be removed from the available hosts
+ for (int i = 0; i < 10; i++) {
+ InetSocketAddress inetSocketAddress = resolver.resolveHost();
+ assertNotEquals(inetSocketAddress.getHostName(), "host1");
+ assertTrue(expectedAddresses.contains(inetSocketAddress));
+
+ URI uri = resolver.resolveHostUri();
+ assertNotEquals(uri.getHost(), "host1");
+ assertTrue(expectedHostUrls.contains(uri));
+ }
+
+ // After the backoff time, host1 should be recovered from the unavailable
hosts
+ Uninterruptibles.sleepUninterruptibly(INIT_QUARANTINE_TIME_MS,
java.util.concurrent.TimeUnit.MILLISECONDS);
+ // trigger the recovery of host1
+
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host2",
6651), true);
+
+ Set<InetSocketAddress> resolverAddresses = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ InetSocketAddress address = resolver.resolveHost();
+ resolverAddresses.add(address);
+ }
+ assertEquals(resolverAddresses, allOriginAddresses);
+
+ resolverAddresses.clear();
+ // Mark all hosts as unavailable
+
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host1",
6651), false);
+
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host2",
6651), false);
+
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host3",
6651), false);
+
+ // After marking all hosts as unavailable, the resolver should fall back
to selecting from all of the original hosts
+ assertTrue(resolver.getAvailableAddressList().isEmpty());
+ for (int i = 0; i < 10; i++) {
+ InetSocketAddress address = resolver.resolveHost();
+ resolverAddresses.add(address);
+ }
+ assertEquals(resolverAddresses, allOriginAddresses);
+ }
}