michaeljmarshall commented on a change in pull request #12471:
URL: https://github.com/apache/pulsar/pull/12471#discussion_r752585894



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
##########
@@ -74,19 +77,56 @@
             return selectedBundlesCache;
         }
 
+        AtomicBoolean isBalanced = new AtomicBoolean(true);
+        Map<String, BrokerData> belowAvgUsageBrokers = new HashMap<>();
+        Map<String, BrokerData> overAvgUsageBrokers = new HashMap<>();
+        AtomicDouble sumOfAcceptableTrafficFromBelowAvgUsageBrokers = new 
AtomicDouble(0);
+        AtomicDouble totalTrafficOfOverAvgUsageBrokers = new AtomicDouble(0);
+
+        //  1. Divided into two categories:① (currentUsage > avgUsage); 
②(currentUsage < avgUsage);
         loadData.getBrokerData().forEach((broker, brokerData) -> {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            double brokerCurrentThroughput = localData.getMsgThroughputIn() + 
localData.getMsgThroughputOut();
+            final double currentUsage = 
brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+
+            if (currentUsage > avgUsage - threshold && currentUsage < avgUsage 
+ threshold) {
+                isBalanced.set(false);
+                return;
+            }
+            if (currentUsage < avgUsage) {
+                double percentOfTrafficToAccept = avgUsage - currentUsage;
+                double trafficToAccept = percentOfTrafficToAccept * 
brokerCurrentThroughput;
+                //2. Calculate the sum of acceptable traffic in 
belowAvgUsageBrokers
+                
sumOfAcceptableTrafficFromBelowAvgUsageBrokers.addAndGet(trafficToAccept);
+                belowAvgUsageBrokers.put(broker, brokerData);
+            }
+            if (currentUsage > avgUsage) {
+                //3. Calculate the total traffic in overAvgUsageBrokers
+                
totalTrafficOfOverAvgUsageBrokers.addAndGet(brokerCurrentThroughput);
+                overAvgUsageBrokers.put(broker, brokerData);
+            }
+        });
+
+        if (isBalanced.get()) {
+            return selectedBundlesCache;
+        }
+
+        //4. Calculate the percentage of traffic to be migrated by each broker 
in overAvgUsageBrokers;
+        double percentOfTrafficToOffload = ADDITIONAL_THRESHOLD_PERCENT_MARGIN

Review comment:
       > the load utilization rate of all brokers is lower than the average 
utilization rate. 
   
   Can you clarify this point? The average usage cannot be higher than all of 
the individual usages.
   
   > At this time, according to the original calculation method, there is no 
way to calculate percentoftraffictooffload.
   
   The current algorithm isn't focused on calculating this value. It only 
unloads brokers that have utilization above a certain threshold. Moving bundles 
is not free, so some operators might be fine with a slightly underloaded broker 
as long as no one broker is too high above the average utilization.
   
   >   The sum of the traffic that can be received by all nodes lower than 
avgusage is the total traffic that we should unload from nodes higher than 
avgusage. Therefore, we only need to calculate the sum of the total traffic of 
all brokers higher than avgusage over brokertrafficsum, Then, 
sumbelowoavgtraffic / overbrokertrafficsum can get the information from each 
broker that exceeds avgusage
   
   I don't think this is correct. Since brokers in the `overAvgUsageBrokers` 
collection can have different usages, we need to calculate an amount that is 
right for _each_ broker itself. Your algorithm might work for your example 
because all brokers above the average have the same usage.
   
   I think the most productive way to show that your algorithm works as you're 
saying is to add tests that show how it will react to specific scenarios.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to