merlimat closed pull request #2725: Fixed intermittent test failures with "bind error"
URL: https://github.com/apache/pulsar/pull/2725
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index cda002bf73..233d6b7b36 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -72,7 +72,7 @@
     void setup() throws Exception {
         log.info("---- Initializing SLAMonitoringTest -----");
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
 
         // start brokers
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 6c20c7762b..8088637855 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -112,7 +112,7 @@ private static Object getField(final Object instance, final 
String fieldName) th
     void setup() throws Exception {
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
 
         // Start broker 1
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index d9e5b78f38..b90fa6faa2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -120,7 +120,7 @@
     @BeforeMethod
     void setup() throws Exception {
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
         ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(),
                 
SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 863833abab..0303fbfeec 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -146,7 +146,7 @@ private static void setField(final Object instance, final 
String fieldName, fina
     void setup() throws Exception {
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
 
         // Start broker 1
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 052c97e110..54c51fcd39 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -127,7 +127,7 @@
     void setup() throws Exception {
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
 
         // Start broker 1
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index dda76b17e6..3990168aa2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -44,7 +45,7 @@
 
     @BeforeMethod
     public void setup() throws Exception {
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
         ServiceConfiguration config = new ServiceConfiguration();
         config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 00265594a8..97155ee233 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -53,7 +54,7 @@
 /**
  */
 public class BacklogQuotaManagerTest {
-    protected static int BROKER_SERVICE_PORT = 16650;
+    protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
     PulsarService pulsar;
     ServiceConfiguration config;
 
@@ -62,15 +63,15 @@
 
     LocalBookkeeperEnsemble bkEnsemble;
 
-    private final int ZOOKEEPER_PORT = 12759;
-    protected final int BROKER_WEBSERVICE_PORT = 15782;
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
     private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
 
     @BeforeMethod
     void setup() throws Exception {
         try {
             // start local bookie and zookeeper
-            bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
+            bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
             bkEnsemble.start();
 
             // start pulsar service
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 94823ad08e..cc82ff4c55 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -37,6 +37,7 @@
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -60,7 +61,7 @@
 /**
  */
 public class BrokerBkEnsemblesTests {
-    protected static int BROKER_SERVICE_PORT = 16650;
+    protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
     protected PulsarService pulsar;
     ServiceConfiguration config;
 
@@ -69,10 +70,9 @@
 
     LocalBookkeeperEnsemble bkEnsemble;
 
-    private final int ZOOKEEPER_PORT = 12759;
-    protected final int BROKER_WEBSERVICE_PORT = 15782;
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
 
-    protected final int bkBasePort = 5001;
     private final int numberOfBookies;
 
     public BrokerBkEnsemblesTests() {
@@ -87,7 +87,7 @@ public BrokerBkEnsemblesTests(int numberOfBookies) {
     protected void setup() throws Exception {
         try {
             // start local bookie and zookeeper
-            bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 
ZOOKEEPER_PORT, 5001);
+            bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 
ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
             bkEnsemble.start();
 
             // start pulsar service
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 2f599eeac8..10754b919e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -105,7 +105,7 @@ void setup() throws Exception {
 
         // Start region 1
         int zkPort1 = PortManager.nextFreePort();
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, 
PortManager.nextFreePort());
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> 
PortManager.nextFreePort());
         bkEnsemble1.start();
 
         int webServicePort1 = PortManager.nextFreePort();
@@ -143,7 +143,7 @@ void setup() throws Exception {
 
         // Start zk & bks
         int zkPort2 = PortManager.nextFreePort();
-        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, 
PortManager.nextFreePort());
+        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> 
PortManager.nextFreePort());
         bkEnsemble2.start();
 
         int webServicePort2 = PortManager.nextFreePort();
@@ -177,7 +177,7 @@ void setup() throws Exception {
 
         // Start zk & bks
         int zkPort3 = PortManager.nextFreePort();
-        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, 
PortManager.nextFreePort());
+        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> 
PortManager.nextFreePort());
         bkEnsemble3.start();
 
         int webServicePort3 = PortManager.nextFreePort();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
index fc5001b2a6..cdc7c06007 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
@@ -104,7 +104,7 @@ void setup() throws Exception {
 
         // Start region 1
         int zkPort1 = PortManager.nextFreePort();
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, 
PortManager.nextFreePort());
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> 
PortManager.nextFreePort());
         bkEnsemble1.start();
 
         int webServicePort1 = PortManager.nextFreePort();
@@ -142,7 +142,7 @@ void setup() throws Exception {
 
         // Start zk & bks
         int zkPort2 = PortManager.nextFreePort();
-        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, 
PortManager.nextFreePort());
+        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> 
PortManager.nextFreePort());
         bkEnsemble2.start();
 
         int webServicePort2 = PortManager.nextFreePort();
@@ -176,7 +176,7 @@ void setup() throws Exception {
 
         // Start zk & bks
         int zkPort3 = PortManager.nextFreePort();
-        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, 
PortManager.nextFreePort());
+        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> 
PortManager.nextFreePort());
         bkEnsemble3.start();
 
         int webServicePort3 = PortManager.nextFreePort();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 29901b58a7..2e7a44407f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -871,7 +871,7 @@ void setupReplicationCluster() throws Exception {
 
             // Start region 1
             int zkPort1 = PortManager.nextFreePort();
-            bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, 
PortManager.nextFreePort());
+            bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> 
PortManager.nextFreePort());
             bkEnsemble1.start();
 
             int webServicePort1 = PortManager.nextFreePort();
@@ -901,7 +901,7 @@ void setupReplicationCluster() throws Exception {
 
             // Start zk & bks
             int zkPort2 = PortManager.nextFreePort();
-            bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, 
PortManager.nextFreePort());
+            bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> 
PortManager.nextFreePort());
             bkEnsemble2.start();
 
             int webServicePort2 = PortManager.nextFreePort();
@@ -927,7 +927,7 @@ void setupReplicationCluster() throws Exception {
 
             // Start zk & bks
             int zkPort3 = PortManager.nextFreePort();
-            bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, 
PortManager.nextFreePort());
+            bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> 
PortManager.nextFreePort());
             bkEnsemble3.start();
 
             int webServicePort3 = PortManager.nextFreePort();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 8f182595a9..0f9219d84e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -98,7 +98,7 @@ void setup(Method method) throws Exception {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
 
         String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index a758c87420..996d931879 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -101,7 +101,7 @@ void setup(Method method) throws Exception {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
 
         String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
@@ -126,7 +126,7 @@ void setup(Method method) throws Exception {
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config.setTlsAllowInsecureConnection(true);
-    
+
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         urlTls = new URL(brokerServiceUrl);
@@ -160,11 +160,11 @@ void setup(Method method) throws Exception {
                     workerConfig.getClientAuthenticationParameters());
         }
         pulsarClient = clientBuilder.build();
-       
+
         TenantInfo propAdmin = new TenantInfo();
         
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
-       
+
         Thread.sleep(100);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 97de0b88ca..2b32fd0c60 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -92,7 +92,7 @@ void setup(Method method) throws Exception {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
 
         config = spy(new ServiceConfiguration());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index dd39222248..907cf86c9b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -133,7 +133,7 @@ void setup(Method method) throws Exception {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
         bkEnsemble.start();
 
         String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 900dcf3dc3..e30e4b19c2 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -66,6 +66,14 @@
       <scope>test</scope>
     </dependency>
 
+     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <version>${project.parent.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-common</artifactId>
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index f7923e41eb..1ead796e99 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -39,6 +39,7 @@
 import java.nio.file.Paths;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
@@ -81,8 +82,25 @@
     int numberOfBookies;
     private boolean clearOldData = false;
 
-    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int 
bkBasePort) {
-        this(numberOfBookies, zkPort, bkBasePort, null, null, true);
+    private static class BasePortManager implements Supplier<Integer> {
+
+        private int port;
+
+        public BasePortManager(int basePort) {
+            this.port = basePort;
+        }
+
+        @Override
+        public synchronized Integer get() {
+            return port++;
+        }
+    }
+
+    private final Supplier<Integer> portManager;
+
+
+    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, 
Supplier<Integer> portManager) {
+        this(numberOfBookies, zkPort, 4181, null, null, true, null, 
portManager);
     }
 
     public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int 
bkBasePort, String zkDataDirName,
@@ -103,10 +121,22 @@ public LocalBookkeeperEnsemble(int numberOfBookies,
                                    String bkDataDirName,
                                    boolean clearOldData,
                                    String advertisedAddress) {
+        this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, 
clearOldData, advertisedAddress,
+                new BasePortManager(bkBasePort));
+    }
+
+    public LocalBookkeeperEnsemble(int numberOfBookies,
+            int zkPort,
+            int streamStoragePort,
+            String zkDataDirName,
+            String bkDataDirName,
+            boolean clearOldData,
+            String advertisedAddress,
+            Supplier<Integer> portManager) {
         this.numberOfBookies = numberOfBookies;
         this.HOSTPORT = "127.0.0.1:" + zkPort;
         this.ZooKeeperDefaultPort = zkPort;
-        this.initialPort = bkBasePort;
+        this.portManager = portManager;
         this.streamStoragePort = streamStoragePort;
         this.zkDataDirName = zkDataDirName;
         this.bkDataDirName = bkDataDirName;
@@ -128,7 +158,6 @@ public LocalBookkeeperEnsemble(int numberOfBookies,
     String bkDataDirName;
     BookieServer bs[];
     ServerConfiguration bsConfs[];
-    Integer initialPort = 5000;
 
     // Stream/Table Storage
     StreamStorageLifecycleComponent streamStorage;
@@ -221,7 +250,7 @@ private void runBookies(ServerConfiguration baseConf) 
throws Exception {
                 cleanDirectory(bkDataDir);
             }
 
-            int bookiePort = initialPort + i;
+            int bookiePort = portManager.get();
 
             // Ensure registration Z-nodes are cleared when standalone service 
is restarted ungracefully
             String registrationZnode = 
String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), 
bookiePort);
@@ -257,7 +286,7 @@ private void runBookies(ServerConfiguration baseConf) 
throws Exception {
                 bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
             }
             bs[i].start();
-            LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", 
i, initialPort + i,
+            LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", 
i, bookiePort,
                     bkDataDir.getAbsolutePath());
         }
     }
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
index 95735af6f2..a6dedaf766 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
@@ -18,18 +18,18 @@
  */
 package org.apache.pulsar.zookeeper;
 
-import java.io.File;
-
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertFalse;
+
+import java.io.File;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.commons.io.FileUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 
 @Test
 public class LocalBookkeeperEnsembleTest {
@@ -62,16 +62,14 @@ void testStartStop() throws Exception {
 
         final int numBk = 1;
         final int zkPort = PortManager.nextFreePort();
-        final int bkPort = PortManager.nextFreePort();
 
         // Start local Bookies/ZooKeepers and confirm that they are running at 
specified ports
-        LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, 
zkPort, bkPort);
+        LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, 
zkPort, () -> PortManager.nextFreePort());
         ensemble.start();
         assertTrue(ensemble.getZkServer().isRunning());
         assertEquals(ensemble.getZkServer().getClientPort(), zkPort);
         assertTrue(ensemble.getZkClient().getState().isConnected());
         assertTrue(ensemble.getBookies()[0].isRunning());
-        assertEquals(ensemble.getBookies()[0].getLocalAddress().getPort(), 
bkPort);
 
         // Stop local Bookies/ZooKeepers and confirm that they are correctly 
closed
         ensemble.stop();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to