This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cddf53f341b6ebbd86d6e2a8ab5a7a030d8e515c Author: Yike Xiao <[email protected]> AuthorDate: Fri Sep 12 16:12:37 2025 +0800 [fix][broker] Fix cannot shutdown broker gracefully by admin api (#24731) (cherry picked from commit 4169395d1aea6d2a12101ae4ef2cfcca2c6cc48c) --- .../org/apache/pulsar/broker/PulsarService.java | 16 +++++- .../pulsar/broker/admin/impl/BrokersBase.java | 2 +- .../org/apache/pulsar/broker/web/WebService.java | 58 +++++++++++++++------- .../apache/pulsar/broker/PulsarServiceTest.java | 27 ++++++++++ 4 files changed, 82 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3d93355770f..5fd63502f8e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -480,8 +480,22 @@ public class PulsarService implements AutoCloseable, ShutdownService { /** * Close the current pulsar service. All resources are released. + * <p> + * This method is equivalent with {@code closeAsync(true)}. + * + * @see PulsarService#closeAsync(boolean) */ public CompletableFuture<Void> closeAsync() { + return closeAsync(true); + } + + /** + * Close the current pulsar service. + * + * @param waitForWebServiceToStop if true, waits for the web service to stop before returning from this method. + * @return a future which will be completed when the service is fully closed. + */ + public CompletableFuture<Void> closeAsync(boolean waitForWebServiceToStop) { mutex.lock(); try { // Close protocol handler before unloading namespace bundles because protocol handlers might maintain @@ -528,7 +542,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { if (this.webService != null) { try { - this.webService.close(); + this.webService.close(waitForWebServiceToStop); this.webService = null; } catch (Exception e) { LOG.error("Web service closing failed", e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 17bbead5771..1e4e4ff66dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -473,7 +473,7 @@ public class BrokersBase extends AdminResource { private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int maxConcurrentUnloadPerSec, boolean forcedTerminateTopic) { pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic); - return pulsar().closeAsync(); + return pulsar().closeAsync(false); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 7eb1f2fae09..96e8a516a6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -422,32 +422,52 @@ public class WebService implements AutoCloseable { @Override public void close() throws PulsarServerException { + close(true); + } + + public void close(boolean waitUtilServerStopped) throws PulsarServerException { try { - server.stop(); - // unregister statistics from Prometheus client's default CollectorRegistry singleton - // to prevent memory leaks in tests - if (jettyStatisticsCollector != null) { - try { - CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector); - } catch (Exception e) { - // ignore any exception happening in unregister - // exception will be thrown for 2. instance of WebService in tests since - // the register supports a single JettyStatisticsCollector - } - jettyStatisticsCollector = null; - } - webServiceExecutor.join(); - if (this.sslContextRefreshTask != null) { - this.sslContextRefreshTask.cancel(true); + if (waitUtilServerStopped) { + doClose(); + } else { + Thread webServiceTerminator = new Thread(() -> { + try { + doClose(); + } catch (Exception e) { + log.error("Error while closing web service", e); + } + }); + webServiceTerminator.setName("pulsar-web-service-terminator"); + webServiceTerminator.start(); } - webExecutorThreadPoolStats.close(); - this.executorStats.close(); - log.info("Web service closed"); } catch (Exception e) { throw new PulsarServerException(e); } } + private void doClose() throws Exception { + server.stop(); + // unregister statistics from Prometheus client's default CollectorRegistry singleton + // to prevent memory leaks in tests + if (jettyStatisticsCollector != null) { + try { + CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector); + } catch (Exception e) { + // ignore any exception happening in unregister + // exception will be thrown for 2. instance of WebService in tests since + // the register supports a single JettyStatisticsCollector + } + jettyStatisticsCollector = null; + } + webServiceExecutor.join(); + if (this.sslContextRefreshTask != null) { + this.sslContextRefreshTask.cancel(true); + } + webExecutorThreadPoolStats.close(); + this.executorStats.close(); + log.info("Web service closed"); + } + public Optional<Integer> getListenPortHTTP() { if (httpConnector != null) { return Optional.of(httpConnector.getLocalPort()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index b47011f2070..6c04889d8f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -23,14 +23,18 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertSame; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; @@ -312,4 +316,27 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest { pulsarService.close(); } } + + @Test + public void testShutdownViaAdminApi() throws Exception { + super.internalSetup(); + super.setupDefaultTenantAndNamespace(); + String topic = "persistent://public/default/testShutdownViaAdminApi"; + admin.topics().createNonPartitionedTopic(topic); + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .sendTimeout(5, TimeUnit.SECONDS) + .create(); + producer.send("message 1".getBytes()); + admin.brokers() + .shutDownBrokerGracefully(0, false) + .get(30, TimeUnit.SECONDS); + try { + producer.send("message 2".getBytes()); + fail("sending msg should timeout, because broker is down and there is only one broker"); + } catch (Exception e) { + assertTrue(e instanceof PulsarClientException.TimeoutException); + } + } }
