This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 411aea9f193 [improve][broker] Reduce the broker close time to avoid 
useless wait for event loop shutdown (#24895)
411aea9f193 is described below

commit 411aea9f193553d13ef9daa760c81c21567f652b
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Oct 28 22:53:04 2025 +0800

    [improve][broker] Reduce the broker close time to avoid useless wait for 
event loop shutdown (#24895)
---
 .../pulsar/broker/service/BrokerService.java       | 34 ++++++-----
 .../service/BrokerEventLoopShutdownTest.java       | 71 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 378fda44c2e..0a27a089f63 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -97,6 +97,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -206,7 +207,6 @@ public class BrokerService implements Closeable {
     private static final TimeoutException 
FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION =
             FutureUtil.createTimeoutException("Failed to load topic within 
timeout", BrokerService.class,
                     "futureWithDeadline(...)");
-    private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000L;
     private static final double 
GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d;
     private static final double 
GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
 
@@ -308,7 +308,7 @@ public class BrokerService implements Closeable {
     // fallback if recover BucketDelayedDeliveryTracker failed.
     private volatile DelayedDeliveryTrackerFactory 
fallbackDelayedDeliveryTrackerFactory;
     private final ServerBootstrap defaultServerBootstrap;
-    private final List<EventLoopGroup> protocolHandlersWorkerGroups = new 
ArrayList<>();
+    private final List<Pair<String, EventLoopGroup>> 
protocolHandlersWorkerGroups = new ArrayList<>();
 
     @Getter
     private final BundlesQuotas bundlesQuotas;
@@ -546,7 +546,7 @@ public class BrokerService implements Closeable {
             EventLoopGroup dedicatedWorkerGroup =
                     
EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, 
defaultThreadFactory);
             
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
-            protocolHandlersWorkerGroups.add(dedicatedWorkerGroup);
+            protocolHandlersWorkerGroups.add(Pair.of(protocol, 
dedicatedWorkerGroup));
             bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
         } else {
             bootstrap = defaultServerBootstrap.clone();
@@ -863,10 +863,10 @@ public class BrokerService implements Closeable {
             CompletableFuture<CompletableFuture<Void>> 
cancellableDownstreamFutureReference = new CompletableFuture<>();
             log.info("Event loops shutting down gracefully...");
             List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>();
-            shutdownEventLoops.add(shutdownEventLoopGracefully(acceptorGroup));
-            shutdownEventLoops.add(shutdownEventLoopGracefully(workerGroup));
-            for (EventLoopGroup group : protocolHandlersWorkerGroups) {
-                shutdownEventLoops.add(shutdownEventLoopGracefully(group));
+            shutdownEventLoops.add(shutdownEventLoopGracefully("acceptor", 
acceptorGroup));
+            shutdownEventLoops.add(shutdownEventLoopGracefully("worker", 
workerGroup));
+            for (final var pair : protocolHandlersWorkerGroups) {
+                
shutdownEventLoops.add(shutdownEventLoopGracefully(pair.getLeft(), 
pair.getRight()));
             }
 
             CompletableFuture<Void> shutdownFuture =
@@ -961,15 +961,21 @@ public class BrokerService implements Closeable {
         }
     }
 
-    CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup 
eventLoopGroup) {
+    CompletableFuture<Void> shutdownEventLoopGracefully(String name, 
EventLoopGroup eventLoopGroup) {
         long brokerShutdownTimeoutMs = 
pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
-        long quietPeriod = Math.min((long) (
-                GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * 
brokerShutdownTimeoutMs),
-                GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
         long timeout = (long) 
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
-        return NettyFutureUtil.toCompletableFutureVoid(
-                eventLoopGroup.shutdownGracefully(quietPeriod,
-                        timeout, MILLISECONDS));
+        long periodMs = (timeout > 0) ? 1 : 0;
+        long startNs = System.nanoTime();
+        return 
NettyFutureUtil.toCompletableFutureVoid(eventLoopGroup.shutdownGracefully(
+                periodMs, timeout, MILLISECONDS)
+        ).whenComplete((__, e) -> {
+            final var elapsedMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+            if (e == null) {
+                log.info("Event loop {} shut down after {} ms", name, 
elapsedMs);
+            } else {
+                log.warn("Failed to shut down event loop {} after {} ms: {}", 
name, elapsedMs, e.getMessage());
+            }
+        });
     }
 
     private CompletableFuture<Void> closeChannel(Channel channel) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
new file mode 100644
index 00000000000..29421f155b6
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test
+public class BrokerEventLoopShutdownTest {
+
+    private LocalBookkeeperEnsemble bk;
+
+    @BeforeClass(alwaysRun = true)
+    public void setup() throws Exception {
+        bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
+        bk.start();
+    }
+
+    @AfterClass(alwaysRun = true, timeOut = 30000)
+    public void cleanup() throws Exception {
+        bk.stop();
+    }
+
+    @Test(timeOut = 60000)
+    public void testCloseOneBroker() throws Exception {
+        final var clusterName = "test";
+        final Supplier<ServiceConfiguration> configSupplier = () -> {
+            final var config = new ServiceConfiguration();
+            config.setClusterName(clusterName);
+            config.setAdvertisedAddress("localhost");
+            config.setBrokerServicePort(Optional.of(0));
+            config.setWebServicePort(Optional.of(0));
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bk.getZookeeperPort());
+            return config;
+        };
+        @Cleanup final var broker0 = new PulsarService(configSupplier.get());
+        @Cleanup final var broker1 = new PulsarService(configSupplier.get());
+        broker0.start();
+        broker1.start();
+
+        final var startNs = System.nanoTime();
+        broker0.close();
+        final var closeTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+        Assert.assertTrue(closeTimeMs < 1000, "close time: " + closeTimeMs + " 
ms");
+    }
+}

Reply via email to