This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aece67e35ec [fix] Remove blocking calls from BookieRackAffinityMapping 
(#22846)
aece67e35ec is described below

commit aece67e35ecec4a9d90a951b78cfc89ca6395054
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jun 5 10:49:00 2024 -0700

    [fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
---
 .../rackawareness/BookieRackAffinityMapping.java   | 44 +++++++++++++---------
 .../IsolatedBookieEnsemblePlacementPolicy.java     |  2 +-
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 983822f2294..4a5ff746f40 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
     private BookiesRackConfiguration racksWithHost = new 
BookiesRackConfiguration();
     private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
 
-    public static MetadataStore createMetadataStore(Configuration conf) throws 
MetadataException {
+    static MetadataStore getMetadataStore(Configuration conf) throws 
MetadataException {
         MetadataStore store;
         Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
         if (storeProperty != null) {
@@ -116,12 +116,20 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
         super.setConf(conf);
         MetadataStore store;
         try {
-            store = createMetadataStore(conf);
-            bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
-            store.registerListener(this::handleUpdates);
-            racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-                    .orElseGet(BookiesRackConfiguration::new);
-            for (Map<String, BookieInfo> bookieMapping : 
racksWithHost.values()) {
+            store = getMetadataStore(conf);
+        } catch (MetadataException e) {
+            throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to 
init BookieId list");
+        }
+
+        bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
+        store.registerListener(this::handleUpdates);
+
+        try {
+            var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+                    .thenApply(optRes -> 
optRes.orElseGet(BookiesRackConfiguration::new))
+                    .get();
+
+            for (var bookieMapping : racksWithHost.values()) {
                 for (String address : bookieMapping.keySet()) {
                     bookieAddressListLastTime.add(BookieId.parse(address));
                 }
@@ -131,10 +139,12 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
                 }
             }
             updateRacksWithHost(racksWithHost);
-            watchAvailableBookies();
-        } catch (InterruptedException | ExecutionException | MetadataException 
e) {
-            throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to 
init BookieId list");
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("Failed to update rack info. ", e);
+            throw new RuntimeException(e);
         }
+
+        watchAvailableBookies();
     }
 
     private void watchAvailableBookies() {
@@ -145,13 +155,13 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
                 field.setAccessible(true);
                 RegistrationClient registrationClient = (RegistrationClient) 
field.get(bookieAddressResolver);
                 registrationClient.watchWritableBookies(versioned -> {
-                    try {
-                        racksWithHost = 
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-                                .orElseGet(BookiesRackConfiguration::new);
-                        updateRacksWithHost(racksWithHost);
-                    } catch (InterruptedException | ExecutionException e) {
-                        LOG.error("Failed to update rack info. ", e);
-                    }
+                    bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+                            .thenApply(optRes -> 
optRes.orElseGet(BookiesRackConfiguration::new))
+                            .thenAccept(this::updateRacksWithHost)
+                            .exceptionally(ex -> {
+                                LOG.error("Failed to update rack info. ", ex);
+                                return null;
+                            });
                 });
             } catch (NoSuchFieldException | IllegalAccessException e) {
                 LOG.error("Failed watch available bookies.", e);
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 8839e6e2d26..62b7ffa1e29 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -73,7 +73,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
             StatsLogger statsLogger, BookieAddressResolver 
bookieAddressResolver) {
         MetadataStore store;
         try {
-            store = BookieRackAffinityMapping.createMetadataStore(conf);
+            store = BookieRackAffinityMapping.getMetadataStore(conf);
         } catch (MetadataException e) {
             throw new RuntimeException(METADATA_STORE_INSTANCE + " failed 
initialized");
         }

Reply via email to