This is an automated email from the ASF dual-hosted git repository.

shenwenbing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a569a4932c [client][fix] Bookie WatchTask may be stuck (#4481)
a569a4932c is described below

commit a569a4932c1cd8e534f70ee75bd0c7b71a5ceb68
Author: wenbingshen <[email protected]>
AuthorDate: Sat Aug 17 23:40:44 2024 +0800

    [client][fix] Bookie WatchTask may be stuck (#4481)
    
    * Add inner thread for WatchTask
    
    * fix checkstyle
    
    * rename watchTaskScheduler to highPriorityTaskExecutor
---
 .../org/apache/bookkeeper/client/BookKeeper.java   | 15 ++++++-
 .../apache/bookkeeper/client/BookKeeperTest.java   | 48 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 2433bb9efa..978d5355eb 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -119,6 +119,9 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
     private final BookKeeperClientStats clientStats;
     private final double bookieQuarantineRatio;
 
+    // Inner high priority thread for WatchTask. Disable external use.
+    private final OrderedScheduler highPriorityTaskExecutor;
+
     // whether the event loop group is one we created, or is owned by whoever
     // instantiated us
     boolean ownEventLoopGroup = false;
@@ -424,6 +427,8 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
 
         // initialize resources
         this.scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
+        this.highPriorityTaskExecutor =
+                
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperWatchTaskScheduler").build();
         this.mainWorkerPool = OrderedExecutor.newBuilder()
                 .name("BookKeeperClientWorker")
                 .numThreads(conf.getNumWorkerThreads())
@@ -449,7 +454,7 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
             }
             this.metadataDriver.initialize(
                 conf,
-                scheduler,
+                highPriorityTaskExecutor,
                 rootStatsLogger,
                 Optional.ofNullable(zkc));
         } catch (ConfigurationException ce) {
@@ -551,6 +556,7 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
         statsLogger = NullStatsLogger.INSTANCE;
         clientStats = BookKeeperClientStats.newInstance(statsLogger);
         scheduler = null;
+        highPriorityTaskExecutor = null;
         requestTimer = null;
         metadataDriver = null;
         placementPolicy = null;
@@ -1462,6 +1468,13 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
         if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
             LOG.warn("The scheduler did not shutdown cleanly");
         }
+
+        // Close the watchTask scheduler
+        highPriorityTaskExecutor.shutdown();
+        if (!highPriorityTaskExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+            LOG.warn("The highPriorityTaskExecutor for WatchTask did not 
shutdown cleanly");
+        }
+
         mainWorkerPool.shutdown();
         if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) {
             LOG.warn("The mainWorkerPool did not shutdown cleanly");
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 14b71a163d..bb534b1e58 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -37,7 +37,9 @@ import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -50,12 +52,14 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
@@ -1298,4 +1302,48 @@ public class BookKeeperTest extends 
BookKeeperClusterTestCase {
         }
     }
 
+    @Test
+    public void testBookieWatcher() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+
+        StaticDNSResolver tested = new StaticDNSResolver();
+        try (BookKeeper bkc = BookKeeper
+                .forConfig(conf)
+                .dnsResolver(tested)
+                .build()) {
+            final Map<BookieId, BookieInfoReader.BookieInfo> bookieInfo = 
bkc.getBookieInfo();
+
+            // 1. check all bookies in client cache successfully.
+            bookieInfo.forEach((bookieId, info) -> {
+                final CompletableFuture<Versioned<BookieServiceInfo>> 
bookieServiceInfo = bkc.getMetadataClientDriver()
+                        
.getRegistrationClient().getBookieServiceInfo(bookieId);
+                assertTrue(bookieServiceInfo.isDone());
+                assertFalse(bookieServiceInfo.isCompletedExceptionally());
+            });
+
+            // 2. add a task to scheduler, blocking zk watch for bookies cache
+            bkc.getClientCtx().getScheduler().schedule(() -> {
+                try {
+                    Thread.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }, 0, TimeUnit.MILLISECONDS);
+
+            // 3. restart one bookie, so the client should update cache by 
WatchTask
+            restartBookie(bookieInfo.keySet().iterator().next());
+
+            // 4. after restart bookie, check again for the client cache
+            final CompletableFuture<Versioned<BookieServiceInfo>> 
bookieServiceInfo =
+                    bkc.getMetadataClientDriver().getRegistrationClient()
+                            
.getBookieServiceInfo(bookieInfo.keySet().iterator().next());
+            assertTrue(bookieServiceInfo.isDone());
+            // 5. Previously the WatchTask ran on the shared scheduler, so this cache lookup would fail.
+            // 6. After this PR the WatchTask runs on the dedicated internal highPriorityTaskExecutor,
+            // and here it will succeed.
+            assertFalse(bookieServiceInfo.isCompletedExceptionally());
+        }
+    }
+
 }

Reply via email to