eolivelli opened a new issue, #19579:
URL: https://github.com/apache/pulsar/issues/19579

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   When the broker shuts down, many subcomponents have to be shut down as well, 
in particular the Protocol Handlers.
   Most of the Broker shutdown (close) procedure is async, but closing the 
Protocol Handlers is still a blocking operation.
   Protocol Handlers, like KOP/Starlight for Kafka, often start thread pools and 
Pulsar Clients. To attempt a graceful shutdown, the PH may wait for some 
resources to be disposed, but that disposal can be deferred for a long time 
(because the broker is also shutting down and some resources are no longer 
available, leading to errors and backoffs).
   
   The shutdown procedure of the Broker should be as quick as possible in order 
to prevent latency spikes and other unwanted consequences due to having a 
partially working broker.
   
   
[This](https://github.com/datastax/starlight-for-kafka/actions/runs/4224366992/jobs/7335231134)
 is an example of a test that failed due to a timeout on Starlight for Kafka.
   
   ```
   
   Error:  
testConnectListenerNotExist(io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest)
  Time elapsed: 20.037 s  <<< FAILURE!
   org.testng.internal.thread.ThreadTimeoutException: Method 
io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest.testConnectListenerNotExist()
 didn't finish within the time-out 20000
        at 
[email protected]/java.lang.StackStreamFactory$AbstractStackWalker.callStackWalk(Native
 Method)
        at 
[email protected]/java.lang.StackStreamFactory$AbstractStackWalker.beginStackWalk(StackStreamFactory.java:370)
        at 
[email protected]/java.lang.StackStreamFactory$AbstractStackWalker.walk(StackStreamFactory.java:243)
        at [email protected]/java.lang.StackWalker.walk(StackWalker.java:498)
        at 
app//org.apache.logging.log4j.util.StackLocator.calcLocation(StackLocator.java:96)
        at 
app//org.apache.logging.log4j.util.StackLocatorUtil.calcLocation(StackLocatorUtil.java:99)
        at 
app//org.apache.logging.log4j.spi.AbstractLogger.getLocation(AbstractLogger.java:2216)
        at 
app//org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2159)
        at 
app//org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2142)
        at 
app//org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2040)
        at 
app//org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1907)
        at app//org.apache.logging.slf4j.Log4jLogger.warn(Log4jLogger.java:249)
        at 
app//io.streamnative.pulsar.handlers.kop.AbstractPulsarClient.close(AbstractPulsarClient.java:49)
        at 
app//io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.close(KafkaProtocolHandler.java:578)
        at 
app//org.apache.pulsar.broker.protocol.ProtocolHandlerWithClassLoader.close(ProtocolHandlerWithClassLoader.java:90)
        at 
app//org.apache.pulsar.broker.protocol.ProtocolHandlers$$Lambda$1438/0x0000000100b38040.accept(Unknown
 Source)
        at [email protected]/java.lang.Iterable.forEach(Iterable.java:75)
        at 
app//org.apache.pulsar.broker.protocol.ProtocolHandlers.close(ProtocolHandlers.java:154)
        at 
app//org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:458)
        at 
app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.closeAsync$accessor$pCM5WTps(Unknown
 Source)
        at 
app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588$auxiliary$4dJQHnOb.call(Unknown
 Source)
        at 
app//org.mockito.internal.invocation.RealMethod$FromCallable$1.call(RealMethod.java:40)
        at 
app//org.mockito.internal.invocation.RealMethod$FromBehavior.invoke(RealMethod.java:62)
        at 
app//org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:127)
        at 
app//org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:43)
        at app//org.mockito.Answers.answer(Answers.java:100)
        at 
app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
        at 
app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
        at 
app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:35)
        at 
app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:63)
        at 
app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:49)
        at 
app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptSuperCallable(MockMethodInterceptor.java:110)
        at 
app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.closeAsync(Unknown
 Source)
        at 
app//org.apache.pulsar.broker.PulsarService.close(PulsarService.java:380)
        at 
app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.close$accessor$pCM5WTps(Unknown
 Source)
        at 
app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588$auxiliary$Lvw9o3WA.call(Unknown
 Source)
        at 
app//org.mockito.internal.invocation.RealMethod$FromCallable$1.call(RealMethod.java:40)
        at 
app//org.mockito.internal.invocation.RealMethod$FromBehavior.invoke(RealMethod.java:62)
        at 
app//org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:127)
        at 
app//org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:43)
        at app//org.mockito.Answers.answer(Answers.java:100)
        at 
app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
        at 
app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
        at 
app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:35)
        at 
app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:63)
        at 
app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:49)
        at 
app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptSuperCallable(MockMethodInterceptor.java:110)
        at 
app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.close(Unknown 
Source)
        at 
app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.stopBroker(KopProtocolHandlerTestBase.java:411)
        at 
app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.stopBroker(KopProtocolHandlerTestBase.java:415)
        at 
app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.internalCleanup(KopProtocolHandlerTestBase.java:379)
        at 
app//io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest.testConnectListenerNotExist(KafkaListenerNameTest.java:211)
        at 
[email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
        at 
[email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
[email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at [email protected]/java.lang.reflect.Method.invoke(Method.java:566)
        at 
app//org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
        at 
app//org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
        at 
app//org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
        at 
[email protected]/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at 
[email protected]/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
[email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
[email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at [email protected]/java.lang.Thread.run(Thread.java:829)
   ```
   
   
   ### Solution
   
   The ProtocolHandler API should provide a closeAsync() method instead of a 
blocking close().
   
   We could make the API backward compatible by leveraging Java default methods:
   
   
   ```
   default CompletableFuture<?> closeAsync() {
       CompletableFuture<Void> result = new CompletableFuture<>();
       try {
           // Fall back to the existing blocking close()
           this.close();
           result.complete(null);
       } catch (Throwable t) {
           // TODO handle InterruptedException here
           result.completeExceptionally(t);
       }
       return result;
   }
   ```
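
To make the idea concrete, here is a minimal, self-contained sketch of how the default method could coexist with handlers that override it. Everything in it is hypothetical: `SketchProtocolHandler` is a simplified stand-in for the real ProtocolHandler interface, and `LegacyHandler`, `AsyncHandler` and `HandlerShutdown.closeAll` are illustrative names, not existing Pulsar APIs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for the real ProtocolHandler interface.
interface SketchProtocolHandler {
    void close();

    // Backward-compatible default: runs the blocking close() on the caller's
    // thread and reports the outcome through the returned future.
    default CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            close();
            result.complete(null);
        } catch (Throwable t) {
            result.completeExceptionally(t);
        }
        return result;
    }
}

// Existing handler that only knows about close(); it inherits the default.
final class LegacyHandler implements SketchProtocolHandler {
    final AtomicBoolean closed = new AtomicBoolean();

    @Override
    public void close() {
        closed.set(true);
    }
}

// Handler that opts in to a non-blocking shutdown by overriding closeAsync().
final class AsyncHandler implements SketchProtocolHandler {
    final AtomicBoolean closed = new AtomicBoolean();

    @Override
    public void close() {
        closeAsync().join();
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        // Assumption: resource disposal can run on another thread.
        return CompletableFuture.runAsync(() -> closed.set(true));
    }
}

// Hypothetical broker-side helper (not an existing Pulsar class).
final class HandlerShutdown {
    // Starts closing every handler and returns one future for all of them.
    // Handlers that rely on the default closeAsync() still block inside the
    // map() call; only overriding handlers actually close in the background.
    static CompletableFuture<Void> closeAll(
            List<? extends SketchProtocolHandler> handlers) {
        CompletableFuture<?>[] futures = handlers.stream()
                .map(SketchProtocolHandler::closeAsync)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }
}
```

With a shape like this, the broker could bound the wait on the combined future (for example with `get(timeout, unit)`) instead of blocking for as long as a handler's `close()` takes.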
   
   
   
   ### Alternatives
   
   Remove the close() API entirely and break compatibility.
   
   Rejected because there are already a few ProtocolHandlers and breaking 
compatibility would harm the community.
   
   ### Anything else?
   
   We should port this small API change to stable branches, especially 2.10.x, 
which is the latest version that supports JDK 8.
   
   This change is needed to improve the shutdown procedure, which can 
otherwise lead to huge latency spikes.
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

