This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cddf53f341b6ebbd86d6e2a8ab5a7a030d8e515c
Author: Yike Xiao <[email protected]>
AuthorDate: Fri Sep 12 16:12:37 2025 +0800

    [fix][broker] Fix cannot shutdown broker gracefully by admin api (#24731)
    
    (cherry picked from commit 4169395d1aea6d2a12101ae4ef2cfcca2c6cc48c)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 16 +++++-
 .../pulsar/broker/admin/impl/BrokersBase.java      |  2 +-
 .../org/apache/pulsar/broker/web/WebService.java   | 58 +++++++++++++++-------
 .../apache/pulsar/broker/PulsarServiceTest.java    | 27 ++++++++++
 4 files changed, 82 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3d93355770f..5fd63502f8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -480,8 +480,22 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     /**
      * Close the current pulsar service. All resources are released.
+     * <p>
+     * This method is equivalent with {@code closeAsync(true)}.
+     *
+     * @see PulsarService#closeAsync(boolean)
      */
     public CompletableFuture<Void> closeAsync() {
+        return closeAsync(true);
+    }
+
+    /**
+     * Close the current pulsar service.
+     *
+     * @param waitForWebServiceToStop if true, waits for the web service to stop before returning from this method.
+     * @return a future which will be completed when the service is fully closed.
+     */
+    public CompletableFuture<Void> closeAsync(boolean waitForWebServiceToStop) {
         mutex.lock();
         try {
             // Close protocol handler before unloading namespace bundles because protocol handlers might maintain
@@ -528,7 +542,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
             if (this.webService != null) {
                 try {
-                    this.webService.close();
+                    this.webService.close(waitForWebServiceToStop);
                     this.webService = null;
                 } catch (Exception e) {
                     LOG.error("Web service closing failed", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 17bbead5771..1e4e4ff66dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -473,7 +473,7 @@ public class BrokersBase extends AdminResource {
     private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int maxConcurrentUnloadPerSec,
                                                                     boolean forcedTerminateTopic) {
         pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
-        return pulsar().closeAsync();
+        return pulsar().closeAsync(false);
     }
 
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 7eb1f2fae09..96e8a516a6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -422,32 +422,52 @@ public class WebService implements AutoCloseable {
 
     @Override
     public void close() throws PulsarServerException {
+        close(true);
+    }
+
+    public void close(boolean waitUtilServerStopped) throws PulsarServerException {
         try {
-            server.stop();
-            // unregister statistics from Prometheus client's default CollectorRegistry singleton
-            // to prevent memory leaks in tests
-            if (jettyStatisticsCollector != null) {
-                try {
-                    CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector);
-                } catch (Exception e) {
-                    // ignore any exception happening in unregister
-                    // exception will be thrown for 2. instance of WebService in tests since
-                    // the register supports a single JettyStatisticsCollector
-                }
-                jettyStatisticsCollector = null;
-            }
-            webServiceExecutor.join();
-            if (this.sslContextRefreshTask != null) {
-                this.sslContextRefreshTask.cancel(true);
+            if (waitUtilServerStopped) {
+                doClose();
+            } else {
+                Thread webServiceTerminator = new Thread(() -> {
+                    try {
+                        doClose();
+                    } catch (Exception e) {
+                        log.error("Error while closing web service", e);
+                    }
+                });
+                webServiceTerminator.setName("pulsar-web-service-terminator");
+                webServiceTerminator.start();
             }
-            webExecutorThreadPoolStats.close();
-            this.executorStats.close();
-            log.info("Web service closed");
         } catch (Exception e) {
             throw new PulsarServerException(e);
         }
     }
 
+    private void doClose() throws Exception {
+        server.stop();
+        // unregister statistics from Prometheus client's default CollectorRegistry singleton
+        // to prevent memory leaks in tests
+        if (jettyStatisticsCollector != null) {
+            try {
+                CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector);
+            } catch (Exception e) {
+                // ignore any exception happening in unregister
+                // exception will be thrown for 2. instance of WebService in tests since
+                // the register supports a single JettyStatisticsCollector
+            }
+            jettyStatisticsCollector = null;
+        }
+        webServiceExecutor.join();
+        if (this.sslContextRefreshTask != null) {
+            this.sslContextRefreshTask.cancel(true);
+        }
+        webExecutorThreadPoolStats.close();
+        this.executorStats.close();
+        log.info("Web service closed");
+    }
+
     public Optional<Integer> getListenPortHTTP() {
         if (httpConnector != null) {
             return Optional.of(httpConnector.getLocalPort());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index b47011f2070..6c04889d8f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -23,14 +23,18 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertSame;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -312,4 +316,27 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
             pulsarService.close();
         }
     }
+
+    @Test
+    public void testShutdownViaAdminApi() throws Exception {
+        super.internalSetup();
+        super.setupDefaultTenantAndNamespace();
+        String topic = "persistent://public/default/testShutdownViaAdminApi";
+        admin.topics().createNonPartitionedTopic(topic);
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .create();
+        producer.send("message 1".getBytes());
+        admin.brokers()
+                .shutDownBrokerGracefully(0, false)
+                .get(30, TimeUnit.SECONDS);
+        try {
+            producer.send("message 2".getBytes());
+            fail("sending msg should timeout, because broker is down and there is only one broker");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.TimeoutException);
+        }
+    }
 }

Reply via email to