(pulsar) 01/02: [fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 190857e1854271b55ba84d63422a473ae45e64fc
Author: Matteo Merli
AuthorDate: Wed Jun 5 10:49:00 2024 -0700

    [fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
---
 .../rackawareness/BookieRackAffinityMapping.java | 44 +-
 .../IsolatedBookieEnsemblePlacementPolicy.java | 2 +-
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 983822f2294..4a5ff746f40 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
     private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
     private Map bookieInfoMap = new HashMap<>();
 
-    public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
+    static MetadataStore getMetadataStore(Configuration conf) throws MetadataException {
         MetadataStore store;
         Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
         if (storeProperty != null) {
@@ -116,12 +116,20 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
         super.setConf(conf);
         MetadataStore store;
         try {
-            store = createMetadataStore(conf);
-            bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
-            store.registerListener(this::handleUpdates);
-            racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-                    .orElseGet(BookiesRackConfiguration::new);
-            for (Map bookieMapping : racksWithHost.values()) {
+            store = getMetadataStore(conf);
+        } catch (MetadataException e) {
+            throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
+        }
+
+        bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
+        store.registerListener(this::handleUpdates);
+
+        try {
+            var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+                    .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
+                    .get();
+
+            for (var bookieMapping : racksWithHost.values()) {
                 for (String address : bookieMapping.keySet()) {
                     bookieAddressListLastTime.add(BookieId.parse(address));
                 }
@@ -131,10 +139,12 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
                 }
             }
             updateRacksWithHost(racksWithHost);
-            watchAvailableBookies();
-        } catch (InterruptedException | ExecutionException | MetadataException e) {
-            throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("Failed to update rack info. ", e);
+            throw new RuntimeException(e);
         }
+
+        watchAvailableBookies();
     }
 
     private void watchAvailableBookies() {
@@ -145,13 +155,13 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
             field.setAccessible(true);
             RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
             registrationClient.watchWritableBookies(versioned -> {
-                try {
-                    racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-                            .orElseGet(BookiesRackConfiguration::new);
-                    updateRacksWithHost(racksWithHost);
-                } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Failed to update rack info. ", e);
-                }
+                bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+                        .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
+                        .thenAccept(this::updateRacksWithHost)
+                        .exceptionally(ex -> {
+                            LOG.error("Failed to update rack info. ", ex);
+                            return null;
+                        });
             });
         } catch (NoSuchFieldException | IllegalAccessException e) {
             LOG.error("Failed watch available
(pulsar) 01/02: [fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3e2ca291d3e32d71caa60e5b3ee0be9b920a0626
Author: Matteo Merli
AuthorDate: Wed Jun 5 10:49:00 2024 -0700

    [fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
---
 .../rackawareness/BookieRackAffinityMapping.java | 44 +-
 .../IsolatedBookieEnsemblePlacementPolicy.java | 2 +-
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 983822f2294..4a5ff746f40 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
     private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
     private Map bookieInfoMap = new HashMap<>();
 
-    public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
+    static MetadataStore getMetadataStore(Configuration conf) throws MetadataException {
         MetadataStore store;
         Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
         if (storeProperty != null) {
@@ -116,12 +116,20 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
         super.setConf(conf);
         MetadataStore store;
         try {
-            store = createMetadataStore(conf);
-            bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
-            store.registerListener(this::handleUpdates);
-            racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-                    .orElseGet(BookiesRackConfiguration::new);
-            for (Map bookieMapping : racksWithHost.values()) {
+            store = getMetadataStore(conf);
+        } catch (MetadataException e) {
+            throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
+        }
+
+        bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
+        store.registerListener(this::handleUpdates);
+
+        try {
+            var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+                    .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
+                    .get();
+
+            for (var bookieMapping : racksWithHost.values()) {
                 for (String address : bookieMapping.keySet()) {
                     bookieAddressListLastTime.add(BookieId.parse(address));
                 }
@@ -131,10 +139,12 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
                 }
             }
             updateRacksWithHost(racksWithHost);
-            watchAvailableBookies();
-        } catch (InterruptedException | ExecutionException | MetadataException e) {
-            throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("Failed to update rack info. ", e);
+            throw new RuntimeException(e);
         }
+
+        watchAvailableBookies();
     }
 
     private void watchAvailableBookies() {
@@ -145,13 +155,13 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
             field.setAccessible(true);
             RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
             registrationClient.watchWritableBookies(versioned -> {
-                try {
-                    racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-                            .orElseGet(BookiesRackConfiguration::new);
-                    updateRacksWithHost(racksWithHost);
-                } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Failed to update rack info. ", e);
-                }
+                bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+                        .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
+                        .thenAccept(this::updateRacksWithHost)
+                        .exceptionally(ex -> {
+                            LOG.error("Failed to update rack info. ", ex);
+                            return null;
+                        });
             });
         } catch (NoSuchFieldException | IllegalAccessException e) {
             LOG.error("Failed watch available