This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ad31d18c80c [fix][client] PIP-425: fix updateServiceUrl and fix flaky ServiceUrlQuarantineTest (#24574) ad31d18c80c is described below commit ad31d18c80c6e7c4b54ff014d0d36f5dc29182e7 Author: Aurora Twinkle <foreverlove...@gmail.com> AuthorDate: Tue Jul 29 22:14:08 2025 +0800 [fix][client] PIP-425: fix updateServiceUrl and fix flaky ServiceUrlQuarantineTest (#24574) Co-authored-by: duanlinlin <duanlinl...@xiaohongshu.com> Co-authored-by: Lari Hotari <lhot...@users.noreply.github.com> --- .../client/impl/ServiceUrlQuarantineTest.java | 83 ++++++++++++---------- .../client/impl/PulsarServiceNameResolver.java | 7 +- 2 files changed, 52 insertions(+), 38 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java index 14da7bce4c1..a4fc46d079e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import com.google.common.util.concurrent.Uninterruptibles; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -28,7 +28,6 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; @@ -38,6 +37,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.common.net.ServiceURI; +import org.apache.pulsar.common.util.PortManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -53,22 +53,25 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { private PulsarClientImpl pulsarClientWithBinaryServiceUrlDisableQuarantine; private PulsarClientImpl pulsarClientWithHttpServiceUrl; private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine; - private static final int BROKER_SERVICE_PORT = 6666; - private static final int WEB_SERVICE_PORT = 8888; + private int brokerServicePort; + private int webServicePort; + private final Set<Integer> lockedFreePortSet = new HashSet<>(); private static final int UNAVAILABLE_NODES = 20; - private static final int TIMEOUT_SECONDS = 1; + private static final int TIMEOUT_MS = 500; @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { + this.brokerServicePort = nextLockedFreePort(); + this.webServicePort = nextLockedFreePort(); super.internalSetup(); super.producerBaseSetup(); // Create a Pulsar client with some unavailable nodes StringBuilder binaryServiceUrlBuilder = new StringBuilder(pulsar.getBrokerServiceUrl()); StringBuilder httpServiceUrlBuilder = new StringBuilder(pulsar.getWebServiceAddress()); for (int i = 0; i < UNAVAILABLE_NODES; i++) { - binaryServiceUrlBuilder.append(",127.0.0.1:").append(ThreadLocalRandom.current().nextInt(100, 1000)); - httpServiceUrlBuilder.append(",127.0.0.1:").append(ThreadLocalRandom.current().nextInt(100, 1000)); + binaryServiceUrlBuilder.append(",127.0.0.1:").append(nextLockedFreePort()); + httpServiceUrlBuilder.append(",127.0.0.1:").append(nextLockedFreePort()); } this.binaryServiceUrlWithUnavailableNodes = binaryServiceUrlBuilder.toString(); this.httpServiceUrlWithUnavailableNodes = httpServiceUrlBuilder.toString(); @@ -82,30 +85,36 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { .serviceUrl(binaryServiceUrlWithUnavailableNodes) .serviceUrlQuarantineInitDuration(0, TimeUnit.MILLISECONDS) .serviceUrlQuarantineMaxDuration(0, TimeUnit.MILLISECONDS) - .operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) - .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) + .operationTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS) + .lookupTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS) .build(); this.pulsarClientWithHttpServiceUrlDisableQuarantine = (PulsarClientImpl) PulsarClient.builder() .serviceUrl(httpServiceUrlWithUnavailableNodes) .serviceUrlQuarantineInitDuration(0, TimeUnit.MILLISECONDS) .serviceUrlQuarantineMaxDuration(0, TimeUnit.MILLISECONDS) - .operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) - .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) + .operationTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS) + .lookupTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS) .build(); } + private int nextLockedFreePort() { + int newLockedFreePort = PortManager.nextLockedFreePort(); + this.lockedFreePortSet.add(newLockedFreePort); + return newLockedFreePort; + } + @Override protected void doInitConf() throws Exception { super.doInitConf(); - this.conf.setBrokerServicePort(Optional.of(BROKER_SERVICE_PORT)); - this.conf.setWebServicePort(Optional.of(WEB_SERVICE_PORT)); + this.conf.setBrokerServicePort(Optional.of(brokerServicePort)); + this.conf.setWebServicePort(Optional.of(webServicePort)); } @Override protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { - clientBuilder.operationTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS) - .lookupTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS); + clientBuilder.operationTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS) + .lookupTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS); } @@ -125,6 +134,9 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { if (pulsarClientWithHttpServiceUrlDisableQuarantine != null) { pulsarClientWithHttpServiceUrlDisableQuarantine.close(); } + for (Integer port : lockedFreePortSet) { + PortManager.releaseLockedPort(port); + } } @Test @@ -197,21 +209,21 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { return successCount; } - @Test + @Test(invocationCount = 10) public void testServiceUrlHealthCheck() throws Exception { doTestServiceUrlResolve(pulsarClientWithBinaryServiceUrl, - "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + BROKER_SERVICE_PORT, - InetSocketAddress.createUnresolved("127.0.0.1", BROKER_SERVICE_PORT), true); + "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + brokerServicePort, + InetSocketAddress.createUnresolved("127.0.0.1", brokerServicePort), true); doTestServiceUrlResolve(pulsarClientWithHttpServiceUrl, - "http://host1:6651,host2:6651,127.0.0.1:" + WEB_SERVICE_PORT, - InetSocketAddress.createUnresolved("127.0.0.1", WEB_SERVICE_PORT), true); + "http://host1:6651,host2:6651,127.0.0.1:" + webServicePort, + InetSocketAddress.createUnresolved("127.0.0.1", webServicePort), true); doTestServiceUrlResolve(pulsarClientWithBinaryServiceUrlDisableQuarantine, - "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + BROKER_SERVICE_PORT, - InetSocketAddress.createUnresolved("127.0.0.1", BROKER_SERVICE_PORT), false); + "pulsar+ssl://host1:6651,host2:6651,127.0.0.1:" + brokerServicePort, + InetSocketAddress.createUnresolved("127.0.0.1", brokerServicePort), false); doTestServiceUrlResolve(pulsarClientWithHttpServiceUrlDisableQuarantine, - "http://host1:6651,host2:6651,127.0.0.1:" + WEB_SERVICE_PORT, - InetSocketAddress.createUnresolved("127.0.0.1", WEB_SERVICE_PORT), false); + "http://host1:6651,host2:6651,127.0.0.1:" + webServicePort, + InetSocketAddress.createUnresolved("127.0.0.1", webServicePort), false); } private void doTestServiceUrlResolve(PulsarClientImpl pulsarClient, String serviceUrl, @@ -249,19 +261,16 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { // Create consumers to trigger unhealthy address removal for (int i = 0; i < originAllAddresses.size(); i++) { String subName = "my-sub" + UUID.randomUUID(); - pulsarClient.newConsumer() - .subscriptionMode(SubscriptionMode.Durable) - .topic(topic).receiverQueueSize(1).subscriptionName(subName) - .subscribeAsync() - .thenAccept(e -> { - try { - e.close(); - } catch (PulsarClientException e1) { - log.warn("Failed to close consumer {} for topic {}: {}", subName, topic, e1.getMessage()); - } - }); + try { + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .subscriptionMode(SubscriptionMode.Durable) + .topic(topic).receiverQueueSize(1).subscriptionName(subName) + .subscribe(); + consumer.closeAsync(); + } catch (PulsarClientException e) { + log.warn("Failed to create consumer {} for topic {}: {}", subName, topic, e.getMessage()); + } } - Uninterruptibles.sleepUninterruptibly(10, java.util.concurrent.TimeUnit.SECONDS); // check if the unhealthy address is removed Set<InetSocketAddress> expectedHealthyAddresses = new HashSet<>(); expectedHealthyAddresses.add(healthyAddress); @@ -269,7 +278,7 @@ public class ServiceUrlQuarantineTest extends ProducerConsumerBase { Set<InetSocketAddress> resolvedAddresses = new HashSet<>(); for (int i = 0; i < hosts.length; i++) { if (enableQuarantine) { - assertTrue(expectedHealthyAddresses.contains(resolver.resolveHost())); + assertThat(expectedHealthyAddresses).contains(resolver.resolveHost()); } else { resolvedAddresses.add(resolver.resolveHost()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java index 1b12a7c0267..1b23b6ce2fa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java @@ -142,7 +142,12 @@ public class PulsarServiceNameResolver implements ServiceNameResolver { hostAvailabilityMap.keySet().retainAll(allAddressSet); allAddressSet.forEach( address -> hostAvailabilityMap.putIfAbsent(address, createEndpointStatus(true, address))); - availableAddressList = new ArrayList<>(hostAvailabilityMap.keySet()); + // inherited availability status + availableAddressList = hostAvailabilityMap.entrySet() + .stream() + .filter(entry -> entry.getValue().isAvailable() && allAddressSet.contains(entry.getKey())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); } }