This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b0e8fe9f7ea [fix] [broker] replace loadSheddingPipeline with loadSheddingStrategy. (#22786)
b0e8fe9f7ea is described below

commit b0e8fe9f7ea765d4b580ab3eb6bf3c51c59e685f
Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Tue May 28 18:53:57 2024 +0800

    [fix] [broker] replace loadSheddingPipeline with loadSheddingStrategy. (#22786)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 81 ++++++++++------------
 1 file changed, 38 insertions(+), 43 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index a3e6b1c3aeb..5d08ea9c3c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -139,8 +139,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
     // LocalBrokerData available before most recent update.
     private LocalBrokerData lastData;
 
-    // Pipeline used to determine what namespaces, if any, should be unloaded.
-    private final List<LoadSheddingStrategy> loadSheddingPipeline;
+    // Used to determine what namespaces, if any, should be unloaded.
+    private LoadSheddingStrategy loadSheddingStrategy;
 
     // Local data for the broker this is running on.
     private LocalBrokerData localData;
@@ -204,7 +204,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         defaultStats = new NamespaceBundleStats();
         filterPipeline = new ArrayList<>();
         loadData = new LoadData();
-        loadSheddingPipeline = new ArrayList<>();
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         executors = Executors.newSingleThreadExecutor(
                 new 
ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
@@ -270,7 +269,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                             () -> 
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, 
brokerToFailureDomainMap));
                 });
 
-        loadSheddingPipeline.add(createLoadSheddingStrategy());
+        loadSheddingStrategy = createLoadSheddingStrategy();
     }
 
     public void handleDataNotification(Notification t) {
@@ -476,9 +475,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         if (pulsar.getLeaderElectionService() != null
                 && pulsar.getLeaderElectionService().isLeader()) {
             
deadBrokers.forEach(this::deleteTimeAverageDataFromMetadataStoreAsync);
-            for (LoadSheddingStrategy loadSheddingStrategy : 
loadSheddingPipeline) {
-                loadSheddingStrategy.onActiveBrokersChange(activeBrokers);
-            }
+            loadSheddingStrategy.onActiveBrokersChange(activeBrokers);
             placementStrategy.onActiveBrokersChange(activeBrokers);
         }
     }
@@ -632,47 +629,45 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         final Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
         recentlyUnloadedBundles.keySet().removeIf(e -> 
recentlyUnloadedBundles.get(e) < timeout);
 
-        for (LoadSheddingStrategy strategy : loadSheddingPipeline) {
-            final Multimap<String, String> bundlesToUnload = 
strategy.findBundlesForUnloading(loadData, conf);
+        final Multimap<String, String> bundlesToUnload = 
loadSheddingStrategy.findBundlesForUnloading(loadData, conf);
 
-            bundlesToUnload.asMap().forEach((broker, bundles) -> {
-                bundles.forEach(bundle -> {
-                    final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
-                    final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
-                    if (!shouldNamespacePoliciesUnload(namespaceName, 
bundleRange, broker)) {
-                        return;
-                    }
+        bundlesToUnload.asMap().forEach((broker, bundles) -> {
+            bundles.forEach(bundle -> {
+                final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+                final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+                if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange, 
broker)) {
+                    return;
+                }
 
-                    if (!shouldAntiAffinityNamespaceUnload(namespaceName, 
bundleRange, broker)) {
-                        return;
-                    }
-                    NamespaceBundle bundleToUnload = 
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
-                    Optional<String> destBroker = 
this.selectBroker(bundleToUnload);
-                    if (!destBroker.isPresent()) {
-                        log.info("[{}] No broker available to unload bundle {} 
from broker {}",
-                                strategy.getClass().getSimpleName(), bundle, 
broker);
-                        return;
-                    }
-                    if (destBroker.get().equals(broker)) {
-                        log.warn("[{}] The destination broker {} is the same 
as the current owner broker for Bundle {}",
-                                strategy.getClass().getSimpleName(), 
destBroker.get(), bundle);
-                        return;
-                    }
+                if (!shouldAntiAffinityNamespaceUnload(namespaceName, 
bundleRange, broker)) {
+                    return;
+                }
+                NamespaceBundle bundleToUnload = 
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                Optional<String> destBroker = 
this.selectBroker(bundleToUnload);
+                if (!destBroker.isPresent()) {
+                    log.info("[{}] No broker available to unload bundle {} 
from broker {}",
+                            loadSheddingStrategy.getClass().getSimpleName(), 
bundle, broker);
+                    return;
+                }
+                if (destBroker.get().equals(broker)) {
+                    log.warn("[{}] The destination broker {} is the same as 
the current owner broker for Bundle {}",
+                            loadSheddingStrategy.getClass().getSimpleName(), 
destBroker.get(), bundle);
+                    return;
+                }
 
-                    log.info("[{}] Unloading bundle: {} from broker {} to dest 
broker {}",
-                            strategy.getClass().getSimpleName(), bundle, 
broker, destBroker.get());
-                    try {
-                        pulsar.getAdminClient().namespaces()
-                                .unloadNamespaceBundle(namespaceName, 
bundleRange, destBroker.get());
-                        loadData.getRecentlyUnloadedBundles().put(bundle, 
System.currentTimeMillis());
-                    } catch (PulsarServerException | PulsarAdminException e) {
-                        log.warn("Error when trying to perform load shedding 
on {} for broker {}", bundle, broker, e);
-                    }
-                });
+                log.info("[{}] Unloading bundle: {} from broker {} to dest 
broker {}",
+                        loadSheddingStrategy.getClass().getSimpleName(), 
bundle, broker, destBroker.get());
+                try {
+                    pulsar.getAdminClient().namespaces()
+                            .unloadNamespaceBundle(namespaceName, bundleRange, 
destBroker.get());
+                    loadData.getRecentlyUnloadedBundles().put(bundle, 
System.currentTimeMillis());
+                } catch (PulsarServerException | PulsarAdminException e) {
+                    log.warn("Error when trying to perform load shedding on {} 
for broker {}", bundle, broker, e);
+                }
             });
+        });
 
-            updateBundleUnloadingMetrics(bundlesToUnload);
-        }
+        updateBundleUnloadingMetrics(bundlesToUnload);
     }
 
     /**

Reply via email to