This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 6ab322ed399 [fix][broker] Fix PulsarService/BrokerService shutdown when brokerShutdownTimeoutMs=0 (#21496)
6ab322ed399 is described below

commit 6ab322ed39901ca1b3e375ccf66d26a24da5874f
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Nov 1 20:38:48 2023 +0200

    [fix][broker] Fix PulsarService/BrokerService shutdown when brokerShutdownTimeoutMs=0 (#21496)
---
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 10 +++++++---
 .../java/org/apache/pulsar/broker/service/BrokerService.java | 9 ++++++++-
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 84e044df0bf..18e7f554c99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -639,14 +639,18 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
+        long brokerShutdownTimeoutMs = getConfiguration().getBrokerShutdownTimeoutMs();
+        if (brokerShutdownTimeoutMs <= 0) {
+            return future;
+        }
         ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
                 new ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() + "-shutdown"));
         FutureUtil.addTimeoutHandling(future,
-                Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
+                Duration.ofMillis(brokerShutdownTimeoutMs),
                 shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
         future.handle((v, t) -> {
-            if (t != null && getConfiguration().getBrokerShutdownTimeoutMs() > 0) {
-                LOG.info("Shutdown timed out after {} ms", getConfiguration().getBrokerShutdownTimeoutMs());
+            if (t instanceof TimeoutException) {
+                LOG.info("Shutdown timed out after {} ms", brokerShutdownTimeoutMs);
                 LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());
             }
             // shutdown the shutdown executor
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1dccec6f305..93337bafd90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -826,6 +826,7 @@ public class BrokerService implements Closeable {
             for (EventLoopGroup group : protocolHandlersWorkerGroups) {
                 shutdownEventLoops.add(shutdownEventLoopGracefully(group));
             }
+
             CompletableFuture<Void> shutdownFuture =
                     CompletableFuture.allOf(shutdownEventLoops.toArray(new CompletableFuture[0]))
                             .handle((v, t) -> {
@@ -836,7 +837,7 @@ public class BrokerService implements Closeable {
                                 }
                                 return null;
                             })
-                            .thenCompose(__ -> {
+                            .thenComposeAsync(__ -> {
                                 log.info("Continuing to second phase in shutdown.");
 
                                 List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
@@ -900,6 +901,12 @@ public class BrokerService implements Closeable {
                                     return null;
                                 });
                                 return combined;
+                            }, runnable -> {
+                                // run the 2nd phase of the shutdown in a separate thread
+                                Thread thread = new Thread(runnable);
+                                thread.setName("BrokerService-shutdown-phase2");
+                                thread.setDaemon(false);
+                                thread.start();
                             });
             FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> cancellableDownstreamFutureReference
                     .thenAccept(future -> future.cancel(false)));