This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 85c4ba066481e0c303bc2928b493e578af79a146
Author: Hang Chen <chenh...@apache.org>
AuthorDate: Sat Dec 11 12:31:50 2021 +0800

    Use current resourceUsage value as historyUsage when leader change in 
ThresholdShedder (#13136)
    
    ### Motivation
    Fix #13119
    
    ### Modification
    1. Use the current resourceUsage value as the historyUsage value when the leader 
changes in ThresholdShedder to speed up getting the actual historyUsage value.
    
    (cherry picked from commit 6d9d24d50db5418ddbb845d2c7a2be2b9ac72893)
---
 .../broker/loadbalance/impl/ThresholdShedder.java  | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 3996592..727be9b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -38,13 +38,9 @@ import org.slf4j.LoggerFactory;
 
 public class ThresholdShedder implements LoadSheddingStrategy {
     private static final Logger log = 
LoggerFactory.getLogger(ThresholdShedder.class);
-
     private final Multimap<String, String> selectedBundlesCache = 
ArrayListMultimap.create();
-
     private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
-
     private static final double MB = 1024 * 1024;
-
     private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
 
     @Override
@@ -139,25 +135,27 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
         for (Map.Entry<String, BrokerData> entry : 
loadData.getBrokerData().entrySet()) {
             LocalBrokerData localBrokerData = entry.getValue().getLocalData();
             String broker = entry.getKey();
-            updateAvgResourceUsage(broker, localBrokerData, historyPercentage, 
conf);
-            totalUsage += brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            totalUsage += updateAvgResourceUsage(broker, localBrokerData, 
historyPercentage, conf);
             totalBrokers++;
         }
 
         return totalBrokers > 0 ? totalUsage / totalBrokers : 0;
     }
 
-    private void updateAvgResourceUsage(String broker, LocalBrokerData 
localBrokerData, final double historyPercentage,
-                                        final ServiceConfiguration conf) {
-        double historyUsage =
-                brokerAvgResourceUsage.getOrDefault(broker, 0.0);
-        historyUsage = historyUsage * historyPercentage
-                + (1 - historyPercentage) * 
localBrokerData.getMaxResourceUsageWithWeight(
+    private double updateAvgResourceUsage(String broker, LocalBrokerData 
localBrokerData,
+                                          final double historyPercentage, 
final ServiceConfiguration conf) {
+        Double historyUsage =
+                brokerAvgResourceUsage.get(broker);
+        double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
                 conf.getLoadBalancerCPUResourceWeight(),
                 conf.getLoadBalancerMemoryResourceWeight(), 
conf.getLoadBalancerDirectMemoryResourceWeight(),
                 conf.getLoadBalancerBandwithInResourceWeight(),
                 conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - 
historyPercentage) * resourceUsage;
+
         brokerAvgResourceUsage.put(broker, historyUsage);
+        return historyUsage;
     }
 
 }

Reply via email to