This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95a842414a7 [fix][test] Make LocalBookkeeperEnsemble robust to port
race and partial-setup cleanup (#25690)
95a842414a7 is described below
commit 95a842414a706246d53456f9a18bd8958aa6deea
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 05:53:03 2026 -0700
[fix][test] Make LocalBookkeeperEnsemble robust to port race and
partial-setup cleanup (#25690)
---
.../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 50 +++++++++++++++++-----
.../service/persistent/ShadowTopicRealBkTest.java | 14 ++++--
2 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index b3c2b8de5bd..d8c67568304 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -506,23 +506,47 @@ public class LocalBookkeeperEnsemble {
public void stop() throws Exception {
if (null != streamStorage) {
log.debug("Local bk stream storage stopping ...");
- streamStorage.close();
+ try {
+ streamStorage.close();
+ } catch (Exception e) {
+ log.warn().exception(e).log("failed to shutdown stream
storage");
+ }
}
log.debug("Local ZK/BK stopping ...");
- for (LifecycleComponent bookie : bookieComponents) {
- try {
- if (bookie != null) {
- bookie.close();
+ if (bookieComponents != null) {
+ for (LifecycleComponent bookie : bookieComponents) {
+ try {
+ if (bookie != null) {
+ bookie.close();
+ }
+ } catch (Exception e) {
+ log.warn().exception(e).log("failed to shutdown bookie");
}
- } catch (Exception e) {
- log.warn().exception(e).log("failed to shutdown bookie");
}
}
- zkc.close();
- zks.shutdown();
- serverFactory.shutdown();
+ if (zkc != null) {
+ try {
+ zkc.close();
+ } catch (Exception e) {
+ log.warn().exception(e).log("failed to close zk client");
+ }
+ }
+ if (zks != null) {
+ try {
+ zks.shutdown();
+ } catch (Exception e) {
+ log.warn().exception(e).log("failed to shutdown zk server");
+ }
+ }
+ if (serverFactory != null) {
+ try {
+ serverFactory.shutdown();
+ } catch (Exception e) {
+ log.warn().exception(e).log("failed to shutdown zk server
factory");
+ }
+ }
if (zkDataCleanupManager != null) {
zkDataCleanupManager.shutdown();
@@ -530,7 +554,11 @@ public class LocalBookkeeperEnsemble {
log.debug("Local ZK/BK stopped");
for (File managedDir : temporaryDirectories) {
log.info().attr("directory", managedDir).log("deleting test
directory");
- FileUtils.deleteDirectory(managedDir);
+ try {
+ FileUtils.deleteDirectory(managedDir);
+ } catch (Exception e) {
+ log.warn().attr("directory",
managedDir).exception(e).log("failed to delete test directory");
+ }
}
temporaryDirectories.clear();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
index b0e572a826c..e58b8cdcce7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -42,14 +41,17 @@ import org.testng.annotations.Test;
public class ShadowTopicRealBkTest {
private static final String cluster = "test";
- private final int zkPort = PortManager.nextLockedFreePort();
- private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
zkPort, PortManager::nextLockedFreePort);
+ // Pass 0 for both ZK and bookie ports so the kernel picks free ports at bind time, avoiding
+ // any JVM-vs-OS race on pre-allocated ports. The actual ZK port is read back via
+ // bk.getZookeeperPort().
+ private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
0, () -> 0);
private PulsarService pulsar;
private PulsarAdmin admin;
@BeforeClass
public void setup() throws Exception {
bk.start();
+ final int zkPort = bk.getZookeeperPort();
final var config = new ServiceConfiguration();
config.setClusterName(cluster);
config.setAdvertisedAddress("localhost");
@@ -68,7 +70,11 @@ public class ShadowTopicRealBkTest {
@AfterClass(alwaysRun = true)
public void cleanup() throws Exception {
if (pulsar != null) {
- pulsar.close();
+ try {
+ pulsar.close();
+ } catch (Exception e) {
+ // best effort cleanup; setup may have failed before pulsar was fully initialized
+ }
}
bk.stop();
}