This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ad31d18c80c [fix][client] PIP-425: fix updateServiceUrl and fix flaky 
ServiceUrlQuarantineTest (#24574)
ad31d18c80c is described below

commit ad31d18c80c6e7c4b54ff014d0d36f5dc29182e7
Author: Aurora Twinkle <foreverlove...@gmail.com>
AuthorDate: Tue Jul 29 22:14:08 2025 +0800

    [fix][client] PIP-425: fix updateServiceUrl and fix flaky 
ServiceUrlQuarantineTest (#24574)
    
    Co-authored-by: duanlinlin <duanlinl...@xiaohongshu.com>
    Co-authored-by: Lari Hotari <lhot...@users.noreply.github.com>
---
 .../client/impl/ServiceUrlQuarantineTest.java      | 83 ++++++++++++----------
 .../client/impl/PulsarServiceNameResolver.java     |  7 +-
 2 files changed, 52 insertions(+), 38 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
index 14da7bce4c1..a4fc46d079e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-import com.google.common.util.concurrent.Uninterruptibles;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -28,7 +28,6 @@ import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
@@ -38,6 +37,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.common.net.ServiceURI;
+import org.apache.pulsar.common.util.PortManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -53,22 +53,25 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
     private PulsarClientImpl pulsarClientWithBinaryServiceUrlDisableQuarantine;
     private PulsarClientImpl pulsarClientWithHttpServiceUrl;
     private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine;
-    private static final int BROKER_SERVICE_PORT = 6666;
-    private static final int WEB_SERVICE_PORT = 8888;
+    private int brokerServicePort;
+    private int webServicePort;
+    private final Set<Integer> lockedFreePortSet = new HashSet<>();
     private static final int UNAVAILABLE_NODES = 20;
-    private static final int TIMEOUT_SECONDS = 1;
+    private static final int TIMEOUT_MS = 500;
 
     @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        this.brokerServicePort = nextLockedFreePort();
+        this.webServicePort = nextLockedFreePort();
         super.internalSetup();
         super.producerBaseSetup();
         // Create a Pulsar client with some unavailable nodes
         StringBuilder binaryServiceUrlBuilder = new 
StringBuilder(pulsar.getBrokerServiceUrl());
         StringBuilder httpServiceUrlBuilder = new 
StringBuilder(pulsar.getWebServiceAddress());
         for (int i = 0; i < UNAVAILABLE_NODES; i++) {
-            
binaryServiceUrlBuilder.append(",127.0.0.1:").append(ThreadLocalRandom.current().nextInt(100,
 1000));
-            
httpServiceUrlBuilder.append(",127.0.0.1:").append(ThreadLocalRandom.current().nextInt(100,
 1000));
+            
binaryServiceUrlBuilder.append(",127.0.0.1:").append(nextLockedFreePort());
+            
httpServiceUrlBuilder.append(",127.0.0.1:").append(nextLockedFreePort());
         }
         this.binaryServiceUrlWithUnavailableNodes = 
binaryServiceUrlBuilder.toString();
         this.httpServiceUrlWithUnavailableNodes = 
httpServiceUrlBuilder.toString();
@@ -82,30 +85,36 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
                                 
.serviceUrl(binaryServiceUrlWithUnavailableNodes)
                                 .serviceUrlQuarantineInitDuration(0, 
TimeUnit.MILLISECONDS)
                                 .serviceUrlQuarantineMaxDuration(0, 
TimeUnit.MILLISECONDS)
-                                .operationTimeout(TIMEOUT_SECONDS, 
TimeUnit.SECONDS)
-                                .lookupTimeout(TIMEOUT_SECONDS, 
TimeUnit.SECONDS)
+                                .operationTimeout(TIMEOUT_MS, 
TimeUnit.MILLISECONDS)
+                                .lookupTimeout(TIMEOUT_MS, 
TimeUnit.MILLISECONDS)
                                 .build();
         this.pulsarClientWithHttpServiceUrlDisableQuarantine =
                 (PulsarClientImpl) PulsarClient.builder()
                         .serviceUrl(httpServiceUrlWithUnavailableNodes)
                         .serviceUrlQuarantineInitDuration(0, 
TimeUnit.MILLISECONDS)
                         .serviceUrlQuarantineMaxDuration(0, 
TimeUnit.MILLISECONDS)
-                        .operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
-                        .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                        .operationTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                        .lookupTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS)
                         .build();
     }
 
+    private int nextLockedFreePort() {
+        int newLockedFreePort = PortManager.nextLockedFreePort();
+        this.lockedFreePortSet.add(newLockedFreePort);
+        return newLockedFreePort;
+    }
+
     @Override
     protected void doInitConf() throws Exception {
         super.doInitConf();
-        this.conf.setBrokerServicePort(Optional.of(BROKER_SERVICE_PORT));
-        this.conf.setWebServicePort(Optional.of(WEB_SERVICE_PORT));
+        this.conf.setBrokerServicePort(Optional.of(brokerServicePort));
+        this.conf.setWebServicePort(Optional.of(webServicePort));
     }
 
     @Override
     protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
