wenbingshen opened a new pull request, #4481:
URL: https://github.com/apache/bookkeeper/pull/4481
### Motivation
Before digging into the problem this PR solves, you can run the following test method against the current master branch; the unit test will fail.
```java
@Test
public void testBookieWatcher() throws Exception {
    ClientConfiguration conf = new ClientConfiguration();
    conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
    StaticDNSResolver tested = new StaticDNSResolver();
    try (BookKeeper bkc = BookKeeper
            .forConfig(conf)
            .dnsResolver(tested)
            .build()) {
        final Map<BookieId, BookieInfoReader.BookieInfo> bookieInfo = bkc.getBookieInfo();
        // 1. check that all bookies are present in the client cache.
        bookieInfo.forEach((bookieId, info) -> {
            final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo =
                    bkc.getMetadataClientDriver().getRegistrationClient().getBookieServiceInfo(bookieId);
            assertTrue(bookieServiceInfo.isDone());
            assertFalse(bookieServiceInfo.isCompletedExceptionally());
        });
        // 2. add a task to the scheduler, blocking the zk watch for the bookies cache
        bkc.getClientCtx().getScheduler().schedule(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 0, TimeUnit.MILLISECONDS);
        // 3. restart one bookie, so the client should update its cache via WatchTask
        restartBookie(bookieInfo.keySet().iterator().next());
        // 4. after restarting the bookie, check the client cache again
        final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo =
                bkc.getMetadataClientDriver().getRegistrationClient()
                        .getBookieServiceInfo(bookieInfo.keySet().iterator().next());
        assertTrue(bookieServiceInfo.isDone());
        // 5. Previously, the shared scheduler was used, and getting the bookie from the client cache here would fail.
        // 6. After this PR, an independent internal thread pool (watchTaskScheduler) is used, and this now succeeds.
        assertFalse(bookieServiceInfo.isCompletedExceptionally());
    }
}
```
Next, let me describe the problem we hit:
1. We take bookies offline and then bring them back online in ReadOnly state, expecting these bookies to keep accepting client read requests once they are back online.
2. Then, on some of our brokers, we found that some bookies were not re-listed in the BookKeeper client cache used by ManagedLedger after they came back online. The following screenshot shows that Update BookieInfoCache was executed only once, while in theory it should have been executed twice, because in addition to the ManagedLedger in Pulsar's broker there is also a BookKeeper client in BookkeeperSchemaStorage, as shown in the second screenshot.


3. According to the stack analysis, our scheduler thread was constantly busy with the operation below.
Because the thread kept running this operation, the watch event triggered by ZK stayed queued on the scheduler thread and was never processed, so no new watch listener was re-registered with ZK. As a result, the bookies that came back online were never added back to the BK client cache. (A minimal sketch demonstrating this starvation follows the stack trace.)
```
BookKeeperClientScheduler-OrderedScheduler-0-0
    at org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap$Section.removeIf(Ljava/util/function/BiPredicate;)I (ConcurrentOpenHashMap.java:406)
    at org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap.removeIf(Ljava/util/function/BiPredicate;)I (ConcurrentOpenHashMap.java:172)
    at org.apache.bookkeeper.proto.PerChannelBookieClient.checkTimeoutOnPendingOperations()V (PerChannelBookieClient.java:1015)
    at org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.checkTimeoutOnPendingOperations()V (DefaultPerChannelBookieClientPool.java:132)
    at org.apache.bookkeeper.proto.BookieClientImpl.monitorPendingOperations()V (BookieClientImpl.java:572)
    at org.apache.bookkeeper.proto.BookieClientImpl.lambda$new$0()V (BookieClientImpl.java:131)
    at org.apache.bookkeeper.proto.BookieClientImpl$$Lambda$77.run()V (Unknown Source)
    at org.apache.bookkeeper.util.SafeRunnable$1.safeRun()V (SafeRunnable.java:43)
    at org.apache.bookkeeper.common.util.SafeRunnable.run()V (SafeRunnable.java:36)
    at com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.run()V (MoreExecutors.java:705)
    at java.util.concurrent.Executors$RunnableAdapter.call()Ljava/lang/Object; (Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset()Z (FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Ljava/util/concurrent/ScheduledThreadPoolExecutor$ScheduledFutureTask;)Z (ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run()V (ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V (ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run()V (ThreadPoolExecutor.java:617)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run()V (FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run()V (Thread.java:748)
```


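To make the starvation mechanism concrete, here is a minimal, self-contained sketch (not BookKeeper code; the class name `BlockedSchedulerDemo` is purely illustrative). A single-threaded scheduler, like the one shared through ClientContext, gets occupied by a long-running task, so the queued watch-handling task never runs and the one-shot ZK watch is never re-registered:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockedSchedulerDemo {
    public static void main(String[] args) throws Exception {
        // Single worker thread, analogous to OrderedScheduler-0-0 in the stack above.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // A long-running task occupies the only worker
        // (in production this was checkTimeoutOnPendingOperations spinning on a large map).
        scheduler.schedule(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, TimeUnit.MILLISECONDS);

        // The "watch event" handler that should refresh the bookie cache and
        // re-register the one-shot ZooKeeper watch never gets a chance to run.
        scheduler.submit(() -> System.out.println("processing bookies watch event"));

        Thread.sleep(2000);
        System.out.println("watch event is still queued; the bookie cache stays stale");
        scheduler.shutdownNow();
    }
}
```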
4. The stack issue above has already been fixed in the latest version through multiple PRs, but the scheduler thread is still used by multiple components and is publicly exposed through ClientContext, which is a dangerous setup. The damage we suffered this time was that we could not get the bookie nodes from the cache, which broke consumer reads, and unluckily the three replica bookies holding the data to be read had all just gone through our offline/online operation.
So I suggest giving WatchTask its own independent internal thread pool.
### Changes
Add an independent internal thread pool for WatchTask (a rough sketch of the idea is shown below).
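The following is only a sketch of the idea, not the actual PR diff: the registration client owns a dedicated single-thread scheduler (reusing the `watchTaskScheduler` name mentioned in the test comment above) and runs WatchTask work there instead of on the shared BookKeeperClientScheduler. The class and method names below are illustrative only and may differ from the real implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

// Illustrative sketch only; actual field names and wiring in the PR may differ.
class WatchTaskSchedulerSketch {
    // Dedicated single-thread scheduler owned by the registration client, so
    // WatchTask never competes with timeout checks and other client-wide tasks.
    private final ScheduledExecutorService watchTaskScheduler;

    WatchTaskSchedulerSketch() {
        ThreadFactory tf = r -> {
            Thread t = new Thread(r, "ZKRegistrationClient-WatchTask");
            t.setDaemon(true);
            return t;
        };
        this.watchTaskScheduler = Executors.newSingleThreadScheduledExecutor(tf);
    }

    // ZooKeeper watch callbacks hand their work off here instead of the shared
    // scheduler, so bookie-list refreshes and watch re-registration cannot be starved.
    void onBookiesChanged(Runnable refreshAndRewatch) {
        watchTaskScheduler.execute(refreshAndRewatch);
    }

    void close() {
        watchTaskScheduler.shutdownNow();
    }
}
```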