This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b0e8fe9f7ea [fix] [broker] replace loadSheddingPipeline with loadSheddingStrategy. (#22786) b0e8fe9f7ea is described below commit b0e8fe9f7ea765d4b580ab3eb6bf3c51c59e685f Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Tue May 28 18:53:57 2024 +0800 [fix] [broker] replace loadSheddingPipeline with loadSheddingStrategy. (#22786) --- .../loadbalance/impl/ModularLoadManagerImpl.java | 81 ++++++++++------------ 1 file changed, 38 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index a3e6b1c3aeb..5d08ea9c3c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -139,8 +139,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // LocalBrokerData available before most recent update. private LocalBrokerData lastData; - // Pipeline used to determine what namespaces, if any, should be unloaded. - private final List<LoadSheddingStrategy> loadSheddingPipeline; + // Used to determine what namespaces, if any, should be unloaded. + private LoadSheddingStrategy loadSheddingStrategy; // Local data for the broker this is running on. private LocalBrokerData localData; @@ -204,7 +204,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager { defaultStats = new NamespaceBundleStats(); filterPipeline = new ArrayList<>(); loadData = new LoadData(); - loadSheddingPipeline = new ArrayList<>(); preallocatedBundleToBroker = new ConcurrentHashMap<>(); executors = Executors.newSingleThreadExecutor( new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager")); @@ -270,7 +269,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { () -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap)); }); - loadSheddingPipeline.add(createLoadSheddingStrategy()); + loadSheddingStrategy = createLoadSheddingStrategy(); } public void handleDataNotification(Notification t) { @@ -476,9 +475,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { if (pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader()) { deadBrokers.forEach(this::deleteTimeAverageDataFromMetadataStoreAsync); - for (LoadSheddingStrategy loadSheddingStrategy : loadSheddingPipeline) { - loadSheddingStrategy.onActiveBrokersChange(activeBrokers); - } + loadSheddingStrategy.onActiveBrokersChange(activeBrokers); placementStrategy.onActiveBrokersChange(activeBrokers); } } @@ -632,47 +629,45 @@ public class ModularLoadManagerImpl implements ModularLoadManager { final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles(); recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout); - for (LoadSheddingStrategy strategy : loadSheddingPipeline) { - final Multimap<String, String> bundlesToUnload = strategy.findBundlesForUnloading(loadData, conf); + final Multimap<String, String> bundlesToUnload = loadSheddingStrategy.findBundlesForUnloading(loadData, conf); - bundlesToUnload.asMap().forEach((broker, bundles) -> { - bundles.forEach(bundle -> { - final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); - final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); - if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange, broker)) { - return; - } + bundlesToUnload.asMap().forEach((broker, bundles) -> { + bundles.forEach(bundle -> { + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange, broker)) { + return; + } - if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { - return; - } - NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle); - Optional<String> destBroker = this.selectBroker(bundleToUnload); - if (!destBroker.isPresent()) { - log.info("[{}] No broker available to unload bundle {} from broker {}", - strategy.getClass().getSimpleName(), bundle, broker); - return; - } - if (destBroker.get().equals(broker)) { - log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}", - strategy.getClass().getSimpleName(), destBroker.get(), bundle); - return; - } + if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { + return; + } + NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle); + Optional<String> destBroker = this.selectBroker(bundleToUnload); + if (!destBroker.isPresent()) { + log.info("[{}] No broker available to unload bundle {} from broker {}", + loadSheddingStrategy.getClass().getSimpleName(), bundle, broker); + return; + } + if (destBroker.get().equals(broker)) { + log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}", + loadSheddingStrategy.getClass().getSimpleName(), destBroker.get(), bundle); + return; + } - log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}", - strategy.getClass().getSimpleName(), bundle, broker, destBroker.get()); - try { - pulsar.getAdminClient().namespaces() - .unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get()); - loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis()); - } catch (PulsarServerException | PulsarAdminException e) { - log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e); - } - }); + log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}", + loadSheddingStrategy.getClass().getSimpleName(), bundle, broker, destBroker.get()); + try { + pulsar.getAdminClient().namespaces() + .unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get()); + loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis()); + } catch (PulsarServerException | PulsarAdminException e) { + log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e); + } }); + }); - updateBundleUnloadingMetrics(bundlesToUnload); - } + updateBundleUnloadingMetrics(bundlesToUnload); } /**