(pulsar) 01/02: [fix] Remove blocking calls from BookieRackAffinityMapping (#22846)

2024-06-05 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 190857e1854271b55ba84d63422a473ae45e64fc
Author: Matteo Merli 
AuthorDate: Wed Jun 5 10:49:00 2024 -0700

[fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
---
 .../rackawareness/BookieRackAffinityMapping.java   | 44 +-
 .../IsolatedBookieEnsemblePlacementPolicy.java |  2 +-
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 983822f2294..4a5ff746f40 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
 private BookiesRackConfiguration racksWithHost = new 
BookiesRackConfiguration();
 private Map bookieInfoMap = new HashMap<>();
 
-public static MetadataStore createMetadataStore(Configuration conf) throws 
MetadataException {
+static MetadataStore getMetadataStore(Configuration conf) throws 
MetadataException {
 MetadataStore store;
 Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
 if (storeProperty != null) {
@@ -116,12 +116,20 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
 super.setConf(conf);
 MetadataStore store;
 try {
-store = createMetadataStore(conf);
-bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
-store.registerListener(this::handleUpdates);
-racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-.orElseGet(BookiesRackConfiguration::new);
-for (Map bookieMapping : 
racksWithHost.values()) {
+store = getMetadataStore(conf);
+} catch (MetadataException e) {
+throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to 
init BookieId list");
+}
+
+bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
+store.registerListener(this::handleUpdates);
+
+try {
+var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+.thenApply(optRes -> 
optRes.orElseGet(BookiesRackConfiguration::new))
+.get();
+
+for (var bookieMapping : racksWithHost.values()) {
 for (String address : bookieMapping.keySet()) {
 bookieAddressListLastTime.add(BookieId.parse(address));
 }
@@ -131,10 +139,12 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
 }
 }
 updateRacksWithHost(racksWithHost);
-watchAvailableBookies();
-} catch (InterruptedException | ExecutionException | MetadataException 
e) {
-throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to 
init BookieId list");
+} catch (ExecutionException | InterruptedException e) {
+LOG.error("Failed to update rack info. ", e);
+throw new RuntimeException(e);
 }
+
+watchAvailableBookies();
 }
 
 private void watchAvailableBookies() {
@@ -145,13 +155,13 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
 field.setAccessible(true);
 RegistrationClient registrationClient = (RegistrationClient) 
field.get(bookieAddressResolver);
 registrationClient.watchWritableBookies(versioned -> {
-try {
-racksWithHost = 
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-.orElseGet(BookiesRackConfiguration::new);
-updateRacksWithHost(racksWithHost);
-} catch (InterruptedException | ExecutionException e) {
-LOG.error("Failed to update rack info. ", e);
-}
+bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+.thenApply(optRes -> 
optRes.orElseGet(BookiesRackConfiguration::new))
+.thenAccept(this::updateRacksWithHost)
+.exceptionally(ex -> {
+LOG.error("Failed to update rack info. ", ex);
+return null;
+});
 });
 } catch (NoSuchFieldException | IllegalAccessException e) {
 LOG.error("Failed watch available 

(pulsar) 01/02: [fix] Remove blocking calls from BookieRackAffinityMapping (#22846)

2024-06-05 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3e2ca291d3e32d71caa60e5b3ee0be9b920a0626
Author: Matteo Merli 
AuthorDate: Wed Jun 5 10:49:00 2024 -0700

[fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
---
 .../rackawareness/BookieRackAffinityMapping.java   | 44 +-
 .../IsolatedBookieEnsemblePlacementPolicy.java |  2 +-
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 983822f2294..4a5ff746f40 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
 private BookiesRackConfiguration racksWithHost = new 
BookiesRackConfiguration();
 private Map bookieInfoMap = new HashMap<>();
 
-public static MetadataStore createMetadataStore(Configuration conf) throws 
MetadataException {
+static MetadataStore getMetadataStore(Configuration conf) throws 
MetadataException {
 MetadataStore store;
 Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
 if (storeProperty != null) {
@@ -116,12 +116,20 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
 super.setConf(conf);
 MetadataStore store;
 try {
-store = createMetadataStore(conf);
-bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
-store.registerListener(this::handleUpdates);
-racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-.orElseGet(BookiesRackConfiguration::new);
-for (Map bookieMapping : 
racksWithHost.values()) {
+store = getMetadataStore(conf);
+} catch (MetadataException e) {
+throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to 
init BookieId list");
+}
+
+bookieMappingCache = 
store.getMetadataCache(BookiesRackConfiguration.class);
+store.registerListener(this::handleUpdates);
+
+try {
+var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+.thenApply(optRes -> 
optRes.orElseGet(BookiesRackConfiguration::new))
+.get();
+
+for (var bookieMapping : racksWithHost.values()) {
 for (String address : bookieMapping.keySet()) {
 bookieAddressListLastTime.add(BookieId.parse(address));
 }
@@ -131,10 +139,12 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
 }
 }
 updateRacksWithHost(racksWithHost);
-watchAvailableBookies();
-} catch (InterruptedException | ExecutionException | MetadataException 
e) {
-throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to 
init BookieId list");
+} catch (ExecutionException | InterruptedException e) {
+LOG.error("Failed to update rack info. ", e);
+throw new RuntimeException(e);
 }
+
+watchAvailableBookies();
 }
 
 private void watchAvailableBookies() {
@@ -145,13 +155,13 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
 field.setAccessible(true);
 RegistrationClient registrationClient = (RegistrationClient) 
field.get(bookieAddressResolver);
 registrationClient.watchWritableBookies(versioned -> {
-try {
-racksWithHost = 
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-.orElseGet(BookiesRackConfiguration::new);
-updateRacksWithHost(racksWithHost);
-} catch (InterruptedException | ExecutionException e) {
-LOG.error("Failed to update rack info. ", e);
-}
+bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+.thenApply(optRes -> 
optRes.orElseGet(BookiesRackConfiguration::new))
+.thenAccept(this::updateRacksWithHost)
+.exceptionally(ex -> {
+LOG.error("Failed to update rack info. ", ex);
+return null;
+});
 });
 } catch (NoSuchFieldException | IllegalAccessException e) {
 LOG.error("Failed watch available