This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ab322ed399 [fix][broker] Fix PulsarService/BrokerService shutdown when brokerShutdownTimeoutMs=0 (#21496)
6ab322ed399 is described below

commit 6ab322ed39901ca1b3e375ccf66d26a24da5874f
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Nov 1 20:38:48 2023 +0200

    [fix][broker] Fix PulsarService/BrokerService shutdown when brokerShutdownTimeoutMs=0 (#21496)
---
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java  | 10 +++++++---
 .../java/org/apache/pulsar/broker/service/BrokerService.java   |  9 ++++++++-
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 84e044df0bf..18e7f554c99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -639,14 +639,18 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     }
 
     private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> 
future) {
+        long brokerShutdownTimeoutMs = 
getConfiguration().getBrokerShutdownTimeoutMs();
+        if (brokerShutdownTimeoutMs <= 0) {
+            return future;
+        }
         ScheduledExecutorService shutdownExecutor = 
Executors.newSingleThreadScheduledExecutor(
                 new 
ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() + 
"-shutdown"));
         FutureUtil.addTimeoutHandling(future,
-                Duration.ofMillis(Math.max(1L, 
getConfiguration().getBrokerShutdownTimeoutMs())),
+                Duration.ofMillis(brokerShutdownTimeoutMs),
                 shutdownExecutor, () -> 
FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
         future.handle((v, t) -> {
-            if (t != null && getConfiguration().getBrokerShutdownTimeoutMs() > 
0) {
-                LOG.info("Shutdown timed out after {} ms", 
getConfiguration().getBrokerShutdownTimeoutMs());
+            if (t instanceof TimeoutException) {
+                LOG.info("Shutdown timed out after {} ms", 
brokerShutdownTimeoutMs);
                 LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());
             }
             // shutdown the shutdown executor
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1dccec6f305..93337bafd90 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -826,6 +826,7 @@ public class BrokerService implements Closeable {
             for (EventLoopGroup group : protocolHandlersWorkerGroups) {
                 shutdownEventLoops.add(shutdownEventLoopGracefully(group));
             }
+
             CompletableFuture<Void> shutdownFuture =
                     CompletableFuture.allOf(shutdownEventLoops.toArray(new 
CompletableFuture[0]))
                             .handle((v, t) -> {
@@ -836,7 +837,7 @@ public class BrokerService implements Closeable {
                                 }
                                 return null;
                             })
-                            .thenCompose(__ -> {
+                            .thenComposeAsync(__ -> {
                                 log.info("Continuing to second phase in 
shutdown.");
 
                                 List<CompletableFuture<Void>> 
asyncCloseFutures = new ArrayList<>();
@@ -900,6 +901,12 @@ public class BrokerService implements Closeable {
                                     return null;
                                 });
                                 return combined;
+                            }, runnable -> {
+                                // run the 2nd phase of the shutdown in a 
separate thread
+                                Thread thread = new Thread(runnable);
+                                
thread.setName("BrokerService-shutdown-phase2");
+                                thread.setDaemon(false);
+                                thread.start();
                             });
             FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> 
cancellableDownstreamFutureReference
                     .thenAccept(future -> future.cancel(false)));

Reply via email to