This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 11821c21e16 [fix][broker] Make ExtensibleLoadManagerImpl.getOwnedServiceUnits async (#22727)
11821c21e16 is described below

commit 11821c21e16a70ea159e666f6f6c8bd1c9f3304b
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Thu May 16 22:17:38 2024 -0700

    [fix][broker] Make ExtensibleLoadManagerImpl.getOwnedServiceUnits async (#22727)
    
    (cherry picked from commit fd5916cca6ee2041efa3947d19910e16d94d1bee)
---
 .../extensions/ExtensibleLoadManagerImpl.java      | 47 +++++++++-------------
 .../pulsar/broker/namespace/NamespaceService.java  | 19 +++++----
 .../extensions/ExtensibleLoadManagerImplTest.java  | 17 +++++---
 3 files changed, 43 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 41832fb6007..c22a4086a63 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -204,13 +204,14 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     /**
      * Get all the bundles that are owned by this broker.
      */
-    public Set<NamespaceBundle> getOwnedServiceUnits() {
+    public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() 
{
         if (!started) {
             log.warn("Failed to get owned service units, load manager is not 
started.");
-            return Collections.emptySet();
+            return CompletableFuture.completedFuture(Collections.emptySet());
         }
-        Set<Map.Entry<String, ServiceUnitStateData>> entrySet = 
serviceUnitStateChannel.getOwnershipEntrySet();
+
         String brokerId = brokerRegistry.getBrokerId();
+        Set<Map.Entry<String, ServiceUnitStateData>> entrySet = 
serviceUnitStateChannel.getOwnershipEntrySet();
         Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
                 .filter(entry -> {
                     var stateData = entry.getValue();
@@ -223,34 +224,26 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 }).collect(Collectors.toSet());
         // Add heartbeat and SLA monitor namespace bundle.
         NamespaceName heartbeatNamespace = 
NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
-        try {
-            NamespaceBundle fullBundle = 
pulsar.getNamespaceService().getNamespaceBundleFactory()
-                    .getFullBundle(heartbeatNamespace);
-            ownedServiceUnits.add(fullBundle);
-        } catch (Exception e) {
-            log.warn("Failed to get heartbeat namespace bundle.", e);
-        }
         NamespaceName heartbeatNamespaceV2 = NamespaceService
                 .getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
-        try {
-            NamespaceBundle fullBundle = 
pulsar.getNamespaceService().getNamespaceBundleFactory()
-                    .getFullBundle(heartbeatNamespaceV2);
-            ownedServiceUnits.add(fullBundle);
-        } catch (Exception e) {
-            log.warn("Failed to get heartbeat namespace V2 bundle.", e);
-        }
-
         NamespaceName slaMonitorNamespace = NamespaceService
                 .getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
-        try {
-            NamespaceBundle fullBundle = 
pulsar.getNamespaceService().getNamespaceBundleFactory()
-                    .getFullBundle(slaMonitorNamespace);
-            ownedServiceUnits.add(fullBundle);
-        } catch (Exception e) {
-            log.warn("Failed to get SLA Monitor namespace bundle.", e);
-        }
-
-        return ownedServiceUnits;
+        return pulsar.getNamespaceService().getNamespaceBundleFactory()
+                .getFullBundleAsync(heartbeatNamespace)
+                .thenAccept(fullBundle -> 
ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
+                    log.warn("Failed to get heartbeat namespace bundle.", e);
+                    return null;
+                }).thenCompose(__ -> 
pulsar.getNamespaceService().getNamespaceBundleFactory()
+                        .getFullBundleAsync(heartbeatNamespaceV2))
+                .thenAccept(fullBundle -> 
ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
+                    log.warn("Failed to get heartbeat namespace V2 bundle.", 
e);
+                    return null;
+                }).thenCompose(__ -> 
pulsar.getNamespaceService().getNamespaceBundleFactory()
+                        .getFullBundleAsync(slaMonitorNamespace))
+                .thenAccept(fullBundle -> 
ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
+                    log.warn("Failed to get SLA Monitor namespace bundle.", e);
+                    return null;
+                }).thenApply(__ -> ownedServiceUnits);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 44cdd6368fe..96936b3a5c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -865,12 +865,12 @@ public class NamespaceService implements AutoCloseable {
                    if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
                        ExtensibleLoadManagerImpl extensibleLoadManager =
                                
ExtensibleLoadManagerImpl.get(loadManager.get());
-                       var statusMap = 
extensibleLoadManager.getOwnedServiceUnits().stream()
-                               
.collect(Collectors.toMap(NamespaceBundle::toString,
-                                       bundle -> 
getNamespaceOwnershipStatus(true,
-                                               
namespaceIsolationPolicies.getPolicyByNamespace(
-                                                       
bundle.getNamespaceObject()))));
-                       return CompletableFuture.completedFuture(statusMap);
+                       return extensibleLoadManager.getOwnedServiceUnitsAsync()
+                               .thenApply(OwnedServiceUnits -> 
OwnedServiceUnits.stream()
+                                       
.collect(Collectors.toMap(NamespaceBundle::toString,
+                                               bundle -> 
getNamespaceOwnershipStatus(true,
+                                                       
namespaceIsolationPolicies.getPolicyByNamespace(
+                                                               
bundle.getNamespaceObject())))));
                    }
                     Collection<CompletableFuture<OwnedBundle>> futures =
                             ownershipCache.getOwnedBundlesAsync().values();
@@ -1187,7 +1187,12 @@ public class NamespaceService implements AutoCloseable {
     public Set<NamespaceBundle> getOwnedServiceUnits() {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
-            return extensibleLoadManager.getOwnedServiceUnits();
+            try {
+                return extensibleLoadManager.getOwnedServiceUnitsAsync()
+                        .get(config.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
         return 
ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle)
                 .collect(Collectors.toSet());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index a385b0d3c5c..8b96ed04f64 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1579,13 +1579,15 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                 .getFullBundle(slaMonitorNamespacePulsar2);
 
 
-        Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = 
primaryLoadManager.getOwnedServiceUnits();
+        Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = 
primaryLoadManager.getOwnedServiceUnitsAsync()
+                .get(5, TimeUnit.SECONDS);
         log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
         // heartbeat namespace bundle will own by pulsar1
         assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
         assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
         assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1));
-        Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = 
secondaryLoadManager.getOwnedServiceUnits();
+        Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = 
secondaryLoadManager.getOwnedServiceUnitsAsync()
+                .get(5, TimeUnit.SECONDS);
         log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
@@ -1621,7 +1623,8 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
             ExtensibleLoadManagerImpl extensibleLoadManager,
             NamespaceBundle bundle) throws PulsarAdminException {
         Awaitility.await().untilAsserted(() -> {
-            Set<NamespaceBundle> ownedBundles = 
extensibleLoadManager.getOwnedServiceUnits();
+            Set<NamespaceBundle> ownedBundles = 
extensibleLoadManager.getOwnedServiceUnitsAsync()
+                    .get(5, TimeUnit.SECONDS);
             assertTrue(ownedBundles.contains(bundle));
         });
         Map<String, NamespaceOwnershipStatus> ownedNamespaces =
@@ -1634,9 +1637,11 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
     }
 
     @Test(timeOut = 30 * 1000)
-    public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() {
+    public void testGetOwnedServiceUnitsWhenLoadManagerNotStart()
+            throws Exception {
         ExtensibleLoadManagerImpl loadManager = new 
ExtensibleLoadManagerImpl();
-        Set<NamespaceBundle> ownedServiceUnits = 
loadManager.getOwnedServiceUnits();
+        Set<NamespaceBundle> ownedServiceUnits = 
loadManager.getOwnedServiceUnitsAsync()
+                .get(5, TimeUnit.SECONDS);
         assertNotNull(ownedServiceUnits);
         assertTrue(ownedServiceUnits.isEmpty());
     }
@@ -1651,7 +1656,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         NamespaceEphemeralData namespaceEphemeralData = 
primaryLoadManager.tryAcquiringOwnership(bundle).get();
         assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), 
pulsar2.getBrokerServiceUrl())
                 .contains(namespaceEphemeralData.getNativeUrl()));
-        admin.namespaces().deleteNamespace(namespace, true);
+        admin.namespaces().deleteNamespace(namespace);
     }
 
     @Test(timeOut = 30 * 1000)

Reply via email to