wenbingshen opened a new pull request, #3966:
URL: https://github.com/apache/bookkeeper/pull/3966

   ### Motivation
   
   While stopping a Pulsar broker, I saw the following error repeated many times in the broker log:
   
   17:48:35.894 [shutdown-thread-43-1] INFO  org.apache.bookkeeper.proto.PerChannelBookieClient - Closing the per channel bookie client for 10.184.1.65:3181
   17:48:35.894 [shutdown-thread-43-1] ERROR io.netty.util.concurrent.DefaultPromise.rejectedExecution - Failed to submit a listener notification task. Event loop shut down?
   java.util.concurrent.RejectedExecutionException: event executor terminated
           at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:842) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) ~[io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
           at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) ~[io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
           at org.apache.bookkeeper.proto.PerChannelBookieClient.closeChannel(PerChannelBookieClient.java:1090) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
           at org.apache.bookkeeper.proto.PerChannelBookieClient.closeInternal(PerChannelBookieClient.java:1079) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
           at org.apache.bookkeeper.proto.PerChannelBookieClient.close(PerChannelBookieClient.java:1063) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
           at org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.close(DefaultPerChannelBookieClientPool.java:157) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
           at org.apache.bookkeeper.proto.BookieClientImpl.close(BookieClientImpl.java:587) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
           at org.apache.bookkeeper.client.BookKeeper.close(BookKeeper.java:1435) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
           at org.apache.pulsar.broker.ManagedLedgerClientFactory.close(ManagedLedgerClientFactory.java:142) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
           at org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:417) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
           at org.apache.pulsar.broker.MessagingServiceShutdownHook.lambda$run$1(MessagingServiceShutdownHook.java:62) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
   
   Stopping the broker produced more than 30,000 lines of this output, which buried the broker's regular log entries.
   <img width="1180" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/0ec3c423-14f1-41c4-85c3-d978ece15044">
   
   My investigation found the following:
   1. When the broker stops, it closes the ManagedLedgerClientFactory, which in turn closes all BookKeeper connections.
   
   
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java#L154
   ```java
       // DefaultPerChannelBookieClientPool: closes every pooled client.
       @Override
       public void close(boolean wait) {
           for (int i = 0; i < clients.length; i++) {
               clients[i].close(wait);
               if (clients != clientsV3Enforced) {
                   clientsV3Enforced[i].close(wait);
               }
           }
       }
   
       // PerChannelBookieClient: reached from close() on each client.
       private void closeInternal(boolean permanent, boolean wait) {
           Channel toClose = null;
           synchronized (this) {
               if (permanent) {
                   state = ConnectionState.CLOSED;
               } else if (state != ConnectionState.CLOSED) {
                   state = ConnectionState.DISCONNECTED;
               }
               toClose = channel;
               channel = null;
               makeWritable();
           }
           // a. toClose is not null here, even if the channel is already inactive
           if (toClose != null) {
               ChannelFuture cf = closeChannel(toClose);
               if (wait) {
                   cf.awaitUninterruptibly();
               }
           }
       }
   
       private ChannelFuture closeChannel(Channel c) {
           if (LOG.isDebugEnabled()) {
               LOG.debug("Closing channel {}", c);
           }
           // b. if the channel is already inactive, adding the listener here
           //    triggers the exception above
           return c.close().addListener(x -> makeWritable());
       }
   ```
   
   
   2. If the bookie server disconnects the client first, the client's channelInactive handler is triggered:
   ```java
       public void channelInactive(ChannelHandlerContext ctx) throws Exception {
           LOG.info("Disconnected from bookie channel {}", ctx.channel());
           if (ctx.channel() != null) {
               // c. the channel is closed here
               closeChannel(ctx.channel());
               if (ctx.channel().pipeline().get(SslHandler.class) != null) {
                   activeTlsChannelCounter.dec();
               } else {
                   activeNonTlsChannelCounter.dec();
               }
           }
   
           
           errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException);
           errorOutPendingOps(BKException.Code.BookieHandleNotAvailableException);
   
           synchronized (this) {
               if (this.channel == ctx.channel()
                   && state != ConnectionState.CLOSED) {
                   state = ConnectionState.DISCONNECTED;
               }
           }
   
           // we don't want to reconnect right away. If someone sends a request to
           // this address, we will reconnect.
       }
   ```
   
   If step c happens before step b, the channel's close future is already complete when b adds its listener, so b produces the exception shown above.
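
   The stack trace shows `addListener` calling `notifyListeners` directly, which happens when the future is already done, and the notification task is then rejected because the event executor has terminated. The following is a minimal sketch of that ordering using only `java.util.concurrent`. `TinyPromise` and `LateListenerDemo` are hypothetical stand-ins written for illustration, not Netty or BookKeeper classes, and the sketch only models the behavior suggested by the log.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical, simplified promise that notifies its listener on an executor. */
class TinyPromise {
    private final ExecutorService executor;
    private boolean done;
    private Runnable listener; // a single listener is enough for this sketch

    TinyPromise(ExecutorService executor) {
        this.executor = executor;
    }

    /** Marks the promise complete and notifies a listener registered earlier. */
    synchronized void complete() {
        done = true;
        if (listener != null) {
            executor.execute(listener);
        }
    }

    /**
     * Registers a listener. If the promise is already complete, the
     * notification is submitted right away. Returns false when the
     * executor rejects that task.
     */
    synchronized boolean addListener(Runnable l) {
        if (!done) {
            listener = l;
            return true;
        }
        try {
            executor.execute(l);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // the real client logs an ERROR with a stack trace here
        }
    }
}

class LateListenerDemo {
    /** Step c first, then executor shutdown, then step b. */
    static boolean addListenerAfterShutdown() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        TinyPromise closeFuture = new TinyPromise(eventLoop);
        closeFuture.complete();                   // step c: channel already closed
        eventLoop.shutdown();                     // event loop stops during shutdown
        return closeFuture.addListener(() -> { }); // step b: late listener
    }

    /** Control case: the listener is added while the executor still runs. */
    static boolean addListenerWhileRunning() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        TinyPromise closeFuture = new TinyPromise(eventLoop);
        closeFuture.complete();
        boolean accepted = closeFuture.addListener(() -> { });
        eventLoop.shutdown();
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("after shutdown: " + addListenerAfterShutdown());
        System.out.println("while running:  " + addListenerWhileRunning());
    }
}
```

   In this model the late listener is rejected only when both conditions hold: the promise is already complete and the executor has been shut down. That matches the ordering described in steps a to c.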
   
   Step c:
   <img width="1400" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/4a33855c-5682-4812-ae0e-1a02972ef502">
   <img width="1407" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/1c99bccb-b2bd-4cfb-ac4b-ee10b2d236b0">
   
   
   Steps a and b:
   <img width="1403" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/fec372c5-f97b-47e7-b5d7-ebbaa6edf682">
   
   So in channelInactive, we should reset the channel field to null after closing the channel.
   With the channel set to null, the next request that obtains this client reconnects, which is consistent with the current behavior.
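
   A rough sketch of the idea, assuming a toy model rather than the real client: `ToyChannelClient` and `ToyChannel` are hypothetical names, and the actual change to `PerChannelBookieClient` may be shaped differently. It only shows that once `channelInactive` clears the field, a later `closeInternal` finds nothing to close and never reaches the `addListener` call.

```java
/** Hypothetical toy model of the client's channel bookkeeping. */
class ToyChannelClient {

    /** Stand-in for a network channel; counts how often close() is called. */
    static final class ToyChannel {
        int closeCalls;

        void close() {
            closeCalls++;
        }
    }

    private ToyChannel channel;

    ToyChannelClient(ToyChannel channel) {
        this.channel = channel;
    }

    /** Models channelInactive with the proposed reset applied. */
    void channelInactive(ToyChannel inactive) {
        inactive.close(); // step c: the channel is closed here
        synchronized (this) {
            if (channel == inactive) {
                channel = null; // proposed: forget the channel once it is closed
            }
        }
    }

    /** Models closeInternal. Returns true if a channel still had to be closed. */
    boolean closeInternal() {
        ToyChannel toClose;
        synchronized (this) {
            toClose = channel;
            channel = null;
        }
        if (toClose == null) {
            return false; // step a is skipped, so step b never runs
        }
        toClose.close();
        return true;
    }
}

class ToyChannelClientDemo {
    public static void main(String[] args) {
        ToyChannelClient.ToyChannel ch = new ToyChannelClient.ToyChannel();
        ToyChannelClient client = new ToyChannelClient(ch);
        client.channelInactive(ch);
        System.out.println("closeInternal closed a channel: " + client.closeInternal());
        System.out.println("close() calls on the channel: " + ch.closeCalls);
    }
}
```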
   
![image](https://github.com/apache/bookkeeper/assets/35599757/d1178697-27bd-44a5-acd1-33ccec99e7a4)
   
   
   

