This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b121d854c0933309475891774b7a90911afb2c81
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Sep 14 11:24:57 2022 +0300

    [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)
---
 .../metadata/bookkeeper/PulsarRegistrationClient.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 38e2a33ef3f..52b50e3ea4b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -21,12 +21,15 @@ package org.apache.pulsar.metadata.bookkeeper;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.versioning.Version;
@@ -46,6 +49,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
     private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService executor;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
@@ -60,11 +64,15 @@ public class PulsarRegistrationClient implements RegistrationClient {
         this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
         this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
 
+        this.executor = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
+
         store.registerListener(this::updatedBookies);
     }
 
     @Override
     public void close() {
+        executor.shutdownNow();
     }
 
     @Override
@@ -99,7 +107,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
         writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
         return getWritableBookies()
-                .thenAccept(registrationListener::onBookiesChanged);
+                .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
     }
 
     @Override
@@ -111,7 +119,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
         readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
         return getReadOnlyBookies()
-                .thenAccept(registrationListener::onBookiesChanged);
+                .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
     }
 
     @Override
@@ -124,11 +132,11 @@ public class PulsarRegistrationClient implements RegistrationClient {
             if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
                 getReadOnlyBookies().thenAccept(bookies ->
                         readOnlyBookiesWatchers.keySet()
-                                .forEach(w -> w.onBookiesChanged(bookies)));
+                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
             } else if (n.getPath().startsWith(bookieRegistrationPath)) {
                 getWritableBookies().thenAccept(bookies ->
                         writableBookiesWatchers.keySet()
-                                .forEach(w -> w.onBookiesChanged(bookies)));
+                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
             }
         }
     }

Reply via email to