This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d6071c71098 [improve][client]PIP-425:Support connecting with next 
available endpoint for multi-endpoint serviceUrls (#24387)
d6071c71098 is described below

commit d6071c71098a5e55db03ce4af83753a3d18c9a34
Author: Aurora Twinkle <[email protected]>
AuthorDate: Thu Jul 17 12:21:47 2025 +0800

    [improve][client]PIP-425:Support connecting with next available endpoint 
for multi-endpoint serviceUrls (#24387)
    
    Co-authored-by: duanlinlin <[email protected]>
    Co-authored-by: Zixuan Liu <[email protected]>
---
 .../client/impl/ServiceUrlQuarantineTest.java      | 283 +++++++++++++++++++++
 .../apache/pulsar/client/api/ClientBuilder.java    |  38 +++
 .../client/impl/BinaryProtoLookupService.java      |  25 +-
 .../pulsar/client/impl/ClientBuilderImpl.java      |  18 ++
 .../apache/pulsar/client/impl/ConnectionPool.java  |   8 +
 ...erviceNameResolver.java => EndpointStatus.java} |  55 +---
 .../org/apache/pulsar/client/impl/HttpClient.java  |  11 +-
 .../client/impl/PulsarServiceNameResolver.java     | 173 ++++++++++++-
 .../pulsar/client/impl/ServiceNameResolver.java    |   9 +
 .../client/impl/conf/ClientConfigurationData.java  |  14 +
 .../client/impl/BinaryProtoLookupServiceTest.java  |   1 +
 .../pulsar/client/impl/ClientTestFixtures.java     |   6 +-
 .../client/impl/PulsarServiceNameResolverTest.java |  63 ++++-
 13 files changed, 640 insertions(+), 64 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
new file mode 100644
index 00000000000..14da7bce4c1
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.common.net.ServiceURI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ServiceUrlQuarantineTest extends ProducerConsumerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(ServiceUrlQuarantineTest.class);
+    private String binaryServiceUrlWithUnavailableNodes;
+    private String httpServiceUrlWithUnavailableNodes;
+    private PulsarClientImpl pulsarClientWithBinaryServiceUrl;
+    private PulsarClientImpl pulsarClientWithBinaryServiceUrlDisableQuarantine;
+    private PulsarClientImpl pulsarClientWithHttpServiceUrl;
+    private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine;
+    private static final int BROKER_SERVICE_PORT = 6666;
+    private static final int WEB_SERVICE_PORT = 8888;
+    private static final int UNAVAILABLE_NODES = 20;
+    private static final int TIMEOUT_SECONDS = 1;
+
+    /**
+     * Starts the test broker and creates the four clients shared by the tests in this class.
+     *
+     * <p>Both the binary and the HTTP service URL are extended with {@link #UNAVAILABLE_NODES}
+     * extra {@code 127.0.0.1:<port>} entries whose ports are drawn at random from [100, 1000).
+     * NOTE(review): the test presumes nothing is listening on those ports — confirm for the CI hosts.
+     *
+     * <p>Two clients are created through {@code newPulsarClient(...)}; the other two are built
+     * directly with the quarantine init/max durations set to 0, which the {@code ClientBuilder}
+     * Javadoc describes as "don't quarantine any endpoints".
+     */
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        // Create a Pulsar client with some unavailable nodes
+        StringBuilder binaryServiceUrlBuilder = new 
StringBuilder(pulsar.getBrokerServiceUrl());
+        StringBuilder httpServiceUrlBuilder = new 
StringBuilder(pulsar.getWebServiceAddress());
+        // Append the extra loopback endpoints after the real broker address.
+        for (int i = 0; i < UNAVAILABLE_NODES; i++) {
+            
binaryServiceUrlBuilder.append(",127.0.0.1:").append(ThreadLocalRandom.current().nextInt(100,
 1000));
+            
httpServiceUrlBuilder.append(",127.0.0.1:").append(ThreadLocalRandom.current().nextInt(100,
 1000));
+        }
+        this.binaryServiceUrlWithUnavailableNodes = 
binaryServiceUrlBuilder.toString();
+        this.httpServiceUrlWithUnavailableNodes = 
httpServiceUrlBuilder.toString();
+        // Clients created via newPulsarClient(...) do not override the quarantine durations here.
+        this.pulsarClientWithBinaryServiceUrl =
+                (PulsarClientImpl) 
newPulsarClient(binaryServiceUrlWithUnavailableNodes, 0);
+        this.pulsarClientWithHttpServiceUrl = (PulsarClientImpl) 
newPulsarClient(
+                httpServiceUrlWithUnavailableNodes, 0);
+        // Clients with both quarantine durations set to 0 (quarantine disabled).
+        this.pulsarClientWithBinaryServiceUrlDisableQuarantine =
+                (PulsarClientImpl)
+                        PulsarClient.builder()
+                                
.serviceUrl(binaryServiceUrlWithUnavailableNodes)
+                                .serviceUrlQuarantineInitDuration(0, 
TimeUnit.MILLISECONDS)
+                                .serviceUrlQuarantineMaxDuration(0, 
TimeUnit.MILLISECONDS)
+                                .operationTimeout(TIMEOUT_SECONDS, 
TimeUnit.SECONDS)
+                                .lookupTimeout(TIMEOUT_SECONDS, 
TimeUnit.SECONDS)
+                                .build();
+        this.pulsarClientWithHttpServiceUrlDisableQuarantine =
+                (PulsarClientImpl) PulsarClient.builder()
+                        .serviceUrl(httpServiceUrlWithUnavailableNodes)
+                        .serviceUrlQuarantineInitDuration(0, 
TimeUnit.MILLISECONDS)
+                        .serviceUrlQuarantineMaxDuration(0, 
TimeUnit.MILLISECONDS)
+                        .operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                        .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                        .build();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setBrokerServicePort(Optional.of(BROKER_SERVICE_PORT));
+        this.conf.setWebServicePort(Optional.of(WEB_SERVICE_PORT));
+    }
+
+    /**
+     * Applies a {@link #TIMEOUT_SECONDS}-second operation timeout and lookup timeout to the
+     * supplied builder.
+     * NOTE(review): presumably invoked by the base class for clients created through
+     * {@code newPulsarClient(...)} — confirm against the parent test class.
+     *
+     * @param clientBuilder the builder to customize
+     */
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+        clientBuilder.operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+
+    /**
+     * Closes the four test clients and then tears down the broker.
+     *
+     * <p>Each client is closed independently, so a failure while closing one of them does not
+     * leak the others. The broker teardown runs in a {@code finally} block so it always happens.
+     */
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        try {
+            closeClient(pulsarClientWithBinaryServiceUrl);
+            closeClient(pulsarClientWithBinaryServiceUrlDisableQuarantine);
+            closeClient(pulsarClientWithHttpServiceUrl);
+            closeClient(pulsarClientWithHttpServiceUrlDisableQuarantine);
+        } finally {
+            super.internalCleanup();
+        }
+    }
+
+    /** Closes {@code client} if it was created, logging (not propagating) a close failure. */
+    private static void closeClient(PulsarClientImpl client) {
+        if (client == null) {
+            return;
+        }
+        try {
+            client.close();
+        } catch (PulsarClientException e) {
+            // Best effort: keep going so the remaining clients and the broker are still cleaned up.
+            log.warn("Failed to close pulsar client", e);
+        }
+    }
+
+    @Test
+    public void testCreateConsumerProducerWithUnavailableBrokerNodes() throws 
Exception {
+        
pulsarClientWithBinaryServiceUrl.updateServiceUrl(binaryServiceUrlWithUnavailableNodes);
+        
pulsarClientWithHttpServiceUrl.updateServiceUrl(httpServiceUrlWithUnavailableNodes);
+        String topic = "persistent://my-property/my-ns/topic" + 
UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        int createCount = 20;
+        // 1. test binary service url
+        // trigger unhealthy address removal by creating consumers and 
producers
+        int successCount = 
createConsumerAndProducers(pulsarClientWithBinaryServiceUrl, createCount, 
topic);
+        assertTrue(successCount < createCount,
+                "Expected some creations to fail due to unavailable nodes, but 
all succeeded.");
+        // all unavailable nodes should have been removed
+        successCount = 
createConsumerAndProducers(pulsarClientWithBinaryServiceUrl, createCount, 
topic);
+        assertEquals(successCount, createCount,
+                "Expected all subscription creations to succeed, but only " + 
successCount + " succeeded.");
+
+        // test binary service url with disable quarantine
+        successCount =
+                
createConsumerAndProducers(pulsarClientWithBinaryServiceUrlDisableQuarantine, 
createCount, topic);
+        assertTrue(successCount < createCount,
+                "Expected some creations to fail due to unavailable nodes, but 
all succeeded.");
+        // no unavailable nodes should be removed since backoff is disabled
+        successCount =
+                
createConsumerAndProducers(pulsarClientWithBinaryServiceUrlDisableQuarantine, 
createCount, topic);
+        assertTrue(successCount < createCount,
+                "Expected all subscription creations to succeed, but only " + 
successCount + " succeeded.");
+
+        // 2. test http service url
+        // trigger unhealthy address removal by creating consumers and 
producers
+        successCount = 
createConsumerAndProducers(pulsarClientWithHttpServiceUrl, createCount, topic);
+        assertTrue(successCount < createCount,
+                "Expected some creations to fail due to unavailable nodes, but 
all succeeded.");
+        // all unavailable nodes should have been removed
+        successCount = 
createConsumerAndProducers(pulsarClientWithHttpServiceUrl, createCount, topic);
+        assertEquals(successCount, createCount,
+                "Expected some creations to fail due to unavailable nodes, but 
all succeeded.");
+
+        // test http service url with disable quarantine
+        successCount = 
createConsumerAndProducers(pulsarClientWithHttpServiceUrlDisableQuarantine, 
createCount, topic);
+        assertTrue(successCount < createCount,
+                "Expected some creations to fail due to unavailable nodes, but 
all succeeded.");
+        // no unavailable nodes should be removed since backoff is disabled
+        successCount = 
createConsumerAndProducers(pulsarClientWithHttpServiceUrlDisableQuarantine, 
createCount, topic);
+        assertTrue(successCount < createCount,
+                "Expected some creations to fail due to unavailable nodes, but 
all succeeded.");
+    }
+
+    private int createConsumerAndProducers(PulsarClientImpl pulsarClient, int 
createCount, String topic) {
+        int successCount = 0;
+        for (int i = 0; i < createCount; i++) {
+            String subName = "my-sub" + UUID.randomUUID();
+            try {
+                Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                        .subscriptionMode(SubscriptionMode.Durable)
+                        
.topic(topic).receiverQueueSize(1).subscriptionName(subName)
+                        .subscribe();
+                consumer.close();
+                Producer<byte[]> producer = pulsarClient.newProducer()
+                        .topic(topic)
+                        .create();
+                producer.close();
+                successCount++;
+            } catch (Exception e) {
+                log.warn("Failed to create consumer and producer {} for topic 
{}: {}", subName, topic, e.getMessage());
+            }
+        }
+        return successCount;
+    }
+
+    /**
+     * Runs {@link #doTestServiceUrlResolve} for the binary and HTTP clients, first with
+     * quarantine enabled and then with quarantine disabled.
+     *
+     * <p>Each service URL lists {@code host1} and {@code host2} plus the local broker
+     * endpoint, and that local endpoint is passed as the expected healthy address.
+     */
+    @Test
+    public void testServiceUrlHealthCheck() throws Exception {
+        doTestServiceUrlResolve(pulsarClientWithBinaryServiceUrl,
+                "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + BROKER_SERVICE_PORT,
+                InetSocketAddress.createUnresolved("127.0.0.1", BROKER_SERVICE_PORT), true);
+        doTestServiceUrlResolve(pulsarClientWithHttpServiceUrl,
+                "http://host1:6651,host2:6651,127.0.0.1:" + WEB_SERVICE_PORT,
+                InetSocketAddress.createUnresolved("127.0.0.1", WEB_SERVICE_PORT), true);
+
+        doTestServiceUrlResolve(pulsarClientWithBinaryServiceUrlDisableQuarantine,
+                "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + BROKER_SERVICE_PORT,
+                InetSocketAddress.createUnresolved("127.0.0.1", BROKER_SERVICE_PORT), false);
+        doTestServiceUrlResolve(pulsarClientWithHttpServiceUrlDisableQuarantine,
+                "http://host1:6651,host2:6651,127.0.0.1:" + WEB_SERVICE_PORT,
+                InetSocketAddress.createUnresolved("127.0.0.1", WEB_SERVICE_PORT), false);
+    }
+
+    private void doTestServiceUrlResolve(PulsarClientImpl pulsarClient, String 
serviceUrl,
+                                         InetSocketAddress healthyAddress, 
boolean enableQuarantine)
+            throws Exception {
+        LookupService resolver = pulsarClient.getLookup();
+        resolver.updateServiceUrl(serviceUrl);
+        assertEquals(serviceUrl, resolver.getServiceUrl());
+
+        ServiceURI uri;
+        try {
+            uri = ServiceURI.create(serviceUrl);
+        } catch (IllegalArgumentException iae) {
+            log.error("Invalid service-url {} provided {}", serviceUrl, 
iae.getMessage(), iae);
+            throw new PulsarClientException.InvalidServiceURL(iae);
+        }
+        String[] hosts = uri.getServiceHosts();
+        Set<InetSocketAddress> originAllAddresses = new 
HashSet<>(hosts.length);
+        for (String host : hosts) {
+            String hostUrl = uri.getServiceScheme() + "://" + host;
+            try {
+                URI hostUri = new URI(hostUrl);
+                
originAllAddresses.add(InetSocketAddress.createUnresolved(hostUri.getHost(), 
hostUri.getPort()));
+            } catch (URISyntaxException e) {
+                log.error("Invalid host provided {}", hostUrl, e);
+                throw new PulsarClientException.InvalidServiceURL(e);
+            }
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertTrue(originAllAddresses.contains(resolver.resolveHost()));
+        }
+        String topic = "persistent://my-property/my-ns/topic" + 
UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        // Create consumers to trigger unhealthy address removal
+        for (int i = 0; i < originAllAddresses.size(); i++) {
+            String subName = "my-sub" + UUID.randomUUID();
+            pulsarClient.newConsumer()
+                    .subscriptionMode(SubscriptionMode.Durable)
+                    
.topic(topic).receiverQueueSize(1).subscriptionName(subName)
+                    .subscribeAsync()
+                    .thenAccept(e -> {
+                        try {
+                            e.close();
+                        } catch (PulsarClientException e1) {
+                            log.warn("Failed to close consumer {} for topic 
{}: {}", subName, topic, e1.getMessage());
+                        }
+                    });
+        }
+        Uninterruptibles.sleepUninterruptibly(10, 
java.util.concurrent.TimeUnit.SECONDS);
+        // check if the unhealthy address is removed
+        Set<InetSocketAddress> expectedHealthyAddresses = new HashSet<>();
+        expectedHealthyAddresses.add(healthyAddress);
+
+        Set<InetSocketAddress> resolvedAddresses = new HashSet<>();
+        for (int i = 0; i < hosts.length; i++) {
+            if (enableQuarantine) {
+                
assertTrue(expectedHealthyAddresses.contains(resolver.resolveHost()));
+            } else {
+                resolvedAddresses.add(resolver.resolveHost());
+            }
+        }
+
+        if (!enableQuarantine) {
+            assertEquals(resolvedAddresses, originAllAddresses,
+                    "Expected all addresses to be healthy, but found: " + 
resolvedAddresses);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 74b7c084d5a..c1ef9ed819b 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -120,6 +120,44 @@ public interface ClientBuilder extends Serializable, 
Cloneable {
      */
     ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
 
+    /**
+     * Configure the service URL init quarantine duration.
+     * For single host serviceUrl, this setting has no effect.
+     *
+     * <p>When the client is unable to connect to an endpoint from a serviceUrl with multiple hosts,
+     * that endpoint will be quarantined for a duration that grows exponentially.
+     * The initial value of a single quarantine duration is set by
+     * {@code serviceUrlQuarantineInitDuration}. A successful usage of the endpoint will reset the
+     * duration to the initial value and move it back to the available addresses pool.
+     *
+     * <p>
+     * A value of 0 means don't quarantine any endpoints even if they fail.
+     *
+     * @param serviceUrlQuarantineInitDuration the initial quarantine duration
+     * for unavailable endpoint. Defaults to 60 seconds.
+     * @param unit the time unit for the quarantine duration
+     * @return the client builder instance
+     */
+    ClientBuilder serviceUrlQuarantineInitDuration(long 
serviceUrlQuarantineInitDuration, TimeUnit unit);
+
+    /**
+     * Configure the service URL max quarantine duration.
+     * For single host serviceUrl, this setting has no effect.
+     *
+     * <p>When the client is unable to connect to an endpoint from a serviceUrl with multiple hosts,
+     * that endpoint will be quarantined for a duration that grows exponentially.
+     * The maximum value of a single quarantine duration is set by
+     * {@code serviceUrlQuarantineMaxDuration}. A successful usage of the endpoint will reset the
+     * duration to the initial value and move it back to the available addresses pool.
+     *
+     * <p>
+     * A value of 0 means don't quarantine any endpoints even if they fail.
+     *
+     * @param serviceUrlQuarantineMaxDuration the maximum quarantine duration 
for
+     * unavailable endpoint. Defaults to 1 day.
+     * @param unit the time unit for the quarantine duration
+     * @return the client builder instance
+     */
+    ClientBuilder serviceUrlQuarantineMaxDuration(long 
serviceUrlQuarantineMaxDuration, TimeUnit unit);
+
     /**
      * Configure the listenerName that the broker will return the 
corresponding `advertisedListener`.
      *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 795cdc6d693..4709a22e1c4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -115,7 +115,9 @@ public class BinaryProtoLookupService implements 
LookupService {
         this.useTls = useTls;
         this.scheduleExecutor = scheduleExecutor;
         this.maxLookupRedirects = 
client.getConfiguration().getMaxLookupRedirects();
-        this.serviceNameResolver = new PulsarServiceNameResolver();
+        this.serviceNameResolver =
+                new 
PulsarServiceNameResolver(client.getConfiguration().getServiceUrlQuarantineInitDurationMs(),
+                        
client.getConfiguration().getServiceUrlQuarantineMaxDurationMs());
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
 
@@ -188,8 +190,8 @@ public class BinaryProtoLookupService implements 
LookupService {
         final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
         try {
             return partitionedMetadataInProgress.computeIfAbsent(topicName, 
tpName -> {
-                CompletableFuture<PartitionedTopicMetadata> newFuture = 
getPartitionedTopicMetadata(
-                        serviceNameResolver.resolveHost(), topicName, 
metadataAutoCreationEnabled,
+                CompletableFuture<PartitionedTopicMetadata> newFuture = 
getPartitionedTopicMetadataAsync(
+                       topicName, metadataAutoCreationEnabled,
                         useFallbackForNonPIP344Brokers);
                 newFutureCreated.setValue(newFuture);
                 return newFuture;
@@ -281,19 +283,20 @@ public class BinaryProtoLookupService implements 
LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }, lookupPinnedExecutor).exceptionally(connectionException -> {
+            serviceNameResolver.markHostAvailability(socketAddress, false);
             
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
         return addressFuture;
     }
 
-    private CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(InetSocketAddress socketAddress,
+    private CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadataAsync(
             TopicName topicName, boolean metadataAutoCreationEnabled, boolean 
useFallbackForNonPIP344Brokers) {
 
         long startTime = System.nanoTime();
         CompletableFuture<PartitionedTopicMetadata> partitionFuture = new 
CompletableFuture<>();
 
-        
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
+        
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx
 -> {
             boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
             if (!metadataAutoCreationEnabled && 
!clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
                 if (useFallbackForNonPIP344Brokers) {
@@ -356,8 +359,7 @@ public class BinaryProtoLookupService implements 
LookupService {
             schemaFuture.completeExceptionally(new 
SchemaSerializationException("Empty schema version"));
             return schemaFuture;
         }
-        InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
-        
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
+        
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx
 -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newGetSchema(requestId, 
topicName.toString(),
                 Optional.ofNullable(BytesSchemaVersion.of(version)));
@@ -403,12 +405,12 @@ public class BinaryProtoLookupService implements 
LookupService {
                 .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
                 .setMax(1, TimeUnit.MINUTES)
                 .create();
-        getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, 
backoff, opTimeoutMs, topicsFuture, mode,
+        getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, 
mode,
                 topicsPattern, topicsHash);
         return topicsFuture;
     }
 
-    private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
+    private void getTopicsUnderNamespace(
                                          NamespaceName namespace,
                                          Backoff backoff,
                                          AtomicLong remainingTime,
@@ -418,7 +420,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                                          String topicsHash) {
         long startTime = System.nanoTime();
 
-        
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
+        
client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx
 -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
                 namespace.toString(), requestId, mode, topicsPattern, 
topicsHash);
@@ -451,14 +453,13 @@ public class BinaryProtoLookupService implements 
LookupService {
                 log.warn("[namespace: {}] Could not get connection while 
getTopicsUnderNamespace -- Will try again in"
                                 + " {} ms", namespace, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
-                getTopicsUnderNamespace(socketAddress, namespace, backoff, 
remainingTime, getTopicsResultFuture,
+                getTopicsUnderNamespace(namespace, backoff, remainingTime, 
getTopicsResultFuture,
                         mode, topicsPattern, topicsHash);
             }, nextDelay, TimeUnit.MILLISECONDS);
             return null;
         });
     }
 
-
     @Override
     public void close() throws Exception {
         if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && 
!lookupPinnedExecutor.isShutdown()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 02140fca8ab..09c668f753e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -100,6 +100,24 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    /**
+     * Stores the initial quarantine duration, converted to milliseconds, in the client configuration.
+     *
+     * @throws IllegalArgumentException if {@code serviceUrlQuarantineInitDuration} is negative
+     */
+    @Override
+    public ClientBuilder serviceUrlQuarantineInitDuration(long serviceUrlQuarantineInitDuration,
+                                                          TimeUnit unit) {
+        // Reject negative durations before touching the configuration.
+        checkArgument(serviceUrlQuarantineInitDuration >= 0,
+                "serviceUrlQuarantineInitDuration needs to be >= 0");
+        final long initDurationMs = unit.toMillis(serviceUrlQuarantineInitDuration);
+        conf.setServiceUrlQuarantineInitDurationMs(initDurationMs);
+        return this;
+    }
+
+    /**
+     * Stores the maximum quarantine duration, converted to milliseconds, in the client configuration.
+     *
+     * @throws IllegalArgumentException if {@code serviceUrlQuarantineMaxDuration} is negative
+     */
+    @Override
+    public ClientBuilder serviceUrlQuarantineMaxDuration(long serviceUrlQuarantineMaxDuration,
+                                                         TimeUnit unit) {
+        // Reject negative durations before touching the configuration.
+        checkArgument(serviceUrlQuarantineMaxDuration >= 0,
+                "serviceUrlQuarantineMaxDuration needs to be >= 0");
+        final long maxDurationMs = unit.toMillis(serviceUrlQuarantineMaxDuration);
+        conf.setServiceUrlQuarantineMaxDurationMs(maxDurationMs);
+        return this;
+    }
+
     @Override
     public ClientBuilder listenerName(String listenerName) {
         checkArgument(StringUtils.isNotBlank(listenerName), "Param 
listenerName must not be blank.");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 1eb603e28f5..d86f9ef1ea0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -216,6 +216,14 @@ public class ConnectionPool implements AutoCloseable {
         return signSafeMod(random.nextInt(), maxConnectionsPerHosts);
     }
 
+    /**
+     * Connects to the address returned by {@code serviceNameResolver.resolveHost()} and reports
+     * the outcome back to the resolver.
+     *
+     * <p>When the connection future completes, the resolver's {@code markHostAvailability} is
+     * called for that address with {@code true} on success and {@code false} on failure.
+     *
+     * @param serviceNameResolver the resolver that picks the address and receives the outcome
+     * @return the future of the connection attempt, as returned by {@code getConnection(address)}
+     */
+    public CompletableFuture<ClientCnx> getConnection(final ServiceNameResolver serviceNameResolver) {
+        final InetSocketAddress resolvedAddress = serviceNameResolver.resolveHost();
+        final CompletableFuture<ClientCnx> cnxFuture = getConnection(resolvedAddress);
+        // The callback is only a side channel; callers get the original future back.
+        cnxFuture.whenComplete((cnx, failure) -> {
+            final boolean connected = failure == null;
+            serviceNameResolver.markHostAvailability(resolvedAddress, connected);
+        });
+        return cnxFuture;
+    }
+
     public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress 
address) {
         if (maxConnectionsPerHosts == 0) {
             return getConnection(address, address, -1);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
 b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EndpointStatus.java
similarity index 50%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/EndpointStatus.java
index 95f81d45bd7..38d9e22396e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EndpointStatus.java
@@ -19,47 +19,16 @@
 package org.apache.pulsar.client.impl;
 
 import java.net.InetSocketAddress;
-import java.net.URI;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
-import org.apache.pulsar.common.net.ServiceURI;
-
-/**
- * A service name resolver to resolve real socket address.
- */
-public interface ServiceNameResolver {
-
-    /**
-     * Resolve pulsar service url.
-     *
-     * @return resolve the service url to return a socket address
-     */
-    InetSocketAddress resolveHost();
-
-    /**
-     * Resolve pulsar service url.
-     * @return
-     */
-    URI resolveHostUri();
-
-    /**
-     * Get service url.
-     *
-     * @return service url
-     */
-    String getServiceUrl();
-
-    /**
-     * Get service uri.
-     *
-     * @return service uri
-     */
-    ServiceURI getServiceUri();
-
-    /**
-     * Update service url.
-     *
-     * @param serviceUrl service url
-     */
-    void updateServiceUrl(String serviceUrl) throws InvalidServiceURL;
-
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.pulsar.common.util.Backoff;
+
+/**
+ * Mutable, package-private holder for the state tracked per service endpoint.
+ *
+ * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and the all-arguments
+ * constructor are generated by Lombok.
+ * NOTE(review): field meanings below are inferred from their names — confirm against
+ * {@code PulsarServiceNameResolver}, which is not visible here.
+ */
+@Data
+@AllArgsConstructor
+class EndpointStatus {
+    // The endpoint this status belongs to.
+    private InetSocketAddress socketAddress;
+    // Presumably yields the successive quarantine durations for this endpoint — TODO confirm.
+    private Backoff quarantineBackoff;
+    // Presumably the wall-clock time (ms) of the last status change — TODO confirm.
+    private long lastUpdateTimeStampMs;
+    // Presumably how long (ms) the endpoint stays quarantined before it may be retried — TODO confirm.
+    private long nextDelayMsToRecover;
+    // Whether the endpoint is currently considered usable.
+    private boolean isAvailable;
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 8e448d801fa..a86e820af6b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -70,7 +70,8 @@ public class HttpClient implements Closeable {
 
     protected HttpClient(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
         this.authentication = conf.getAuthentication();
-        this.serviceNameResolver = new PulsarServiceNameResolver();
+        this.serviceNameResolver = new 
PulsarServiceNameResolver(conf.getServiceUrlQuarantineInitDurationMs(),
+                conf.getServiceUrlQuarantineMaxDurationMs());
         this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
@@ -114,7 +115,6 @@ public class HttpClient implements Closeable {
                 confBuilder.setSslEngineFactory(sslEngineFactory);
 
 
-
                 
confBuilder.setUseInsecureTrustManager(conf.isTlsAllowInsecureConnection());
                 
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
             } catch (Exception e) {
@@ -168,6 +168,8 @@ public class HttpClient implements Closeable {
             // auth complete, do real request
             authFuture.whenComplete((respHeaders, ex) -> {
                 if (ex != null) {
+                    serviceNameResolver.markHostAvailability(
+                            
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), 
false);
                     log.warn("[{}] Failed to perform http request at 
authentication stage: {}",
                         requestUrl, ex.getMessage());
                     future.completeExceptionally(new 
PulsarClientException(ex));
@@ -194,10 +196,14 @@ public class HttpClient implements Closeable {
 
                 
builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
                     if (t != null) {
+                        serviceNameResolver.markHostAvailability(
+                                
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), 
false);
                         log.warn("[{}] Failed to perform http request: {}", 
requestUrl, t.getMessage());
                         future.completeExceptionally(new 
PulsarClientException(t));
                         return;
                     }
+                    serviceNameResolver.markHostAvailability(
+                            
InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()), true);
 
                     // request not success
                     if (response2.getStatusCode() != 
HttpURLConnection.HTTP_OK) {
@@ -266,4 +272,5 @@ public class HttpClient implements Closeable {
             log.error("Failed to refresh SSL context", e);
         }
     }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index e47750be462..1b12a7c0267 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -19,15 +19,25 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.VisibleForTesting;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
 import org.apache.pulsar.common.net.ServiceURI;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 
 /**
  * The default implementation of {@link ServiceNameResolver}.
@@ -40,11 +50,38 @@ public class PulsarServiceNameResolver implements 
ServiceNameResolver {
     private static final AtomicIntegerFieldUpdater<PulsarServiceNameResolver> 
CURRENT_INDEX_UPDATER =
             
AtomicIntegerFieldUpdater.newUpdater(PulsarServiceNameResolver.class, 
"currentIndex");
     private volatile int currentIndex;
-    private volatile List<InetSocketAddress> addressList;
+    private volatile List<InetSocketAddress> allAddressList;
+    private volatile Set<InetSocketAddress> allAddressSet;
+    private volatile List<InetSocketAddress> availableAddressList;
+    private final Map<InetSocketAddress, EndpointStatus> hostAvailabilityMap = 
new ConcurrentHashMap<>();
+    private final long serviceUrlQuarantineInitDurationMs;
+    private final long serviceUrlQuarantineMaxDurationMs;
+    private final boolean enableServiceUrlQuarantine;
+
+    public PulsarServiceNameResolver() {
+        this(0, 0);
+    }
+
+    public PulsarServiceNameResolver(long serviceUrlQuarantineInitDurationMs, 
long serviceUrlQuarantineMaxDurationMs) {
+        this.serviceUrlQuarantineInitDurationMs = 
serviceUrlQuarantineInitDurationMs;
+        this.serviceUrlQuarantineMaxDurationMs = 
serviceUrlQuarantineMaxDurationMs;
+        this.enableServiceUrlQuarantine =
+                serviceUrlQuarantineInitDurationMs > 0 && 
serviceUrlQuarantineMaxDurationMs > 0;
+    }
 
     @Override
     public InetSocketAddress resolveHost() {
-        List<InetSocketAddress> list = addressList;
+        final List<InetSocketAddress> list;
+        List<InetSocketAddress> availableAddresses = availableAddressList;
+        if (availableAddresses != null && !availableAddresses.isEmpty()) {
+            list = availableAddresses;
+        } else {
+            // if no available address, use the original address list
+            list = allAddressList;
+            if (availableAddressList != null) {
+                log.warn("No available hosts found for service url: {}", 
serviceUrl);
+            }
+        }
         checkState(
             list != null, "No service url is provided yet");
         checkState(
@@ -75,7 +112,7 @@ public class PulsarServiceNameResolver implements 
ServiceNameResolver {
     }
 
     @Override
-    public void updateServiceUrl(String serviceUrl) throws InvalidServiceURL {
+    public synchronized void updateServiceUrl(String serviceUrl) throws 
InvalidServiceURL {
         ServiceURI uri;
         try {
             uri = ServiceURI.create(serviceUrl);
@@ -96,10 +133,17 @@ public class PulsarServiceNameResolver implements 
ServiceNameResolver {
                 throw new InvalidServiceURL(e);
             }
         }
-        this.addressList = addresses;
+        this.allAddressList = addresses;
+        this.allAddressSet = Set.copyOf(addresses);
         this.serviceUrl = serviceUrl;
         this.serviceUri = uri;
         this.currentIndex = randomIndex(addresses.size());
+        if (enableServiceUrlQuarantine) {
+            hostAvailabilityMap.keySet().retainAll(allAddressSet);
+            allAddressSet.forEach(
+                    address -> hostAvailabilityMap.putIfAbsent(address, 
createEndpointStatus(true, address)));
+            availableAddressList = new 
ArrayList<>(hostAvailabilityMap.keySet());
+        }
     }
 
     private static int randomIndex(int numAddresses) {
@@ -107,4 +151,125 @@ public class PulsarServiceNameResolver implements 
ServiceNameResolver {
                 ?
                 0 : 
io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses);
     }
+
+    /**
+     * This method runs while holding the resolver's synchronized lock, so it must not execute 
code that may block, such as network I/O.
+     * @param address the host address to mark availability for
+     * @param isAvailable true if the host is available, false otherwise
+     */
+    @Override
+    public synchronized void markHostAvailability(InetSocketAddress address, 
boolean isAvailable) {
+        if (!enableServiceUrlQuarantine) {
+            return;
+        }
+
+        if (!allAddressSet.contains(address)) {
+            // If the address is not part of the original service URL, we 
ignore it.
+            log.debug("Address {} is not part of the original service URL, 
ignoring availability update", address);
+            return;
+        }
+
+        AtomicBoolean availableHostsChanged = new AtomicBoolean(false);
+        hostAvailabilityMap.compute(address, (key, oldStatus) -> {
+            if (oldStatus == null) {
+                EndpointStatus endpointStatus = 
createEndpointStatus(isAvailable, key);
+                availableHostsChanged.set(true);
+                return endpointStatus;
+            }
+            if (oldStatus.isAvailable() != isAvailable) {
+                availableHostsChanged.set(true);
+            }
+            computeEndpointStatus(isAvailable, oldStatus);
+            return oldStatus;
+        });
+
+        hostAvailabilityMap.forEach((__, endpointStatus) -> {
+            if (!endpointStatus.isAvailable()) {
+                computeEndpointStatus(false, endpointStatus);
+                if (!availableHostsChanged.get() && 
endpointStatus.isAvailable()) {
+                    availableHostsChanged.set(true);
+                }
+            }
+        });
+
+        if (availableHostsChanged.get()) {
+            availableAddressList = hostAvailabilityMap.entrySet()
+                    .stream()
+                    .filter(entry -> entry.getValue().isAvailable() && 
allAddressSet.contains(entry.getKey()))
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toList());
+            log.info("service name resolver available hosts changed, current 
available hosts: {}",
+                    availableAddressList);
+        }
+    }
+
+    @VisibleForTesting
+    List<InetSocketAddress> getAvailableAddressList() {
+        return availableAddressList;
+    }
+
+    /**
+     * Create an {@link EndpointStatus} for the given address.
+     * @param isAvailable the availability status of the endpoint
+     * @param inetSocketAddress the address of the endpoint
+     * @return a new {@link EndpointStatus} instance
+     */
+    private EndpointStatus createEndpointStatus(boolean isAvailable, 
InetSocketAddress inetSocketAddress) {
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(serviceUrlQuarantineInitDurationMs, 
TimeUnit.MILLISECONDS)
+                .setMax(serviceUrlQuarantineMaxDurationMs, 
TimeUnit.MILLISECONDS)
+                .create();
+        EndpointStatus endpointStatus =
+                new EndpointStatus(inetSocketAddress, backoff, 
System.currentTimeMillis(), 0,
+                        isAvailable);
+        if (!isAvailable) {
+            computeEndpointStatus(false, endpointStatus);
+        }
+        return endpointStatus;
+    }
+
+    /**
+     * Updates the endpoint's availability status based on the given input 
flag and internal quarantine logic.
+     *
+     * <p>This method applies the input flag directly, and includes a 
time-based self-healing mechanism: if the
+     * endpoint has been marked unavailable for a sufficient cooldown period 
(quarantine), it automatically transitions
+     * back to available even when {@code newIsAvailable} is {@code false}.
+     *
+     * <p>This allows the system to retry endpoints that were previously 
marked as unavailable.
+     * If the endpoint fails again after recovery, it is marked unavailable 
and re-enters quarantine
+     * with an exponentially increased delay before the next recovery attempt.
+     *
+     * <p>The backoff is only reset when a successful update arrives while the 
endpoint is still quarantined
+     * (i.e., when {@code newIsAvailable} is {@code true} and the endpoint is 
currently marked unavailable).
+     *
+     * @param newIsAvailable the new availability status of the endpoint
+     * @param status         the current status of the endpoint
+     */
+    private void computeEndpointStatus(boolean newIsAvailable, EndpointStatus 
status) {
+        if (!newIsAvailable) {
+            if (!status.isAvailable()) {
+                // from unavailable to unavailable, check if we need to try to 
recover
+                long elapsedTimeMsSinceLast = System.currentTimeMillis() - 
status.getLastUpdateTimeStampMs();
+                boolean needTryRecover = elapsedTimeMsSinceLast >= 
status.getNextDelayMsToRecover();
+                if (needTryRecover) {
+                    log.info("service name resolver try to recover host {} 
after {}", status.getSocketAddress(),
+                            Duration.ofMillis(elapsedTimeMsSinceLast));
+                    status.setAvailable(true);
+                    
status.setLastUpdateTimeStampMs(System.currentTimeMillis());
+                    
status.setNextDelayMsToRecover(status.getQuarantineBackoff().next());
+                }
+            } else {
+                // from available to unavailable
+                status.setAvailable(false);
+                status.setLastUpdateTimeStampMs(System.currentTimeMillis());
+                
status.setNextDelayMsToRecover(status.getQuarantineBackoff().next());
+            }
+        } else if (!status.isAvailable()) {
+            // from unavailable to available
+            status.setAvailable(true);
+            status.setLastUpdateTimeStampMs(System.currentTimeMillis());
+            status.setNextDelayMsToRecover(0);
+            status.getQuarantineBackoff().reset();
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
index 95f81d45bd7..3fa410905f6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
@@ -62,4 +62,13 @@ public interface ServiceNameResolver {
      */
     void updateServiceUrl(String serviceUrl) throws InvalidServiceURL;
 
+    /**
+     * Mark the availability of a host.
+     * @param address the host address to mark availability for
+     * @param isAvailable true if the host is available, false otherwise
+     */
+    default void markHostAvailability(InetSocketAddress address, boolean 
isAvailable){
+        // Default implementation does nothing
+        // Implementations can override this method to implement host availability 
tracking
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index c4fbf1e1744..72b9cf0c191 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -70,6 +70,20 @@ public class ClientConfigurationData implements 
Serializable, Cloneable {
     @JsonIgnore
     private transient ServiceUrlProvider serviceUrlProvider;
 
+    @ApiModelProperty(
+            name = "serviceUrlQuarantineInitDurationMs",
+            value = "The initial duration (in milliseconds) to quarantine 
endpoints that fail to connect."
+                    + "A value of 0 means don't quarantine any endpoints even 
if they fail."
+    )
+    private long serviceUrlQuarantineInitDurationMs = 60000;
+
+    @ApiModelProperty(
+            name = "serviceUrlQuarantineMaxDurationMs",
+            value = "The max duration (in milliseconds) to quarantine 
endpoints that fail to connect."
+                    + "A value of 0 means don't quarantine any endpoints even 
if they fail."
+    )
+    private long serviceUrlQuarantineMaxDurationMs = TimeUnit.DAYS.toMillis(1);
+
     @ApiModelProperty(
             name = "authentication",
             value = "Authentication settings of the client."
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index de3266b40e0..abd17903bc4 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -88,6 +88,7 @@ public class BinaryProtoLookupServiceTest {
 
         ConnectionPool cnxPool = mock(ConnectionPool.class);
         
when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(connectionFuture);
+        
when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(connectionFuture);
 
         ClientConfigurationData clientConfig = 
mock(ClientConfigurationData.class);
         doReturn(0).when(clientConfig).getMaxLookupRedirects();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index bcb3791be85..e966b00b4a2 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -38,6 +38,7 @@ import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.SucceededFuture;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -94,7 +95,10 @@ class ClientTestFixtures {
                 .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
         ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
         when(clientMock.getCnxPool()).thenReturn(connectionPoolMock);
-        
when(connectionPoolMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        
when(connectionPoolMock.getConnection(any(InetSocketAddress.class))).thenReturn(
+                CompletableFuture.completedFuture(clientCnxMock));
+        
when(connectionPoolMock.getConnection(any(ServiceNameResolver.class))).thenReturn(
+                CompletableFuture.completedFuture(clientCnxMock));
         when(connectionPoolMock.getConnection(any(), any(), anyInt()))
                 .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
         return clientMock;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
index f4ee2c0191d..4f62d6bc2ea 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashSet;
@@ -35,12 +37,13 @@ import org.testng.annotations.Test;
  * Unit test {@link PulsarServiceNameResolver}.
  */
 public class PulsarServiceNameResolverTest {
-
+    private static final int INIT_QUARANTINE_TIME_MS = 1000;
+    private static final int MAX_QUARANTINE_TIME_MS = 10000;
     private PulsarServiceNameResolver resolver;
 
     @BeforeMethod
     public void setup() {
-        this.resolver = new PulsarServiceNameResolver();
+        this.resolver = new PulsarServiceNameResolver(INIT_QUARANTINE_TIME_MS, 
MAX_QUARANTINE_TIME_MS);
         assertNull(resolver.getServiceUrl());
         assertNull(resolver.getServiceUri());
     }
@@ -128,4 +131,60 @@ public class PulsarServiceNameResolverTest {
             assertTrue(expectedHostUrls.contains(resolver.resolveHostUri()));
         }
     }
+
+    @Test
+    public void testRemoveUnavailableHost() throws InvalidServiceURL {
+        String serviceUrl = "pulsar+ssl://host1:6651,host2:6651,host3:6651";
+        resolver.updateServiceUrl(serviceUrl);
+        assertEquals(resolver.getServiceUrl(), serviceUrl);
+        assertEquals(resolver.getServiceUri(), ServiceURI.create(serviceUrl));
+
+        Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+        Set<URI> expectedHostUrls = new HashSet<>();
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host2", 
6651));
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host3", 
6651));
+        expectedHostUrls.add(URI.create("pulsar+ssl://host2:6651"));
+        expectedHostUrls.add(URI.create("pulsar+ssl://host3:6651"));
+        Set<InetSocketAddress> allOriginAddresses = new 
HashSet<>(expectedAddresses);
+        allOriginAddresses.add(InetSocketAddress.createUnresolved("host1", 
6651));
+
+        // Mark host1 as unavailable
+        
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host1", 
6651), false);
+        // Now host1 should be removed from the available hosts
+        for (int i = 0; i < 10; i++) {
+            InetSocketAddress inetSocketAddress = resolver.resolveHost();
+            assertNotEquals(inetSocketAddress.getHostName(), "host1");
+            assertTrue(expectedAddresses.contains(inetSocketAddress));
+
+            URI uri = resolver.resolveHostUri();
+            assertNotEquals(uri.getHost(), "host1");
+            assertTrue(expectedHostUrls.contains(uri));
+        }
+
+        // After the backoff time, host1 should be recovered from the unavailable 
hosts
+        Uninterruptibles.sleepUninterruptibly(INIT_QUARANTINE_TIME_MS, 
java.util.concurrent.TimeUnit.MILLISECONDS);
+        // trigger the recovery of host1
+        
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host2", 
6651), true);
+
+        Set<InetSocketAddress> resolverAddresses = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            InetSocketAddress address = resolver.resolveHost();
+            resolverAddresses.add(address);
+        }
+        assertEquals(resolverAddresses, allOriginAddresses);
+
+        resolverAddresses.clear();
+        // Mark all hosts as unavailable
+        
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host1", 
6651), false);
+        
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host2", 
6651), false);
+        
resolver.markHostAvailability(InetSocketAddress.createUnresolved("host3", 
6651), false);
+
+        // After marking all hosts as unavailable, the resolver should fall back 
to selecting from all original hosts
+        assertTrue(resolver.getAvailableAddressList().isEmpty());
+        for (int i = 0; i < 10; i++) {
+            InetSocketAddress address = resolver.resolveHost();
+            resolverAddresses.add(address);
+        }
+        assertEquals(resolverAddresses, allOriginAddresses);
+    }
 }

Reply via email to