lhotari commented on code in PR #15638:
URL: https://github.com/apache/pulsar/pull/15638#discussion_r918754279


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -597,6 +601,8 @@ private CompletableFuture<Void> 
addTimeoutHandling(CompletableFuture<Void> futur
                 Duration.ofMillis(Math.max(1L, 
getConfiguration().getBrokerShutdownTimeoutMs())),
                 shutdownExecutor, () -> 
FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
         future.handle((v, t) -> {
+            LOG.info("Shutdown timed out after {} ms", 
getConfiguration().getBrokerShutdownTimeoutMs());
+            LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());

Review Comment:
   does this need `if (t != null) {` ?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java:
##########
@@ -249,20 +252,20 @@ public void setup() throws Exception {
 
     @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception {
-        metadataStore.close();
-        brokerService.getTopics().clear();
-        brokerService.close(); //to clear pulsarStats
-        try {
-            pulsar.close();
-        } catch (Exception e) {
-            log.warn("Failed to close pulsar service", e);
-            throw e;
-        }
-
-        executor.shutdownNow();
-        if (eventLoopGroup != null) {
-            eventLoopGroup.shutdownGracefully().get();
-        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(brokerService.closeAsync());
+        futures.add(pulsar.closeAsync());
+        futures.add(GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executor)
+                .handle());
+        final CompletableFuture<Void> eventLoopGroupCloseFuture = new 
CompletableFuture<>();
+        eventLoopGroup.shutdownGracefully().sync().addListener(f -> {
+            eventLoopGroupCloseFuture.complete(null);
+        });

Review Comment:
   this doesn't look right. 
   
   What is the intention of calling `.sync()`? that would wait for the shutdown 
to happen before proceeding. The future isn't needed in that case at all so 
that's why the `.addListener` doesn't make sense. 
   
   There's 
[org.apache.pulsar.common.util.netty.NettyFutureUtil#toCompletableFuture](https://github.com/apache/pulsar/blob/12d43a86d9fb228b498914155bc3a6194874a2a2/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java#L36)
 to adapt a Netty Future to a CompletableFuture.
   
   



##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesShutdown.java:
##########
@@ -114,4 +121,16 @@ public CompletableFuture<Void> handle() {
         return new GracefulExecutorServicesTerminationHandler(timeout, 
terminationTimeout,
                 executorServices).getFuture();
     }
+
+    public static CompletableFuture<Void> 
shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup,
+                                                                      long 
brokerShutdownTimeoutMs) {
+        long quietPeriod = Math.min((long) (
+                        GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT 
* brokerShutdownTimeoutMs),
+                GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
+        long timeout = (long) 
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
+        return NettyFutureUtil.toCompletableFutureVoid(
+                eventLoopGroup.shutdownGracefully(quietPeriod,
+                        timeout, TimeUnit.MILLISECONDS));
+    }
+

Review Comment:
   this method doesn't really belong to this class. 
org.apache.pulsar.common.util.netty.EventLoopUtil is a better location for this 
method.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java:
##########
@@ -238,12 +240,19 @@ public void teardown() throws Exception {
         if (channel != null) {
             channel.close();
         }
-        pulsar.close();
-        brokerService.close();
-        executor.shutdownNow();
-        if (eventLoopGroup != null) {
-            eventLoopGroup.shutdownGracefully().get();
-        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(brokerService.closeAsync());
+        futures.add(pulsar.closeAsync());
+        futures.add(GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executor)
+                .handle());
+        final CompletableFuture<Void> eventLoopGroupCloseFuture = new 
CompletableFuture<>();
+        eventLoopGroup.shutdownGracefully().sync().addListener(f -> {
+            eventLoopGroupCloseFuture.complete(null);
+        });

Review Comment:
   this doesn't look right. (please check first comment about this code pattern)



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java:
##########
@@ -190,12 +195,18 @@ public void testHttpLookupRedirect() throws Exception {
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
-        if (this.executorService != null) {
-            this.executorService.shutdownNow();
-        }
-        if (eventExecutors != null) {
-            eventExecutors.shutdownGracefully();
-        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(pulsar.closeAsync());
+        futures.add(GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executorService)
+                .handle());
+        final CompletableFuture<Void> eventLoopGroupCloseFuture = new 
CompletableFuture<>();
+        eventExecutors.shutdownGracefully().sync().addListener(f -> {
+            eventLoopGroupCloseFuture.complete(null);
+        });

Review Comment:
   this doesn't look right. (please check first comment about this code pattern)



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java:
##########
@@ -208,19 +211,20 @@ public CompletableFuture<Boolean> 
checkInitializedBefore(PersistentSubscription
 
     @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception {
-        brokerMock.close(); //to clear pulsarStats
-        try {
-            pulsarMock.close();
-        } catch (Exception e) {
-            log.warn("Failed to close pulsar service", e);
-            throw e;
-        }
-
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(brokerMock.closeAsync());
+        futures.add(pulsarMock.closeAsync());
+        futures.add(GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executor)
+                .handle());
+        final CompletableFuture<Void> eventLoopGroupCloseFuture = new 
CompletableFuture<>();
+        eventLoopGroup.shutdownGracefully().sync().addListener(f -> {
+            eventLoopGroupCloseFuture.complete(null);
+        });

Review Comment:
   this doesn't look right. (please check first comment about this code pattern)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to