This is an automated email from the ASF dual-hosted git repository.
shenwenbing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a569a4932c [client][fix] Bookie WatchTask may be stuck (#4481)
a569a4932c is described below
commit a569a4932c1cd8e534f70ee75bd0c7b71a5ceb68
Author: wenbingshen <[email protected]>
AuthorDate: Sat Aug 17 23:40:44 2024 +0800
[client][fix] Bookie WatchTask may be stuck (#4481)
* Add inner thread for WatchTask
* fix checkstyle
* rename watchTaskScheduler to highPriorityTaskExecutor
---
.../org/apache/bookkeeper/client/BookKeeper.java | 15 ++++++-
.../apache/bookkeeper/client/BookKeeperTest.java | 48 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 1 deletion(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 2433bb9efa..978d5355eb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -119,6 +119,9 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
private final BookKeeperClientStats clientStats;
private final double bookieQuarantineRatio;
+ // Inner high priority thread for WatchTask. Disable external use.
+ private final OrderedScheduler highPriorityTaskExecutor;
+
// whether the event loop group is one we created, or is owned by whoever
// instantiated us
boolean ownEventLoopGroup = false;
@@ -424,6 +427,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
// initialize resources
this.scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
+ this.highPriorityTaskExecutor =
+ OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperWatchTaskScheduler").build();
this.mainWorkerPool = OrderedExecutor.newBuilder()
.name("BookKeeperClientWorker")
.numThreads(conf.getNumWorkerThreads())
@@ -449,7 +454,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
}
this.metadataDriver.initialize(
conf,
- scheduler,
+ highPriorityTaskExecutor,
rootStatsLogger,
Optional.ofNullable(zkc));
} catch (ConfigurationException ce) {
@@ -551,6 +556,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
statsLogger = NullStatsLogger.INSTANCE;
clientStats = BookKeeperClientStats.newInstance(statsLogger);
scheduler = null;
+ highPriorityTaskExecutor = null;
requestTimer = null;
metadataDriver = null;
placementPolicy = null;
@@ -1462,6 +1468,13 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The scheduler did not shutdown cleanly");
}
+
+ // Close the watchTask scheduler
+ highPriorityTaskExecutor.shutdown();
+ if (!highPriorityTaskExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.warn("The highPriorityTaskExecutor for WatchTask did not shutdown cleanly");
+ }
+
mainWorkerPool.shutdown();
if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The mainWorkerPool did not shutdown cleanly");
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 14b71a163d..bb534b1e58 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -37,7 +37,9 @@ import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -50,12 +52,14 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
@@ -1298,4 +1302,48 @@ public class BookKeeperTest extends BookKeeperClusterTestCase {
}
}
+ @Test
+ public void testBookieWatcher() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+
+ StaticDNSResolver tested = new StaticDNSResolver();
+ try (BookKeeper bkc = BookKeeper
+ .forConfig(conf)
+ .dnsResolver(tested)
+ .build()) {
+ final Map<BookieId, BookieInfoReader.BookieInfo> bookieInfo = bkc.getBookieInfo();
+
+ // 1. check all bookies in client cache successfully.
+ bookieInfo.forEach((bookieId, info) -> {
+ final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo = bkc.getMetadataClientDriver()
+ .getRegistrationClient().getBookieServiceInfo(bookieId);
+ assertTrue(bookieServiceInfo.isDone());
+ assertFalse(bookieServiceInfo.isCompletedExceptionally());
+ });
+
+ // 2. add a task to scheduler, blocking zk watch for bookies cache
+ bkc.getClientCtx().getScheduler().schedule(() -> {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+
+ // 3. restart one bookie, so the client should update cache by WatchTask
+ restartBookie(bookieInfo.keySet().iterator().next());
+
+ // 4. after restart bookie, check again for the client cache
+ final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo =
+ bkc.getMetadataClientDriver().getRegistrationClient()
+ .getBookieServiceInfo(bookieInfo.keySet().iterator().next());
+ assertTrue(bookieServiceInfo.isDone());
+ // 5. Previously, we used scheduler, and here getting bookie from client cache would fail.
+ // 6. After this PR, we introduced independent internal thread pool watchTaskScheduler,
+ // and here it will succeed.
+ assertFalse(bookieServiceInfo.isCompletedExceptionally());
+ }
+ }
+
}