This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 85c4ba066481e0c303bc2928b493e578af79a146 Author: Hang Chen <chenh...@apache.org> AuthorDate: Sat Dec 11 12:31:50 2021 +0800 Use current resourceUsage value as historyUsage when leader change in ThresholdShedder (#13136) ### Motivation Fix #13119 ### Modification 1. Use the current resourceUsage value as the historyUsage value when the leader changes in ThresholdShedder, to speed up getting the actual historyUsage value. (cherry picked from commit 6d9d24d50db5418ddbb845d2c7a2be2b9ac72893) --- .../broker/loadbalance/impl/ThresholdShedder.java | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java index 3996592..727be9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java @@ -38,13 +38,9 @@ import org.slf4j.LoggerFactory; public class ThresholdShedder implements LoadSheddingStrategy { private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class); - private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create(); - private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05; - private static final double MB = 1024 * 1024; - private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>(); @Override @@ -139,25 +135,27 @@ public class ThresholdShedder implements LoadSheddingStrategy { for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) { LocalBrokerData localBrokerData = entry.getValue().getLocalData(); String broker = entry.getKey(); - updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf); - totalUsage += brokerAvgResourceUsage.getOrDefault(broker, 0.0); + totalUsage += updateAvgResourceUsage(broker, 
localBrokerData, historyPercentage, conf); totalBrokers++; } return totalBrokers > 0 ? totalUsage / totalBrokers : 0; } - private void updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, final double historyPercentage, - final ServiceConfiguration conf) { - double historyUsage = - brokerAvgResourceUsage.getOrDefault(broker, 0.0); - historyUsage = historyUsage * historyPercentage - + (1 - historyPercentage) * localBrokerData.getMaxResourceUsageWithWeight( + private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, + final double historyPercentage, final ServiceConfiguration conf) { + Double historyUsage = + brokerAvgResourceUsage.get(broker); + double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight( conf.getLoadBalancerCPUResourceWeight(), conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(), conf.getLoadBalancerBandwithInResourceWeight(), conf.getLoadBalancerBandwithOutResourceWeight()); + historyUsage = historyUsage == null + ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage; + brokerAvgResourceUsage.put(broker, historyUsage); + return historyUsage; } }