This is an automated email from the ASF dual-hosted git repository. heesung pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 11821c21e16 [fix][broker] Make ExtensibleLoadManagerImpl.getOwnedServiceUnits async (#22727) 11821c21e16 is described below commit 11821c21e16a70ea159e666f6f6c8bd1c9f3304b Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Thu May 16 22:17:38 2024 -0700 [fix][broker] Make ExtensibleLoadManagerImpl.getOwnedServiceUnits async (#22727) (cherry picked from commit fd5916cca6ee2041efa3947d19910e16d94d1bee) --- .../extensions/ExtensibleLoadManagerImpl.java | 47 +++++++++------------- .../pulsar/broker/namespace/NamespaceService.java | 19 +++++---- .../extensions/ExtensibleLoadManagerImplTest.java | 17 +++++--- 3 files changed, 43 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 41832fb6007..c22a4086a63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -204,13 +204,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS /** * Get all the bundles that are owned by this broker. */ - public Set<NamespaceBundle> getOwnedServiceUnits() { + public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() { if (!started) { log.warn("Failed to get owned service units, load manager is not started."); - return Collections.emptySet(); + return CompletableFuture.completedFuture(Collections.emptySet()); } - Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); + String brokerId = brokerRegistry.getBrokerId(); + Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); Set<NamespaceBundle> ownedServiceUnits = entrySet.stream() .filter(entry -> { var stateData = entry.getValue(); @@ -223,34 +224,26 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS }).collect(Collectors.toSet()); // Add heartbeat and SLA monitor namespace bundle. NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration()); - try { - NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() - .getFullBundle(heartbeatNamespace); - ownedServiceUnits.add(fullBundle); - } catch (Exception e) { - log.warn("Failed to get heartbeat namespace bundle.", e); - } NamespaceName heartbeatNamespaceV2 = NamespaceService .getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration()); - try { - NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() - .getFullBundle(heartbeatNamespaceV2); - ownedServiceUnits.add(fullBundle); - } catch (Exception e) { - log.warn("Failed to get heartbeat namespace V2 bundle.", e); - } - NamespaceName slaMonitorNamespace = NamespaceService .getSLAMonitorNamespace(brokerId, pulsar.getConfiguration()); - try { - NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() - .getFullBundle(slaMonitorNamespace); - ownedServiceUnits.add(fullBundle); - } catch (Exception e) { - log.warn("Failed to get SLA Monitor namespace bundle.", e); - } - - return ownedServiceUnits; + return pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundleAsync(heartbeatNamespace) + .thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> { + log.warn("Failed to get heartbeat namespace bundle.", e); + return null; + }).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundleAsync(heartbeatNamespaceV2)) + .thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> { + log.warn("Failed to get heartbeat namespace V2 bundle.", e); + return null; + }).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundleAsync(slaMonitorNamespace)) + .thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> { + log.warn("Failed to get SLA Monitor namespace bundle.", e); + return null; + }).thenApply(__ -> ownedServiceUnits); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 44cdd6368fe..96936b3a5c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -865,12 +865,12 @@ public class NamespaceService implements AutoCloseable { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); - var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream() - .collect(Collectors.toMap(NamespaceBundle::toString, - bundle -> getNamespaceOwnershipStatus(true, - namespaceIsolationPolicies.getPolicyByNamespace( - bundle.getNamespaceObject())))); - return CompletableFuture.completedFuture(statusMap); + return extensibleLoadManager.getOwnedServiceUnitsAsync() + .thenApply(OwnedServiceUnits -> OwnedServiceUnits.stream() + .collect(Collectors.toMap(NamespaceBundle::toString, + bundle -> getNamespaceOwnershipStatus(true, + namespaceIsolationPolicies.getPolicyByNamespace( + bundle.getNamespaceObject()))))); } Collection<CompletableFuture<OwnedBundle>> futures = ownershipCache.getOwnedBundlesAsync().values(); @@ -1187,7 +1187,12 @@ public class NamespaceService implements AutoCloseable { public Set<NamespaceBundle> getOwnedServiceUnits() { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); - return extensibleLoadManager.getOwnedServiceUnits(); + try { + return extensibleLoadManager.getOwnedServiceUnitsAsync() + .get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } } return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle) .collect(Collectors.toSet()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index a385b0d3c5c..8b96ed04f64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1579,13 +1579,15 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase .getFullBundle(slaMonitorNamespacePulsar2); - Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits(); + Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnitsAsync() + .get(5, TimeUnit.SECONDS); log.info("Owned service units: {}", ownedServiceUnitsByPulsar1); // heartbeat namespace bundle will own by pulsar1 assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1)); assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2)); assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1)); - Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits(); + Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnitsAsync() + .get(5, TimeUnit.SECONDS); log.info("Owned service units: {}", ownedServiceUnitsByPulsar2); assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3)); assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4)); @@ -1621,7 +1623,8 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase ExtensibleLoadManagerImpl extensibleLoadManager, NamespaceBundle bundle) throws PulsarAdminException { Awaitility.await().untilAsserted(() -> { - Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnits(); + Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnitsAsync() + .get(5, TimeUnit.SECONDS); assertTrue(ownedBundles.contains(bundle)); }); Map<String, NamespaceOwnershipStatus> ownedNamespaces = @@ -1634,9 +1637,11 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase } @Test(timeOut = 30 * 1000) - public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() { + public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() + throws Exception { ExtensibleLoadManagerImpl loadManager = new ExtensibleLoadManagerImpl(); - Set<NamespaceBundle> ownedServiceUnits = loadManager.getOwnedServiceUnits(); + Set<NamespaceBundle> ownedServiceUnits = loadManager.getOwnedServiceUnitsAsync() + .get(5, TimeUnit.SECONDS); assertNotNull(ownedServiceUnits); assertTrue(ownedServiceUnits.isEmpty()); } @@ -1651,7 +1656,7 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get(); assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()) .contains(namespaceEphemeralData.getNativeUrl())); - admin.namespaces().deleteNamespace(namespace, true); + admin.namespaces().deleteNamespace(namespace); } @Test(timeOut = 30 * 1000)