wenbingshen opened a new pull request, #4481:
URL: https://github.com/apache/bookkeeper/pull/4481

   ### Motivation
   
   Before getting into the problem this PR solves, you can run the following test method against the current master branch. The unit test fails there.
   
   ```java
   @Test
   public void testBookieWatcher() throws Exception {
       ClientConfiguration conf = new ClientConfiguration();
       conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

       StaticDNSResolver tested = new StaticDNSResolver();
       try (BookKeeper bkc = BookKeeper
               .forConfig(conf)
               .dnsResolver(tested)
               .build()) {
           final Map<BookieId, BookieInfoReader.BookieInfo> bookieInfo = bkc.getBookieInfo();

           // 1. Every bookie is found in the client cache.
           bookieInfo.forEach((bookieId, info) -> {
               final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo =
                       bkc.getMetadataClientDriver().getRegistrationClient().getBookieServiceInfo(bookieId);
               assertTrue(bookieServiceInfo.isDone());
               assertFalse(bookieServiceInfo.isCompletedExceptionally());
           });

           // 2. Block the client scheduler forever, so the ZK watch callback
           //    that refreshes the bookie cache can never run on it.
           bkc.getClientCtx().getScheduler().schedule(() -> {
               try {
                   Thread.sleep(Long.MAX_VALUE);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           }, 0, TimeUnit.MILLISECONDS);

           // 3. Restart one bookie; the client should update its cache via WatchTask.
           final BookieId restarted = bookieInfo.keySet().iterator().next();
           restartBookie(restarted);

           // 4. After the restart, check the client cache again.
           final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo =
                   bkc.getMetadataClientDriver().getRegistrationClient().getBookieServiceInfo(restarted);
           assertTrue(bookieServiceInfo.isDone());
           // 5. Previously, WatchTask ran on the shared scheduler, so this cache lookup failed.
           // 6. With the independent internal thread pool watchTaskScheduler
           //    introduced by this PR, it succeeds.
           assertFalse(bookieServiceInfo.isCompletedExceptionally());
       }
   }
   ```
   
   Next, here is the problem we ran into:
   1. We took some bookies offline and then brought them back online in read-only mode, expecting them to keep serving client read requests once they were back.
   2. Then, on some of our brokers, we found that some of these bookies were not re-added to the bookie cache of the BookKeeper client used by ManagedLedger after they came back online. The first figure below shows that `Update BookieInfoCache` was executed only once, although it should have been executed twice: besides the ManagedLedger client, a Pulsar broker also has a BookKeeper client in BookKeeperSchemaStorage, as shown in the second figure.
   
![image](https://github.com/user-attachments/assets/f1f22095-a552-4510-9433-2add4c8e4dcb)
   
   
![image](https://github.com/user-attachments/assets/5d61b69f-3671-463e-8574-0cf32b7f8c16)
   
   3. Stack analysis showed that our scheduler thread was frequently executing the operation below. Because the thread stayed busy with it, the watch event triggered by ZooKeeper sat in the scheduler thread's queue and was never executed, so no new watch was registered in ZooKeeper. As a result, the bookie that came back online was never updated in the BookKeeper client's cache.
   
   ```
   BookKeeperClientScheduler-OrderedScheduler-0-0
     at org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap$Section.removeIf(Ljava/util/function/BiPredicate;)I (ConcurrentOpenHashMap.java:406)
     at org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap.removeIf(Ljava/util/function/BiPredicate;)I (ConcurrentOpenHashMap.java:172)
     at org.apache.bookkeeper.proto.PerChannelBookieClient.checkTimeoutOnPendingOperations()V (PerChannelBookieClient.java:1015)
     at org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.checkTimeoutOnPendingOperations()V (DefaultPerChannelBookieClientPool.java:132)
     at org.apache.bookkeeper.proto.BookieClientImpl.monitorPendingOperations()V (BookieClientImpl.java:572)
     at org.apache.bookkeeper.proto.BookieClientImpl.lambda$new$0()V (BookieClientImpl.java:131)
     at org.apache.bookkeeper.proto.BookieClientImpl$$Lambda$77.run()V (Unknown Source)
     at org.apache.bookkeeper.util.SafeRunnable$1.safeRun()V (SafeRunnable.java:43)
     at org.apache.bookkeeper.common.util.SafeRunnable.run()V (SafeRunnable.java:36)
     at com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.run()V (MoreExecutors.java:705)
     at java.util.concurrent.Executors$RunnableAdapter.call()Ljava/lang/Object; (Executors.java:511)
     at java.util.concurrent.FutureTask.runAndReset()Z (FutureTask.java:308)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Ljava/util/concurrent/ScheduledThreadPoolExecutor$ScheduledFutureTask;)Z (ScheduledThreadPoolExecutor.java:180)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run()V (ScheduledThreadPoolExecutor.java:294)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V (ThreadPoolExecutor.java:1142)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run()V (ThreadPoolExecutor.java:617)
     at io.netty.util.concurrent.FastThreadLocalRunnable.run()V (FastThreadLocalRunnable.java:30)
     at java.lang.Thread.run()V (Thread.java:748)
   ```
   
   
![image](https://github.com/user-attachments/assets/a85d1dfe-090f-41e1-ae47-7f3c8072b812)
   
![image](https://github.com/user-attachments/assets/267af4f1-8d0f-46c6-95d0-33b66a0d6078)
   
   
   4. The problem shown in the stack above has since been fixed in the latest version by several PRs. However, the scheduler thread is still shared by multiple components and publicly exposed through ClientContext, which is risky: anything that blocks or saturates it also delays the bookie watch. The harm we suffered this time was that the client could not get the bookie from its cache, which broke consumer reads; by coincidence, all three replica bookies that held the data had just gone through our offline and online operations.
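
   The failure mode in item 3 is head-of-line blocking on a single-threaded scheduler. The minimal sketch below reproduces it with plain `java.util.concurrent` classes only; it is not BookKeeper code, and `SharedSchedulerDemo` and `watchCallbackRan` are hypothetical names. One task occupies the only thread, so a simulated watch callback queued behind it does not run within the timeout.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   public class SharedSchedulerDemo {

       // Returns true if the simulated watch callback ran within timeoutMs while an
       // unrelated task was occupying the same single-threaded scheduler.
       static boolean watchCallbackRan(long timeoutMs) {
           // Stand-in for the client's shared scheduler thread (hypothetical, not OrderedScheduler).
           ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
           CountDownLatch release = new CountDownLatch(1);
           CountDownLatch watchFired = new CountDownLatch(1);
           try {
               // 1. Another component occupies the only thread.
               shared.execute(() -> {
                   try {
                       release.await();
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               });
               // 2. A ZooKeeper watch fires; its handler is queued behind the busy task.
               shared.execute(watchFired::countDown);
               // 3. Wait for the handler; it cannot run while the first task holds the thread.
               return watchFired.await(timeoutMs, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           } finally {
               // Unblock the first task so the executor can drain its queue and terminate.
               release.countDown();
               shared.shutdown();
           }
       }

       public static void main(String[] args) {
           System.out.println("watch callback ran: " + watchCallbackRan(200));
       }
   }
   ```

   In the real client, the long-running work was the pending-operation timeout check shown in the stack trace, and the queued task was the watch callback that would have refreshed the cache and registered a new watch.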
   
   So I suggest giving WatchTask its own internal, independent thread pool.
   
   ### Changes
   
   Add an internal, dedicated thread pool (`watchTaskScheduler`) for WatchTask instead of running it on the shared client scheduler.
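
   A minimal sketch of the idea behind this change, assuming a watcher that owns a dedicated single-threaded executor. It is not the code in this PR: `BookieWatcherSketch`, `onWatchEvent`, and the thread name are hypothetical. The shared scheduler is blocked exactly as in the failure scenario, yet the watch callback still completes because it runs on its own thread.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   public class DedicatedWatchExecutorDemo {

       // Hypothetical stand-in for a watcher that owns its own thread.
       static final class BookieWatcherSketch implements AutoCloseable {
           // Dedicated daemon thread for watch handling (name chosen for illustration).
           private final ScheduledExecutorService watchTaskScheduler =
                   Executors.newSingleThreadScheduledExecutor(r -> {
                       Thread t = new Thread(r, "watch-task-scheduler");
                       t.setDaemon(true);
                       return t;
                   });

           // Called when ZooKeeper fires a watch; handled off the shared scheduler.
           void onWatchEvent(Runnable refreshCacheAndReRegisterWatch) {
               watchTaskScheduler.execute(refreshCacheAndReRegisterWatch);
           }

           @Override
           public void close() {
               watchTaskScheduler.shutdownNow();
           }
       }

       // Returns true if the watch callback ran within timeoutMs while the shared scheduler was blocked.
       static boolean watchCallbackRan(long timeoutMs) {
           ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
           CountDownLatch release = new CountDownLatch(1);
           CountDownLatch watchFired = new CountDownLatch(1);
           try (BookieWatcherSketch watcher = new BookieWatcherSketch()) {
               // The shared scheduler is still occupied by another component...
               shared.execute(() -> {
                   try {
                       release.await();
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               });
               // ...but the watch handler runs on the watcher's own thread.
               watcher.onWatchEvent(watchFired::countDown);
               return watchFired.await(timeoutMs, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           } finally {
               release.countDown();
               shared.shutdown();
           }
       }

       public static void main(String[] args) {
           System.out.println("watch callback ran: " + watchCallbackRan(1000));
       }
   }
   ```

   Using a single dedicated thread keeps watch events processed in order while isolating them from whatever else is scheduled on the client's shared scheduler; shutting it down in `close()` avoids leaking the thread when the client is closed.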


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
