merlimat closed pull request #2725: Fixed intermittent test failures with "bind error". URL: https://github.com/apache/pulsar/pull/2725
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index cda002bf73..233d6b7b36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -72,7 +72,7 @@ void setup() throws Exception { log.info("---- Initializing SLAMonitoringTest -----"); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); // start brokers diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index 6c20c7762b..8088637855 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -112,7 +112,7 @@ private static Object getField(final Object instance, final String fieldName) th void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); // Start broker 1 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index d9e5b78f38..b90fa6faa2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -120,7 +120,7 @@ @BeforeMethod void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(), SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index 863833abab..0303fbfeec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -146,7 +146,7 @@ private static void setField(final Object instance, final String fieldName, fina void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); // Start broker 1 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 052c97e110..54c51fcd39 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -127,7 +127,7 @@ void setup() throws Exception { // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); // Start broker 1 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java index dda76b17e6..3990168aa2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @@ -44,7 +45,7 @@ @BeforeMethod public void setup() throws Exception { - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); ServiceConfiguration config = new ServiceConfiguration(); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 00265594a8..97155ee233 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -53,7 +54,7 @@ /** */ public class BacklogQuotaManagerTest { - protected static int BROKER_SERVICE_PORT = 16650; + protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort(); PulsarService pulsar; ServiceConfiguration config; @@ -62,15 +63,15 @@ LocalBookkeeperEnsemble bkEnsemble; - private final int ZOOKEEPER_PORT = 12759; - protected final int BROKER_WEBSERVICE_PORT = 15782; + private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); + protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5; @BeforeMethod void setup() throws Exception { try { // start local bookie and zookeeper - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); // start pulsar service diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 94823ad08e..cc82ff4c55 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.util.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -60,7 +61,7 @@ 
/** */ public class BrokerBkEnsemblesTests { - protected static int BROKER_SERVICE_PORT = 16650; + protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort(); protected PulsarService pulsar; ServiceConfiguration config; @@ -69,10 +70,9 @@ LocalBookkeeperEnsemble bkEnsemble; - private final int ZOOKEEPER_PORT = 12759; - protected final int BROKER_WEBSERVICE_PORT = 15782; + private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); + protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); - protected final int bkBasePort = 5001; private final int numberOfBookies; public BrokerBkEnsemblesTests() { @@ -87,7 +87,7 @@ public BrokerBkEnsemblesTests(int numberOfBookies) { protected void setup() throws Exception { try { // start local bookie and zookeeper - bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, 5001); + bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); // start pulsar service diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 2f599eeac8..10754b919e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -105,7 +105,7 @@ void setup() throws Exception { // Start region 1 int zkPort1 = PortManager.nextFreePort(); - bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort()); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort()); bkEnsemble1.start(); int webServicePort1 = PortManager.nextFreePort(); @@ -143,7 +143,7 @@ void setup() throws Exception { // Start zk & bks int zkPort2 = PortManager.nextFreePort(); - bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort()); + bkEnsemble2 
= new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort()); bkEnsemble2.start(); int webServicePort2 = PortManager.nextFreePort(); @@ -177,7 +177,7 @@ void setup() throws Exception { // Start zk & bks int zkPort3 = PortManager.nextFreePort(); - bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort()); + bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort()); bkEnsemble3.start(); int webServicePort3 = PortManager.nextFreePort(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java index fc5001b2a6..cdc7c06007 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java @@ -104,7 +104,7 @@ void setup() throws Exception { // Start region 1 int zkPort1 = PortManager.nextFreePort(); - bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort()); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort()); bkEnsemble1.start(); int webServicePort1 = PortManager.nextFreePort(); @@ -142,7 +142,7 @@ void setup() throws Exception { // Start zk & bks int zkPort2 = PortManager.nextFreePort(); - bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort()); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort()); bkEnsemble2.start(); int webServicePort2 = PortManager.nextFreePort(); @@ -176,7 +176,7 @@ void setup() throws Exception { // Start zk & bks int zkPort3 = PortManager.nextFreePort(); - bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort()); + bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort()); bkEnsemble3.start(); int webServicePort3 = 
PortManager.nextFreePort(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 29901b58a7..2e7a44407f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -871,7 +871,7 @@ void setupReplicationCluster() throws Exception { // Start region 1 int zkPort1 = PortManager.nextFreePort(); - bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort()); + bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort()); bkEnsemble1.start(); int webServicePort1 = PortManager.nextFreePort(); @@ -901,7 +901,7 @@ void setupReplicationCluster() throws Exception { // Start zk & bks int zkPort2 = PortManager.nextFreePort(); - bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort()); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort()); bkEnsemble2.start(); int webServicePort2 = PortManager.nextFreePort(); @@ -927,7 +927,7 @@ void setupReplicationCluster() throws Exception { // Start zk & bks int zkPort3 = PortManager.nextFreePort(); - bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort()); + bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort()); bkEnsemble3.start(); int webServicePort3 = PortManager.nextFreePort(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 8f182595a9..0f9219d84e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -98,7 +98,7 @@ void setup(Method method) throws Exception { log.info("--- Setting up method {} ---", method.getName()); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index a758c87420..996d931879 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -101,7 +101,7 @@ void setup(Method method) throws Exception { log.info("--- Setting up method {} ---", method.getName()); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort; @@ -126,7 +126,7 @@ void setup(Method method) throws Exception { config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsAllowInsecureConnection(true); - + functionsWorkerService = createPulsarFunctionWorker(config); urlTls = new URL(brokerServiceUrl); @@ -160,11 +160,11 @@ void setup(Method method) throws Exception { workerConfig.getClientAuthenticationParameters()); } pulsarClient = clientBuilder.build(); - + TenantInfo propAdmin = new TenantInfo(); propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); admin.tenants().updateTenant(tenant, propAdmin); - + 
Thread.sleep(100); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 97de0b88ca..2b32fd0c60 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -92,7 +92,7 @@ void setup(Method method) throws Exception { log.info("--- Setting up method {} ---", method.getName()); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); config = spy(new ServiceConfiguration()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index dd39222248..907cf86c9b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -133,7 +133,7 @@ void setup(Method method) throws Exception { log.info("--- Setting up method {} ---", method.getName()); // Start local bookkeeper ensemble - bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort; diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml index 900dcf3dc3..e30e4b19c2 100644 --- a/pulsar-zookeeper-utils/pom.xml +++ b/pulsar-zookeeper-utils/pom.xml @@ -66,6 +66,14 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>managed-ledger-original</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + 
<version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-common</artifactId> diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index f7923e41eb..1ead796e99 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -39,6 +39,7 @@ import java.nio.file.Paths; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; @@ -81,8 +82,25 @@ int numberOfBookies; private boolean clearOldData = false; - public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort) { - this(numberOfBookies, zkPort, bkBasePort, null, null, true); + private static class BasePortManager implements Supplier<Integer> { + + private int port; + + public BasePortManager(int basePort) { + this.port = basePort; + } + + @Override + public synchronized Integer get() { + return port++; + } + } + + private final Supplier<Integer> portManager; + + + public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier<Integer> portManager) { + this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager); } public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, @@ -103,10 +121,22 @@ public LocalBookkeeperEnsemble(int numberOfBookies, String bkDataDirName, boolean clearOldData, String advertisedAddress) { + this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress, + new BasePortManager(bkBasePort)); + } + + public 
LocalBookkeeperEnsemble(int numberOfBookies, + int zkPort, + int streamStoragePort, + String zkDataDirName, + String bkDataDirName, + boolean clearOldData, + String advertisedAddress, + Supplier<Integer> portManager) { this.numberOfBookies = numberOfBookies; this.HOSTPORT = "127.0.0.1:" + zkPort; this.ZooKeeperDefaultPort = zkPort; - this.initialPort = bkBasePort; + this.portManager = portManager; this.streamStoragePort = streamStoragePort; this.zkDataDirName = zkDataDirName; this.bkDataDirName = bkDataDirName; @@ -128,7 +158,6 @@ public LocalBookkeeperEnsemble(int numberOfBookies, String bkDataDirName; BookieServer bs[]; ServerConfiguration bsConfs[]; - Integer initialPort = 5000; // Stream/Table Storage StreamStorageLifecycleComponent streamStorage; @@ -221,7 +250,7 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { cleanDirectory(bkDataDir); } - int bookiePort = initialPort + i; + int bookiePort = portManager.get(); // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort); @@ -257,7 +286,7 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE); } bs[i].start(); - LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, initialPort + i, + LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, bookiePort, bkDataDir.getAbsolutePath()); } } diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java index 95735af6f2..a6dedaf766 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java 
@@ -18,18 +18,18 @@ */ package org.apache.pulsar.zookeeper; -import java.io.File; - import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.assertFalse; + +import java.io.File; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.commons.io.FileUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import org.apache.bookkeeper.test.PortManager; -import org.apache.commons.io.FileUtils; -import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @Test public class LocalBookkeeperEnsembleTest { @@ -62,16 +62,14 @@ void testStartStop() throws Exception { final int numBk = 1; final int zkPort = PortManager.nextFreePort(); - final int bkPort = PortManager.nextFreePort(); // Start local Bookies/ZooKeepers and confirm that they are running at specified ports - LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, zkPort, bkPort); + LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, zkPort, () -> PortManager.nextFreePort()); ensemble.start(); assertTrue(ensemble.getZkServer().isRunning()); assertEquals(ensemble.getZkServer().getClientPort(), zkPort); assertTrue(ensemble.getZkClient().getState().isConnected()); assertTrue(ensemble.getBookies()[0].isRunning()); - assertEquals(ensemble.getBookies()[0].getLocalAddress().getPort(), bkPort); // Stop local Bookies/ZooKeepers and confirm that they are correctly closed ensemble.stop(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services