thetumbled commented on code in PR #22946:
URL: https://github.com/apache/pulsar/pull/22946#discussion_r1650628784


##########
pip/pip-364.md:
##########
@@ -0,0 +1,476 @@
+
+# PIP-364: Introduce a new load balance algorithm AvgShedder
+
+# Background knowledge
+
+Pulsar has two load balance interfaces:
+- `LoadSheddingStrategy` is an unloading strategy that identifies high load 
brokers and unloads some of the bundles they carry to reduce the load.
+- `ModularLoadManagerStrategy` is a placement strategy responsible for 
assigning bundles to brokers.
+
+## LoadSheddingStrategy
+There are three available algorithms: `ThresholdShedder`, `OverloadShedder`, 
`UniformLoadShedder`.
+
+### ThresholdShedder
+`ThresholdShedder` uses the following method to calculate the maximum resource 
utilization rate for each broker,
+which includes CPU, direct memory, bandwidth in, and bandwidth out.
+```
+    public double getMaxResourceUsageWithWeight(final double cpuWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+```
+
+After calculating the maximum resource utilization rate for each broker, a 
historical weight algorithm will
+also be executed to obtain the final score.
+```
+historyUsage = historyUsage == null ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+```
+`historyPercentage` is set by the configuration `loadBalancerHistoryResourcePercentage`.
+The default value is 0.9, which means the previously calculated score accounts for 90% of the final score,
+while the currently calculated score accounts for only 10%.
+
+This historical weight algorithm was introduced to avoid bundle switching caused by
+short-term abnormal load increases or decreases. In practice, however, it introduces some
+serious problems, which are explained in detail later.
+
+Next, calculate the average score of all brokers in the cluster: `avgUsage=totalUsage/totalBrokers`.
+When a broker's score exceeds `avgUsage` plus a threshold, the broker is considered overloaded.
+The threshold is set by the configuration `loadBalancerBrokerThresholdShedderPercentage`, with a default value of 10.
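
The scoring steps described above can be sketched roughly as follows. This is only an illustration, not the Pulsar implementation: the class `ThresholdScoreSketch` and its method names are hypothetical, scores are assumed to be percentages, and treating the threshold comparison as strict is an assumption.

```java
import java.util.Map;

/** Hypothetical sketch of ThresholdShedder-style scoring; names are illustrative only. */
final class ThresholdScoreSketch {

    /** Blends the previous score with the current max resource usage (both in percent). */
    static double updateHistoryUsage(Double historyUsage, double resourceUsage, double historyPercentage) {
        return historyUsage == null
                ? resourceUsage
                : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
    }

    /** avgUsage = totalUsage / totalBrokers. */
    static double averageUsage(Map<String, Double> scores) {
        double total = 0;
        for (double score : scores.values()) {
            total += score;
        }
        return scores.isEmpty() ? 0 : total / scores.size();
    }

    /** Assumption: a broker counts as overloaded when its score is strictly above avgUsage + threshold. */
    static boolean isOverloaded(double score, double avgUsage, double thresholdPercentage) {
        return score > avgUsage + thresholdPercentage;
    }
}
```

With scores of 10, 50 and 70 and the default threshold of 10, the average is about 43.33, so under these assumptions only the broker at 70 would be flagged.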
+
+
+### OverloadShedder
+`OverloadShedder` uses the same method `getMaxResourceUsageWithWeight` to calculate the maximum resource utilization rate for each broker.
+The difference is that `OverloadShedder` does not apply the historical weight algorithm:
+the final score is simply the broker's current maximum resource utilization rate.
+
+After obtaining the load score for each broker, compare it with `loadBalancerBrokerOverloadedThresholdPercentage`, which defaults to 85%.
+If the score exceeds the threshold, the broker is considered overloaded.
+
+This algorithm is relatively simple, but it has serious corner cases, so using `OverloadShedder` is not recommended.
+Here are two of them:
+- When the load on every broker in the cluster reaches the threshold, bundle unloading keeps being executed,
+   but bundles only move from one overloaded broker to another, which is meaningless.
+- If no broker's load reaches the threshold, newly added brokers will not receive any traffic through rebalancing.
+
+Because these two problems are quite serious, `OverloadShedder` is not discussed further.
+
+
+### UniformLoadShedder
+`UniformLoadShedder` first finds the maximum and minimum message rates and the maximum and minimum
+throughput across brokers, together with the corresponding brokers. It then computes the difference between the maximum and minimum
+of each metric and compares it with one of two thresholds, one for message rate and one for throughput.
+
+- loadBalancerMsgRateDifferenceShedderThreshold
+
+The message rate percentage threshold between the highest and lowest loaded brokers. With the default value of 50,
+a bundle unload can be triggered when the maximum message rate exceeds 1.5 times the minimum message rate.
+For example, broker 1 with 50K msgRate and broker 2 with 30K msgRate have a (50-30)/30=66%>50% difference in msgRate,
+so the load balancer can unload bundles from broker 1 to broker 2.
+
+- loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold
+
+The threshold for the message throughput multiplier between the highest and lowest loaded brokers.
+With the default value of 4, a bundle unload can be triggered when the maximum throughput exceeds 4 times the minimum throughput.
+For example, if the throughput of broker 1 is 450MB and that of broker 2 is 100MB, the throughput ratio
+is 450/100=4.5>4, so the load balancer can unload bundles from broker 1 to broker 2.
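
The two threshold checks can be illustrated with a small sketch. The class `UniformThresholdSketch` and its methods are hypothetical names, and the strict comparisons mirror the `>` used in the examples above rather than any confirmed implementation detail. A minimum of zero is not handled specially here.

```java
/** Hypothetical sketch of the two UniformLoadShedder-style trigger checks. */
final class UniformThresholdSketch {

    /** Percentage by which the highest message rate exceeds the lowest one. */
    static double msgRateDifferencePercent(double maxMsgRate, double minMsgRate) {
        return (maxMsgRate - minMsgRate) / minMsgRate * 100.0;
    }

    /** Ratio between the highest and lowest throughput. */
    static double throughputMultiplier(double maxThroughput, double minThroughput) {
        return maxThroughput / minThroughput;
    }

    /** Unload is considered when either difference exceeds its threshold. */
    static boolean shouldUnload(double maxMsgRate, double minMsgRate,
                                double maxThroughput, double minThroughput,
                                double msgRateThresholdPercent, double throughputMultiplierThreshold) {
        return msgRateDifferencePercent(maxMsgRate, minMsgRate) > msgRateThresholdPercent
                || throughputMultiplier(maxThroughput, minThroughput) > throughputMultiplierThreshold;
    }
}
```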
+
+
+From this description of `UniformLoadShedder`, the following problems are apparent:
+#### load jitter
+`UniformLoadShedder` has no logic to handle load jitter. For example,
+when traffic suddenly increases or decreases, that single load data point is adopted and triggers a bundle unload.
+However, the traffic of the topic soon returns to normal, so another bundle unload is very likely to be triggered.
+This type of bundle unload should be avoided, and the scenario is actually very common.
+
+#### heterogeneous environment
+`UniformLoadShedder` does not rely on indicators such as CPU usage and network card usage to determine high load
+and low load brokers. Instead, it determines them based on message rate and throughput,
+while `ThresholdShedder` and `OverloadShedder` rely on machine resource indicators such as CPU usage.
+If the cluster is heterogeneous, such as different machines with different 
hardware configurations, 
+or if there are other processes sharing resources on the machine where the 
broker is located, 
+`UniformLoadShedder` is likely to misjudge high and low load brokers, thereby 
migrating the load from high-performance 
+but low load brokers to low-performance but high load brokers. 
+Therefore, it is not recommended for users to use `UniformLoadShedder` in 
heterogeneous environments.
+
+#### slow load balancing
+`UniformLoadShedder` only unloads bundles from the single highest loaded broker at a time,
+so a large cluster may need a considerable amount of time to complete load balancing.
+For example, if there are 100 high load brokers in the current cluster and 100 new machines are added,
+roughly 100 shedding rounds are needed to complete the balancing.
+Since the execution interval of the `LoadSheddingStrategy` policy is set by
+`loadBalancerSheddingIntervalMinutes`, which defaults to 1 minute,
+it will take about 100 minutes to complete all of them. For users of large partitioned topics, their tasks
+are likely to be disconnected multiple times within these 100 minutes, which greatly affects the user experience.
+
+
+## ModularLoadManagerStrategy
+The `LoadSheddingStrategy` strategy is used to unload bundles of high load 
brokers. However, in order to 
+achieve a good load balancing effect, it is necessary not only to "unload" 
correctly, but also to "load" correctly.
+The `ModularLoadManagerStrategy` strategy is responsible for assigning bundles 
to brokers.
+The coordination between `LoadSheddingStrategy` and 
`ModularLoadManagerStrategy` is also a key point worth paying attention to.
+
+### LeastLongTermMessageRate
+The `LeastLongTermMessageRate` algorithm first uses the maximum resource usage (CPU and so on) as the broker's score,
+reusing the `OverloadShedder` configuration `loadBalancerBrokerOverloadedThresholdPercentage`.
+If the score is greater than this threshold (default 85%), set `score=INF`; otherwise,
+update the broker's score to the sum of the message in and out rates from the broker's long-term aggregation.
+```
+score = longTerm MsgIn rate + longTerm MsgOut rate
+```
+Finally, randomly select one of the brokers with the lowest score. If the score of every broker is INF,
+randomly select a broker from all brokers.
+
+The scoring algorithm in `LeastLongTermMessageRate` is essentially based on 
message rate. Although it initially examines
+the maximum resource utilization, it is to exclude overloaded brokers only.
+Therefore, in most cases, brokers are sorted based on the size of the message 
rate as a score, which results in the same
+issues with heterogeneous environments, similar to `UniformLoadShedder`.
+
+
+#### Effect of the combination of `LoadSheddingStrategy` and `LeastLongTermMessageRate`
+Next, we will attempt to analyze the effect together with the 
`LoadSheddingStrategy`.
+- **LeastLongTermMessageRate + OverloadShedder**
+This is the initial combination, but due to some inherent flaws in 
`OverloadShedder`, **it is not recommended**.
+
+- **LeastLongTermMessageRate + ThresholdShedder**
+This combination is even worse than `LeastLongTermMessageRate + OverloadShedder` and **is not recommended**.
+`ThresholdShedder` scores brokers using the maximum weighted resource usage and the historical score,
+while `LeastLongTermMessageRate` scores them by message rate. Inconsistent unloading and placement criteria
+can lead to incorrect load balancing decisions.
+This is also why a new placement strategy, `LeastResourceUsageWithWeight`, will be introduced later.
+
+- **LeastLongTermMessageRate + UniformLoadShedder**
+This is **recommended**. Both the unloading and placement policies are based on message rate,
+but using message rate as the criterion naturally leads to issues in heterogeneous environments.
+
+
+### LeastResourceUsageWithWeight
+`LeastResourceUsageWithWeight` uses the same scoring algorithm as 
`ThresholdShedder` to score brokers, which uses
+weighted maximum resource usage and historical scores to calculate the current 
score.
+
+Next, select candidate brokers based on the configuration of 
`loadBalancerAverageResourceUsageDifferenceThresholdPercentage`.
+If a broker's score plus this threshold is still not greater than the average 
score, the broker will be added to the 
+candidate broker list. After obtaining the candidate broker list, a broker 
will be randomly selected from it;
+If there are no candidate brokers, randomly select from all brokers.
+
+For example, if the resource utilization rate of broker 1 is 10%, broker 2 is 30%, and broker 3 is 80%,
+the average resource utilization rate is 40%. With a threshold of 10, the placement strategy can choose broker 1 and broker 2
+as the best candidates, since 10+10<=40 and 30+10<=40. In this way, the bundles unloaded
+from broker 3 will be evenly distributed between broker 1 and broker 2, rather than being placed entirely on broker 1.
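
The candidate selection rule can be sketched as below. `CandidateSelectionSketch` is a hypothetical name and brokers are identified by their index in the score array purely for illustration. An empty result stands for the case in which the strategy falls back to picking from all brokers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the candidate rule: score + threshold <= average score. */
final class CandidateSelectionSketch {

    /** Returns the indices of brokers that qualify as placement candidates. */
    static List<Integer> candidates(double[] scores, double threshold) {
        double total = 0;
        for (double score : scores) {
            total += score;
        }
        double average = scores.length == 0 ? 0 : total / scores.length;

        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < scores.length; i++) {
            if (scores[i] + threshold <= average) {
                result.add(i);
            }
        }
        return result;
    }
}
```

For the scores 10, 30 and 80 with a threshold of 10, this returns the first two brokers, matching the example above.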
+
+#### over placement problem
+The over placement problem occurs when bundles are placed on high load brokers and make them overloaded.
+
+In practice, it is difficult to find a suitable value for `loadBalancerAverageResourceUsageDifferenceThresholdPercentage`,
+and the fallback global random selection logic is often triggered. For example, if there are 6 brokers in the current
+cluster, with scores of 40, 40, 40, 40, 69, and 70 respectively, the average score is 49.83.
+With the default configuration, there are no candidate brokers because 40+10>49.83.
+This triggers the fallback global random selection, and a bundle may be offloaded from the overloaded broker5
+to the overloaded broker6, or vice versa, **causing the over placement problem.**
+
+Attempting to reduce the configuration value to expand the random pool, such 
as setting it to 0, may also include some
+overloaded brokers in the candidate broker list. For example, if there are 5 
brokers in the current cluster with scores
+of 10, 60, 70, 80, and 80 respectively, the average score is 60. As the 
configuration value is 0, then broker 1 and 
+broker 2 are both candidate brokers. If broker 2 shares half of the offloaded 
traffic, **it is highly likely to overload.**
+
+Therefore, it is difficult to configure the `LeastResourceUsageWithWeight` 
algorithm well to avoid incorrect load balancing.
+Of course, if you want to use the `ThresholdShedder` algorithm, the 
combination of `ThresholdShedder+LeastResourceUsageWithWeight`
+will still be superior to the combination of 
`ThresholdShedder+LeastLongTermMessageRate`, because at least the scoring 
algorithm
+of `LeastResourceUsageWithWeight` is consistent with that of 
`ThresholdShedder`.
+
+#### why doesn't `LeastLongTermMessageRate` have over placement problem?
+The root of the over placement problem is that the frequency of updating the load data is limited by the performance
+of ZooKeeper. If we assign a bundle to a broker, the broker's load only increases after a while, and its load data
+also needs some time to be reported to the leader broker. If many bundles are unloaded in one shedding,
+how should we assign these bundles to brokers?
+
+The simplest way is to assign them to the broker with the lowest load, but this may cause the over placement problem,
+as there is most likely only a single broker with the lowest load. With all bundles assigned to this broker,
+it will be overloaded. This is why `LeastResourceUsageWithWeight` tries to determine a candidate broker list
+to avoid the over placement problem. But as shown above, the candidate broker list can be empty or include some overloaded
+brokers, which also causes the over placement problem.
+
+So why doesn't `LeastLongTermMessageRate` have over placement problem? The 
reason is that each time a bundle is assigned,
+the bundle will be added into `PreallocatedBundleData`. When scoring a broker, 
not only will the long-term message rate 
+aggregated by the broker itself be used, but also the message rate of bundles 
in `PreallocatedBundleData` that have been
+assigned to the broker but have not yet been reflected in the broker's load 
data will be calculated.
+
+For example, suppose there are two bundles with a 20KB/s message rate to be assigned, and broker1 and broker2 are at 100KB/s
+and 110KB/s respectively. The first bundle is assigned to broker1. However, broker1's load data will not be updated
+in the short term. Before the load data is updated, `LeastLongTermMessageRate` tries to assign the second bundle.
+At this time, the score of broker1 is 100+20=120KB/s, where 20KB/s is the message rate of the first bundle
+from `PreallocatedBundleData`. As broker1's score is greater than broker2's, the second bundle will be assigned to broker2.
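
The preallocation idea can be sketched as follows. This is a simplified illustration, not the actual `PreallocatedBundleData` handling: `PreallocationSketch` and its methods are hypothetical, the lowest-scoring broker is picked deterministically instead of randomly among ties, and clearing preallocated rates on a fresh report is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: score = reported long-term rate + rate of bundles assigned but not yet reported. */
final class PreallocationSketch {
    private final Map<String, Double> reportedRate = new TreeMap<>();
    private final Map<String, Double> preallocatedRate = new HashMap<>();

    /** Records a fresh load report; assumed to already include earlier assignments. */
    void report(String broker, double longTermRate) {
        reportedRate.put(broker, longTermRate);
        preallocatedRate.remove(broker);
    }

    double score(String broker) {
        return reportedRate.getOrDefault(broker, 0.0) + preallocatedRate.getOrDefault(broker, 0.0);
    }

    /** Assigns a bundle to the lowest-scoring broker and remembers its rate until the next report. */
    String assign(double bundleRate) {
        String best = null;
        for (String broker : reportedRate.keySet()) {
            if (best == null || score(broker) < score(best)) {
                best = broker;
            }
        }
        if (best == null) {
            throw new IllegalStateException("no brokers have reported load data");
        }
        preallocatedRate.merge(best, bundleRate, Double::sum);
        return best;
    }
}
```

With broker1 at 100 and broker2 at 110, two successive `assign(20)` calls place the first bundle on broker1 and the second on broker2, as in the example above.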
+
+**`LeastLongTermMessageRate` predicts the load of the broker after the bundle is assigned to avoid the over placement problem.**
+
+**Why doesn't `LeastResourceUsageWithWeight` have this feature? Because it is not possible to predict how much a broker's resource
+utilization will increase when it loads a bundle. No algorithm that scores brokers based on resource utilization
+can fix the over placement problem this way.**
+So `LeastResourceUsageWithWeight` tries to determine a candidate broker list to avoid the over placement problem, which
+has proved not to be a good solution.
+
+
+#### over unloading problem
+The over unloading problem occurs when too much load is offloaded from high load brokers, making them underloaded.
+
+Finally, let's talk about the issue with the historical weighted scoring algorithm. It
+is used by the `ThresholdShedder` and `LeastResourceUsageWithWeight` algorithms, as follows:
+```
+historyUsage = historyUsage == null ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+```
+The default value of historyPercentage is 0.9, so the score calculated last time has a significant impact on the current score,
+and the current maximum resource utilization only accounts for 10%. This is intended to solve the problem of load jitter.
+However, the algorithm has side effects, such as the over unloading problem.
+
+For example, there is currently one broker, broker1, in the cluster with a load of 90%, and broker2 is added with a current load of 10%.
+- At the first execution of shedding: broker1 scores 90, broker2 scores 10. For simplicity, assume that the algorithm
+moves some bundles to make their loads the same, so the true load of both broker1 and broker2 becomes 50 after load shedding is completed.
+- At the second execution of shedding: broker1 scores 90*0.9+50*0.1=86, broker2 scores 10*0.9+50*0.1=14.
+**Note that the actual load of broker1 here is 50, but it is overestimated as 86!**
+**The true load of broker2 is also 50, but it is underestimated as 14!**
+Due to the significant difference in scores between the two, although their actual loads are already the same,
+broker1 will continue to unload traffic corresponding to 36 points to broker2,
+resulting in broker1's actual load becoming 14 and broker2's actual load becoming 86.
+
+- At the third execution of shedding: broker1 scores 86*0.9+14*0.1=78.8, broker2 scores 14*0.9+86*0.1=21.2.
+It is ridiculous that broker1 is still considered overloaded, and broker2 is still considered underloaded.
+All loads in broker1 are moved to broker2, which is the over unloading problem.
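
The three rounds above can be reproduced with a small simulation. It is an idealized sketch under the same simplification as the example: the hypothetical `OverUnloadingSketch` moves half of the score gap in each round, limited by the load the source broker actually carries. That rule is an assumption made for illustration, not how any real shedder picks bundles.

```java
/** Hypothetical simulation of the over unloading example with history-weighted scores. */
final class OverUnloadingSketch {

    /** One row per shedding round: {score1, score2, load1AfterShedding, load2AfterShedding}. */
    static double[][] simulate(double load1, double load2, double historyPercentage, int rounds) {
        double[][] trace = new double[rounds][];
        Double score1 = null;
        Double score2 = null;
        for (int round = 0; round < rounds; round++) {
            score1 = blend(score1, load1, historyPercentage);
            score2 = blend(score2, load2, historyPercentage);
            // Assumption: equalize the scores, but never move more than the source broker holds.
            double halfGap = (score1 - score2) / 2;
            double moved = halfGap > 0 ? Math.min(halfGap, load1) : -Math.min(-halfGap, load2);
            load1 -= moved;
            load2 += moved;
            trace[round] = new double[] {score1, score2, load1, load2};
        }
        return trace;
    }

    /** The historical weighting formula quoted above. */
    static double blend(Double historyUsage, double resourceUsage, double historyPercentage) {
        return historyUsage == null
                ? resourceUsage
                : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
    }
}
```

Calling `simulate(90, 10, 0.9, 3)` yields scores of roughly 86 and 14 in the second round and 78.8 and 21.2 in the third, after which broker1 holds no load at all.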
+
+Although this example is an idealized theoretical analysis, we can still see 
that using historical scoring algorithms
+can seriously overestimate or underestimate the true load of the broker. 
Although it can avoid the problem of load jitter,
+it will introduce a more serious and broader problem: **overestimating or 
underestimating the true load of the broker,
+leading to incorrect load balancing execution**.
+
+
+## Summary
+Based on the previous analysis, although we have three shedding strategies and 
two placement strategies 
+that can generate 6 combinations of 3 * 2, we actually only have two 
recommended options:
+- ThresholdShedder + LeastResourceUsageWithWeight
+- UniformLoadShedder + LeastLongTermMessageRate
+
+These two options each have their own advantages and disadvantages, and users 
can choose one according to 
+their requirements. The following table summarizes the advantages and 
disadvantages of the two options:
+
+| Combination                                     | heterogeneous environment | load jitter | over placement problem | over unloading problem | slow load balancing |
+|-------------------------------------------------|---------------------------|-------------|------------------------|------------------------|---------------------|
+| ThresholdShedder + LeastResourceUsageWithWeight | normal(1)                 | good        | bad                    | bad                    | normal(3)           |
+| UniformLoadShedder + LeastLongTermMessageRate   | bad(2)                    | bad         | good                   | good                   | normal(4)           |
+
+1. In terms of adapting to heterogeneous environments, 
`ThresholdShedder+LeastResourceUsageWithWeight` can
+only be rated as `normal`. This is because `ThresholdShedder` is not fully 
adaptable to heterogeneous environments.
+Although it does not misjudge overloaded brokers as underloaded, heterogeneous 
environments can still have a 
+significant impact on the load balancing effect of `ThresholdShedder`.
+For example, there are three brokers in the current cluster with resource utilization rates of 10, 50, and 70, respectively.
+Broker1 and broker2 are homogeneous. Although broker3 doesn't bear any load, its resource utilization rate has
+reached 70 due to other processes deployed on the same machine.
+At this point, we would like broker1 to share some of the pressure from broker2, but since the average load is
+43.33 and 43.33+10>50, broker2 will not be judged as overloaded, and the overloaded broker3 has no traffic to
+unload, leaving the load balancing algorithm unable to act.
+
+2. In the same scenario, if `UniformLoadShedder+LeastLongTermMessageRate` is used, the problem will be more
+severe, as some of the load will be offloaded from broker2 to broker3. As a result, the topics served by
+broker3 will experience significant performance degradation.
+Therefore, it is not recommended to run Pulsar in heterogeneous environments, as the current load balancing algorithms
+cannot adapt to them well. If it is unavoidable, it is recommended to choose `ThresholdShedder+LeastResourceUsageWithWeight`.
+
+3. In terms of load balancing speed, although 
`ThresholdShedder+LeastResourceUsageWithWeight` can unload the load
+of all overloaded brokers at once, historical scoring algorithms can seriously 
affect the accuracy of load 
+balancing decisions. Therefore, in reality, it also requires multiple load 
balancing executions to finally 
+stabilize. This is why the load balancing speed of 
`ThresholdShedder+LeastResourceUsageWithWeight` is rated as `normal`.
+
+4. In terms of load balancing speed, `UniformLoadShedder+LeastLongTermMessageRate` can only unload the load of one
+overloaded broker at a time, so load balancing takes a long time when there are many brokers.
+It is therefore also rated as `normal`.
+
+
+# Motivation
+
+The current load balance algorithms have some serious problems, such as load jitter, poor adaptation to heterogeneous environments, slow load balancing, etc.
+This PIP aims to introduce a new load balance algorithm `AvgShedder` to solve these problems.
+
+# Goals
+
+Introduce a new load balance algorithm `AvgShedder` that can solve the 
problems of load jitter, heterogeneous environment, slow load balancing, etc.
+
+
+# High Level Design
+
+## scoring criterion
+First of all, to determine high load brokers, it is necessary to score and sort them.
+Currently, there are two scoring criteria:
+- Resource utilization rate of the broker
+- The message rate and throughput of the broker
+
+Based on the previous analysis, scoring based on message rate and throughput faces
+the same problem as `UniformLoadShedder` in heterogeneous environments, while scoring based on resource utilization
+rate faces the over placement problem like `LeastResourceUsageWithWeight`.
+
+**To solve the problem of heterogeneous environments, we use the resource 
utilization rate of the broker as the scoring criterion.**
+
+
+## binding shedding and placement strategies
+So how can we avoid the over placement problem? **The key is to bind the 
shedding and placement strategies together.**
+If every bundle unloaded from the high load broker is assigned to the right 
low load broker in shedding strategy,
+the over placement problem will be solved.
+
+For example, if the broker rating of the current cluster is 20,30,52,80,80, 
and the shedding and placement strategies are decoupled,
+the bundles will be unloaded from the two brokers with score of 80, and then 
all these bundles will be placed on the broker with a 
+score of 20, causing the over placement problem. 
+
+If the shedding and placement strategies are coupled, one broker with 80 score 
can unload some bundles to a broker with 20 score,
+and another broker with 80 score can unload the bundle to the broker with 30 
score. In this way, we can avoid the over placement problem.
+
+
+## evenly distributed traffic between the highest and lowest loaded brokers
+We will first pick out the highest and lowest loaded brokers, and then evenly 
distribute the traffic between them.
+
+For example, suppose the broker scores of the current cluster are 20,30,52,70,80, the message rate of the highest loaded broker is 1000,
+and the message rate of the lowest loaded broker is 500. We introduce a threshold that decides whether to trigger the bundle unload, for example,
+a threshold of 40. As the difference between the scores of the highest and lowest loaded brokers is 80-20=60>40,
+the shedding strategy will be triggered.
+
+To evenly distribute the traffic between the highest and lowest loaded brokers, the shedding strategy will
+try to make the message rates of the two brokers the same, which is (1000+500)/2=750. The shedding strategy will unload a message rate of 250 from the
+highest loaded broker to the lowest loaded broker. After the shedding is completed, the message rates of the two brokers will be
+the same, namely 750.
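
A minimal sketch of this step is shown below. `EvenSplitSketch` is a hypothetical name; `rateToUnload` follows the arithmetic above, while `pickBundles` is a purely illustrative greedy choice (largest bundles first, never exceeding the amount) and is not taken from the proposal.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of evenly splitting traffic between the highest and lowest loaded broker. */
final class EvenSplitSketch {

    /** Message rate to move so that both brokers end up at the mean of their rates. */
    static double rateToUnload(double highestBrokerRate, double lowestBrokerRate) {
        double target = (highestBrokerRate + lowestBrokerRate) / 2;
        return highestBrokerRate - target;
    }

    /** Illustrative greedy selection: take the largest bundles that still fit into the amount. */
    static List<Double> pickBundles(List<Double> bundleRates, double amount) {
        List<Double> sorted = new ArrayList<>(bundleRates);
        sorted.sort(Comparator.reverseOrder());
        List<Double> picked = new ArrayList<>();
        double remaining = amount;
        for (double rate : sorted) {
            if (rate <= remaining) {
                picked.add(rate);
                remaining -= rate;
            }
        }
        return picked;
    }
}
```

For rates of 1000 and 500, `rateToUnload` returns 250, which is the amount moved in the example above.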
+
+
+## improve the load balancing speed
+As mentioned earlier for `UniformLoadShedder`, if a strategy only handles one high load broker at a time, it takes a long time to
+complete all load balancing tasks. Therefore, we further optimize it by matching multiple pairs of high and low load brokers in
+a single shedding. After sorting the broker scores, the first and the last are paired, the second and the second to last are paired,
+and so on. When the score difference between two paired brokers is greater than the threshold, the load is evenly distributed
+between the two, which solves the problem of slow speed.
+
+For example, if the broker scores of the current cluster are 20,30,52,70,80, we will pair 20 with 80 and 30 with 70. The differences between
+the paired brokers are 80-20=60 and 70-30=40. The first exceeds the threshold of 40, so shedding is triggered for that pair; the second only equals the threshold, so it is triggered only if the comparison is inclusive.
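
The pairing rule can be sketched as follows. `PairingSketch` is a hypothetical name, and the strict comparison is an assumption: under it, a pair whose difference merely equals the threshold is not selected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: pair lowest with highest, second lowest with second highest, and so on. */
final class PairingSketch {

    /** Each element is {lowScore, highScore} for a pair whose difference exceeds the threshold. */
    static List<double[]> pairsToBalance(double[] scores, double threshold) {
        double[] sorted = scores.clone();
        Arrays.sort(sorted);
        List<double[]> pairs = new ArrayList<>();
        // With an odd number of brokers, the middle one is left unpaired.
        for (int low = 0, high = sorted.length - 1; low < high; low++, high--) {
            if (sorted[high] - sorted[low] > threshold) {
                pairs.add(new double[] {sorted[low], sorted[high]});
            }
        }
        return pairs;
    }
}
```

With the scores 20,30,52,80,80 from the binding example and a threshold of 40, this yields the pairs (20, 80) and (30, 80).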
+
+
+## handle load jitter with multiple hits threshold
+What about the historical weighting algorithm used in `ThresholdShedder`? It 
is used to solve the problem of load jitter, but previous
+analysis and experiments have shown that it can bring serious negative 
effects, so we can no longer use this method to solve the
+problem of load jitter.
+
+We mimic the way alarms are triggered: the threshold must be exceeded multiple times before the bundle unload is finally triggered.
+For example, load balancing is triggered only after the difference between a pair of brokers has exceeded the threshold three times.
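
A hit counter of this kind could look roughly like the sketch below. `HitCounterSketch` and the string pair key are hypothetical, and resetting the count whenever the difference falls back under the threshold is an assumption about how "multiple hits" are counted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of an alarm-style trigger: fire only after enough hits in a row. */
final class HitCounterSketch {
    private final Map<String, Integer> hits = new HashMap<>();
    private final double threshold;
    private final int requiredHits;

    HitCounterSketch(double threshold, int requiredHits) {
        this.threshold = threshold;
        this.requiredHits = requiredHits;
    }

    /** Records one shedding round for a broker pair; returns true when unloading should be triggered. */
    boolean observe(String pairKey, double scoreDifference) {
        if (scoreDifference <= threshold) {
            hits.remove(pairKey); // assumption: a miss resets the count
            return false;
        }
        int count = hits.merge(pairKey, 1, Integer::sum);
        if (count >= requiredHits) {
            hits.remove(pairKey); // start counting again after triggering
            return true;
        }
        return false;
    }
}
```

With a threshold of 40 and three required hits, a short spike that exceeds the threshold once or twice never triggers an unload.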
+
+## high and low threshold
+In situations of cluster rolling restart or expansion, there is often a 
significant load difference between 
+different brokers, and we hope to complete load balancing more quickly. 
+
+Therefore, we introduce two thresholds:
+- loadBalancerAvgShedderLowThreshold, default value is 15
+- loadBalancerAvgShedderHighThreshold, default value is 40
+
+The two thresholds correspond to two continuous hit count requirements:
+- loadBalancerAvgShedderHitCountLowThreshold, default value is 8
+- loadBalancerAvgShedderHitCountHighThreshold, default value is 2
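
One way to read the relationship between the two thresholds and the two hit counts is sketched below. The mapping is inferred from the configuration names and is an assumption: a difference above the high threshold uses the smaller hit count, a difference between the two thresholds uses the larger one, and strict comparisons are used. `TwoThresholdSketch` is a hypothetical name and the constants are the default values listed above.

```java
import java.util.OptionalInt;

/** Hypothetical sketch: choose the required continuous hit count from the score difference. */
final class TwoThresholdSketch {
    static final double LOW_THRESHOLD = 15;  // loadBalancerAvgShedderLowThreshold
    static final double HIGH_THRESHOLD = 40; // loadBalancerAvgShedderHighThreshold
    static final int HIT_COUNT_LOW = 8;      // loadBalancerAvgShedderHitCountLowThreshold
    static final int HIT_COUNT_HIGH = 2;     // loadBalancerAvgShedderHitCountHighThreshold

    /** Empty when the difference does not exceed the low threshold, so no unload is considered. */
    static OptionalInt requiredHitCount(double scoreDifference) {
        if (scoreDifference > HIGH_THRESHOLD) {
            return OptionalInt.of(HIT_COUNT_HIGH);
        }
        if (scoreDifference > LOW_THRESHOLD) {
            return OptionalInt.of(HIT_COUNT_LOW);
        }
        return OptionalInt.empty();
    }
}
```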
+
+When the difference in scores between two paired brokers exceeds the 
`loadBalancerAvgShedderLowThreshold` by 

Review Comment:
   When the threshold difference is 30, the hit count requirement should be 
`loadBalancerAvgShedderHitCountLowThreshold`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