-        clientBuilder.operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
-                .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        clientBuilder.operationTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .lookupTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS);
     }
 
 
@@ -125,6 +134,9 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
         if (pulsarClientWithHttpServiceUrlDisableQuarantine != null) {
             pulsarClientWithHttpServiceUrlDisableQuarantine.close();
         }
+        for (Integer port : lockedFreePortSet) {
+            PortManager.releaseLockedPort(port);
+        }
     }
 
     @Test
@@ -197,21 +209,21 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
         return successCount;
     }
 
-    @Test
+    @Test(invocationCount = 10)
     public void testServiceUrlHealthCheck() throws Exception {
         doTestServiceUrlResolve(pulsarClientWithBinaryServiceUrl,
-                "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + 
BROKER_SERVICE_PORT,
-                InetSocketAddress.createUnresolved("127.0.0.1", 
BROKER_SERVICE_PORT), true);
+                "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + 
brokerServicePort,
+                InetSocketAddress.createUnresolved("127.0.0.1", 
brokerServicePort), true);
         doTestServiceUrlResolve(pulsarClientWithHttpServiceUrl,
-                "http://host1:6651,host2:6651,127.0.0.1:"; + WEB_SERVICE_PORT,
-                InetSocketAddress.createUnresolved("127.0.0.1", 
WEB_SERVICE_PORT), true);
+                "http://host1:6651,host2:6651,127.0.0.1:"; + webServicePort,
+                InetSocketAddress.createUnresolved("127.0.0.1", 
webServicePort), true);
 
         
doTestServiceUrlResolve(pulsarClientWithBinaryServiceUrlDisableQuarantine,
-                "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + 
BROKER_SERVICE_PORT,
-                InetSocketAddress.createUnresolved("127.0.0.1", 
BROKER_SERVICE_PORT), false);
+                "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + 
brokerServicePort,
+                InetSocketAddress.createUnresolved("127.0.0.1", 
brokerServicePort), false);
         
doTestServiceUrlResolve(pulsarClientWithHttpServiceUrlDisableQuarantine,
-                "http://host1:6651,host2:6651,127.0.0.1:"; + WEB_SERVICE_PORT,
-                InetSocketAddress.createUnresolved("127.0.0.1", 
WEB_SERVICE_PORT), false);
+                "http://host1:6651,host2:6651,127.0.0.1:"; + webServicePort,
+                InetSocketAddress.createUnresolved("127.0.0.1", 
webServicePort), false);
     }
 
     private void doTestServiceUrlResolve(PulsarClientImpl pulsarClient, String 
serviceUrl,
@@ -249,19 +261,16 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
         // Create consumers to trigger unhealthy address removal
         for (int i = 0; i < originAllAddresses.size(); i++) {
             String subName = "my-sub" + UUID.randomUUID();
-            pulsarClient.newConsumer()
-                    .subscriptionMode(SubscriptionMode.Durable)
-                    
.topic(topic).receiverQueueSize(1).subscriptionName(subName)
-                    .subscribeAsync()
-                    .thenAccept(e -> {
-                        try {
-                            e.close();
-                        } catch (PulsarClientException e1) {
-                            log.warn("Failed to close consumer {} for topic 
{}: {}", subName, topic, e1.getMessage());
-                        }
-                    });
+            try {
+                Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                        .subscriptionMode(SubscriptionMode.Durable)
+                        
.topic(topic).receiverQueueSize(1).subscriptionName(subName)
+                        .subscribe();
+                consumer.closeAsync();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to create consumer {} for topic {}: {}", 
subName, topic, e.getMessage());
+            }
         }
-        Uninterruptibles.sleepUninterruptibly(10, 
java.util.concurrent.TimeUnit.SECONDS);
         // check if the unhealthy address is removed
         Set<InetSocketAddress> expectedHealthyAddresses = new HashSet<>();
         expectedHealthyAddresses.add(healthyAddress);
@@ -269,7 +278,7 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
         Set<InetSocketAddress> resolvedAddresses = new HashSet<>();
         for (int i = 0; i < hosts.length; i++) {
             if (enableQuarantine) {
-                
assertTrue(expectedHealthyAddresses.contains(resolver.resolveHost()));
+                
assertThat(expectedHealthyAddresses).contains(resolver.resolveHost());
             } else {
                 resolvedAddresses.add(resolver.resolveHost());
             }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 1b12a7c0267..1b23b6ce2fa 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -142,7 +142,12 @@ public class PulsarServiceNameResolver implements 
ServiceNameResolver {
             hostAvailabilityMap.keySet().retainAll(allAddressSet);
             allAddressSet.forEach(
                     address -> hostAvailabilityMap.putIfAbsent(address, 
createEndpointStatus(true, address)));
-            availableAddressList = new 
ArrayList<>(hostAvailabilityMap.keySet());
+            // inherited availability status
+            availableAddressList = hostAvailabilityMap.entrySet()
+                    .stream()
+                    .filter(entry -> entry.getValue().isAvailable() && 
allAddressSet.contains(entry.getKey()))
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toList());
         }
     }
 

Reply via email to