thetumbled commented on code in PR #22946: URL: https://github.com/apache/pulsar/pull/22946#discussion_r1650628784
########## pip/pip-364.md: ########## @@ -0,0 +1,476 @@ + +# PIP-364: Introduce a new load balance algorithm AvgShedder + +# Background knowledge + +Pulsar has two load balance interfaces: +- `LoadSheddingStrategy` is an unloading strategy that identifies high load brokers and unloads some of the bundles they carry to reduce the load. +- `ModularLoadManagerStrategy` is a placement strategy responsible for assigning bundles to brokers. + +## LoadSheddingStrategy +There are three available algorithms: `ThresholdShedder`, `OverloadShedder`, `UniformLoadShedder`. + +### ThresholdShedder +`ThresholdShedder` uses the following method to calculate the maximum resource utilization rate for each broker, +which includes CPU, direct memory, bandwidth in, and bandwidth out. +``` + public double getMaxResourceUsageWithWeight(final double cpuWeight, + final double directMemoryWeight, final double bandwidthInWeight, + final double bandwidthOutWeight) { + return max(cpu.percentUsage() * cpuWeight, + directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight, + bandwidthOut.percentUsage() * bandwidthOutWeight) / 100; + } +``` + +After calculating the maximum resource utilization rate for each broker, a historical weight algorithm will +also be executed to obtain the final score. +``` +historyUsage = historyUsage == null ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage; +``` +The historyPercentage is determined by configuring the `loadBalancerHistoryResourcePercentage`. +The default value is 0.9, which means that the last calculated score accounts for 90%, +while the current calculated score only accounts for 10%. + +The introduction of this historical weight algorithm is to avoid bundle switching caused by +short-term abnormal load increase or decrease, but in fact, this algorithm will introduce some +serious problems, which will be explained in detail later. 
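The history-weighted update quoted above can be tried in isolation. The following is only a minimal sketch, not the code in Pulsar: the class and method names (`HistoryScoreSketch`, `update`) are hypothetical, and the sample usages 90 and 50 are made up for illustration. Only the formula and the 0.9 default come from the text.

```java
// Hypothetical sketch of the history-weighted score update; not Pulsar's actual class.
final class HistoryScoreSketch {
    // Default of loadBalancerHistoryResourcePercentage, as stated in the text.
    static final double HISTORY_PERCENTAGE = 0.9;

    // Returns the new score given the previous score (null on the first sample)
    // and the current maximum resource usage.
    static double update(Double historyUsage, double resourceUsage, double historyPercentage) {
        return historyUsage == null
                ? resourceUsage
                : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
    }

    public static void main(String[] args) {
        double first = update(null, 90.0, HISTORY_PERCENTAGE);   // first sample is taken as-is
        double second = update(first, 50.0, HISTORY_PERCENTAGE); // current usage dropped to 50
        System.out.printf("first=%.1f second=%.1f%n", first, second);
    }
}
```

With these assumed inputs the second score is about 86 although the current usage is 50, which shows how strongly the previous score dominates at the default weight.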
+ +Next, calculate the average score of all brokers in the entire cluster: `avgUsage=totalUsage/totalBrokers`. +When the score of any broker exceeds avgUsage by more than a threshold, the broker is judged to be overloaded. +The threshold is set by the configuration `loadBalancerBrokerThresholdShedderPercentage`, with a default value of 10. + + +### OverloadShedder +`OverloadShedder` uses the same method `getMaxResourceUsageWithWeight` to calculate the maximum resource utilization rate for each broker. +The difference is that `OverloadShedder` does not use the historical weight algorithm: +the final score is simply the current maximum resource utilization rate of the broker. + +After obtaining the load score for each broker, compare it with `loadBalancerBrokerOverloadedThresholdPercentage`. +If the score exceeds this threshold (default 85%), the broker is considered overloaded. + +This algorithm is relatively simple, but it has serious corner cases, so it is not recommended to use `OverloadShedder`. +Here are two cases: +- When the load on every broker in the cluster reaches the threshold, bundle unloading continues to be executed, + but bundles only move from one overloaded broker to another, which is meaningless. +- If no broker's load reaches the threshold, adding new brokers will not shift any traffic to the newly added brokers. +Both problems are serious, so `OverloadShedder` is not discussed further. + + +### UniformLoadShedder +`UniformLoadShedder` first finds the maximum and minimum message rates, as well as the maximum and minimum +traffic throughput, and the corresponding brokers. It then computes the gap between maximum and minimum for each metric and compares it with two thresholds, +one for message rate and one for throughput. 
+ +- loadBalancerMsgRateDifferenceShedderThreshold + +The message rate percentage threshold between the highest and lowest loaded brokers, with a default value of 50. +Bundle unload can be triggered when the maximum message rate is more than 1.5 times the minimum message rate. +For example, broker 1 with 50K msgRate and broker 2 with 30K msgRate have a (50-30)/30≈66.7%>50% difference in msgRate, +so the load balancer can unload bundles from broker 1 to broker 2. + +- loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold + +The threshold for the message throughput multiplier between the highest and lowest loaded brokers, +with a default value of 4. Bundle unload can be triggered when the maximum throughput is more than 4 times the minimum throughput. +For example, if the msgThroughput of broker 1 is 450MB and that of broker 2 is 100MB, the throughput ratio +is 450/100=4.5>4, so the load balancer can unload bundles from broker 1 to broker 2. + + +From this description of `UniformLoadShedder`, the following problems become apparent: +#### load jitter +`UniformLoadShedder` has no logic to handle load jitter. For example, +when the traffic suddenly increases or decreases, that single load data point is adopted and triggers a bundle unload. +However, the traffic of the topic soon returns to normal, so it is very likely to trigger another bundle unload. +This type of bundle unload should be avoided, and this kind of scenario is actually very common. + +#### heterogeneous environment +`UniformLoadShedder` does not rely on indicators such as CPU usage and network card usage to determine high load +and low load brokers, but rather determines them based on message rate and traffic throughput, +while `ThresholdShedder` and `OverloadShedder` rely on machine resource indicators such as CPU usage. 
+If the cluster is heterogeneous, such as different machines with different hardware configurations, +or if there are other processes sharing resources on the machine where the broker is located, +`UniformLoadShedder` is likely to misjudge high and low load brokers, thereby migrating the load from high-performance +but low load brokers to low-performance but high load brokers. +Therefore, it is not recommended for users to use `UniformLoadShedder` in heterogeneous environments. + +#### slow load balancing +`UniformLoadShedder` will only unload the bundle from one of the highest loaded brokers at a time, +which may take a considerable amount of time for a large cluster to complete all load balancing tasks. +For example, if there are 100 high load brokers in the current cluster and 100 new machines to be added, +it is roughly estimated that it will take 100 shedding to complete the balancing. +However, since the execution time interval of the `LoadSheddingStrategy` policy is determined by the +configuration of `loadBalancerSheddingIntervalMinutes`, which defaults to once every 1 minute, +so it will take 100 minutes to complete all tasks. For users using large partition topics, their tasks +are likely to be disconnected multiple times within this 100 minutes, which greatly affects the user experience. + + +## ModularLoadManagerStrategy +The `LoadSheddingStrategy` strategy is used to unload bundles of high load brokers. However, in order to +achieve a good load balancing effect, it is necessary not only to "unload" correctly, but also to "load" correctly. +The `ModularLoadManagerStrategy` strategy is responsible for assigning bundles to brokers. +The coordination between `LoadSheddingStrategy` and `ModularLoadManagerStrategy` is also a key point worth paying attention to. 
+ +### LeastLongTermMessageRate +The `LeastLongTermMessageRate` algorithm first uses the maximum resource usage (CPU and so on) as the broker's score, +and reuses the `OverloadShedder` configuration, `loadBalancerBrokerOverloadedThresholdPercentage`. +If the score is greater than this threshold (default 85%), set `score=INF`; otherwise, update the broker's score to the sum of the +message in and out rates obtained from the broker's long-term aggregation. +``` +score = longTerm MsgIn rate + longTerm MsgOut rate +``` +Finally, randomly select one of the brokers with the lowest score and return it. If the score of every broker is INF, +randomly select a broker from all brokers. + +The scoring algorithm in `LeastLongTermMessageRate` is essentially based on message rate. Although it initially examines +the maximum resource utilization, it does so only to exclude overloaded brokers. +Therefore, in most cases, brokers are ranked by message rate, which results in the same +issues with heterogeneous environments as `UniformLoadShedder`. + + +#### Effect of the combination of `LoadSheddingStrategy` and `LeastLongTermMessageRate` +Next, we analyze the effect of combining it with each `LoadSheddingStrategy`. +- **LeastLongTermMessageRate + OverloadShedder** +This is the initial combination, but due to some inherent flaws in `OverloadShedder`, **it is not recommended**. + +- **LeastLongTermMessageRate + ThresholdShedder** +This combination is even worse than `LeastLongTermMessageRate + OverloadShedder` and **is not recommended**, +because `ThresholdShedder` uses the maximum weighted resource usage and historical score to score brokers, +while `LeastLongTermMessageRate` scores brokers based on message rate. Inconsistent unloading and placement criteria +can lead to incorrect load balancing execution. +This is also why a new placement strategy `LeastResourceUsageWithWeight` will be introduced later. 
+ +- **LeastLongTermMessageRate + UniformLoadShedder** +This is **recommended**. Both the unloading and the placement policy are based on message rate, +but using message rate as the criterion naturally leads to issues with heterogeneous environments. + + +### LeastResourceUsageWithWeight +`LeastResourceUsageWithWeight` uses the same scoring algorithm as `ThresholdShedder` to score brokers, which uses +weighted maximum resource usage and historical scores to calculate the current score. + +Next, select candidate brokers based on the configuration of `loadBalancerAverageResourceUsageDifferenceThresholdPercentage`. +If a broker's score plus this threshold is still not greater than the average score, the broker will be added to the +candidate broker list. After obtaining the candidate broker list, a broker will be randomly selected from it; +if there are no candidate brokers, a broker is randomly selected from all brokers. + +For example, if the resource utilization rate of broker 1 is 10%, broker 2 is 30%, and broker 3 is 80%, +the average resource utilization rate is 40%. With a threshold of 10, the placement strategy can choose broker 1 and broker 2 +as the best candidates, because 10+10<=40 and 30+10<=40. In this way, the bundles unloaded +from broker 3 will be evenly distributed among broker 1 and broker 2, rather than being completely placed on broker 1. + +#### over placement problem +The over placement problem is that bundles are placed on high load brokers and make them overloaded. + +In practice, it is difficult to determine a suitable value for `loadBalancerAverageResourceUsageDifferenceThresholdPercentage`, +and the fallback global random selection logic is often triggered. For example, if there are 6 brokers in the current +cluster, with scores of 40, 40, 40, 40, 69, and 70 respectively, the average score is 49.83. +Using the default configuration, there are no candidate brokers because 40+10>49.83. 
+This triggers the fallback global random selection logic, and the bundle may be offloaded from the overloaded broker5 +to the overloaded broker6, or vice versa, **causing the over placement problem.** + +Attempting to reduce the configuration value to expand the random pool, such as setting it to 0, may also include some +overloaded brokers in the candidate broker list. For example, if there are 5 brokers in the current cluster with scores +of 10, 60, 70, 80, and 80 respectively, the average score is 60. As the configuration value is 0, broker 1 and +broker 2 are both candidate brokers. If broker 2 takes half of the offloaded traffic, **it is highly likely to become overloaded.** + +Therefore, it is difficult to configure the `LeastResourceUsageWithWeight` algorithm well enough to avoid incorrect load balancing. +Of course, if you want to use the `ThresholdShedder` algorithm, the combination of `ThresholdShedder+LeastResourceUsageWithWeight` +will still be superior to the combination of `ThresholdShedder+LeastLongTermMessageRate`, because at least the scoring algorithm +of `LeastResourceUsageWithWeight` is consistent with that of `ThresholdShedder`. + +#### why doesn't LeastLongTermMessageRate have over placement problem? +The root of the over placement problem is that the frequency of updating the load data is limited by the performance +of ZooKeeper. If we assign a bundle to a broker, the broker's load will increase only after a while, and its load data +also needs some time to be propagated to the leader broker. If many bundles are unloaded in one shedding, +how can we assign these bundles to brokers? + +The simplest way is to assign them to the broker with the lowest load, but this may cause the over placement problem, +as there is most likely only a single broker with the lowest load. With all bundles assigned to this broker, +it will be overloaded. 
This is the reason why `LeastResourceUsageWithWeight` tries to determine a candidate broker list +to avoid the over placement problem. But we have also seen that the candidate broker list can be empty or include some overloaded +brokers, which also causes the over placement problem. + +So why doesn't `LeastLongTermMessageRate` have the over placement problem? The reason is that each time a bundle is assigned, +the bundle is added to `PreallocatedBundleData`. When scoring a broker, the algorithm uses not only the long-term message rate +aggregated by the broker itself, but also the message rate of the bundles in `PreallocatedBundleData` that have been +assigned to the broker but are not yet reflected in the broker's load data. + +For example, suppose there are two bundles with a 20KB/s message rate to be assigned, and broker1 and broker2 are at 100KB/s +and 110KB/s respectively. The first bundle is assigned to broker1. However, broker1's load data will not be updated +in the short term. Before the load data is updated, `LeastLongTermMessageRate` tries to assign the second bundle. +At this time, the score of broker1 is 100+20=120KB/s, where 20KB/s is the message rate of the first bundle +from `PreallocatedBundleData`. As broker1's score is greater than broker2's, the second bundle will be assigned to broker2. + +**`LeastLongTermMessageRate` predicts the load of the broker after the bundle is assigned to avoid the over placement problem.** + +**Why doesn't `LeastResourceUsageWithWeight` have this feature? Because it is not possible to predict how much a broker's resource +utilization will increase when it loads a bundle. No algorithm that scores brokers based on resource utilization +can fix the over placement problem with this feature.** +So `LeastResourceUsageWithWeight` tries to determine a candidate broker list to avoid the over placement problem, which has +proved not to be a good solution. 
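The pre-allocation idea described above can be illustrated with a small, self-contained sketch. It is not Pulsar's implementation: `PreallocationSketch`, `reportedRate`, `preallocatedRate`, `addBroker`, and `assign` are hypothetical names, and the sketch picks the single lowest-scoring broker deterministically instead of choosing randomly among the lowest. Only the message-rate numbers come from the example in the text.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: score = reported rate + rate of bundles assigned but not yet reported.
final class PreallocationSketch {
    // Last message rate (KB/s) each broker reported; stale until the next load report.
    private final Map<String, Double> reportedRate = new LinkedHashMap<>();
    // Rate of bundles already assigned to a broker but not yet visible in its report.
    private final Map<String, Double> preallocatedRate = new LinkedHashMap<>();

    void addBroker(String broker, double rate) {
        reportedRate.put(broker, rate);
        preallocatedRate.put(broker, 0.0);
    }

    // Picks the broker with the lowest predicted rate and records the bundle against it.
    String assign(double bundleRate) {
        String best = null;
        double bestScore = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> e : reportedRate.entrySet()) {
            double score = e.getValue() + preallocatedRate.get(e.getKey());
            if (score < bestScore) {
                bestScore = score;
                best = e.getKey();
            }
        }
        if (best == null) {
            throw new IllegalStateException("no brokers registered");
        }
        preallocatedRate.merge(best, bundleRate, Double::sum);
        return best;
    }

    public static void main(String[] args) {
        PreallocationSketch s = new PreallocationSketch();
        s.addBroker("broker1", 100.0);
        s.addBroker("broker2", 110.0);
        System.out.println(s.assign(20.0)); // broker1: 100 < 110
        System.out.println(s.assign(20.0)); // broker2: broker1 is now predicted at 120 > 110
    }
}
```

Without the `preallocatedRate` map, both calls would return broker1, which is exactly the over placement behaviour the text describes.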
+ + +#### over unloading problem +The over unloading problem is that too much load is offloaded from high load brokers, which makes them underloaded. + +Finally, let's talk about the issue of the historical weighted scoring algorithm. The historical weighted scoring algorithm +is used by the `ThresholdShedder` and `LeastResourceUsageWithWeight` algorithms, as follows: +``` +historyUsage = historyUsage == null ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage; +``` +The default value of historyPercentage is 0.9, so the score calculated last time has a significant impact on the current score. +The current maximum resource utilization only accounts for 10%, which is intended to solve the problem of load jitter. +However, this algorithm has side effects, such as the over unloading problem. + +For example, there is currently one broker, broker1, in the cluster with a load of 90%, and broker2 is added with a current load of 10%. +- At the first execution of shedding: broker1 scores 90, broker2 scores 10. For simplicity, assume that the algorithm +moves some bundles to make their loads equal, so the true load of broker1 and broker2 becomes 50 after load shedding is completed. +- At the second execution of shedding: broker1 scores 90*0.9+50*0.1=86, broker2 scores 10*0.9+50*0.1=14. +**Note that the actual load of broker1 here is 50, but it is overestimated as 86!** +**The true load of broker2 is also 50, but it is underestimated as 14!** +Because of the large difference between the two scores, although their actual loads are already the same, +the algorithm will continue to move traffic corresponding to 36 points from broker1 to broker2, +resulting in broker1's actual load becoming 14 and broker2's actual load becoming 86. + +- At the third execution of shedding: broker1 scores 86*0.9+14*0.1=78.8, broker2 scores 14*0.9+86*0.1=21.2. 
+Absurdly, broker1 is still considered overloaded, and broker2 is still considered underloaded. +All loads in broker1 are moved to broker2, which is the over unloading problem. + +Although this example is an idealized theoretical analysis, we can still see that using the historical scoring algorithm +can seriously overestimate or underestimate the true load of the broker. Although it can avoid the problem of load jitter, +it introduces a more serious and broader problem: **overestimating or underestimating the true load of the broker, +leading to incorrect load balancing execution**. + + +## Summary +Based on the previous analysis, although three shedding strategies and two placement strategies +give 3 * 2 = 6 combinations, we actually only have two recommended options: +- ThresholdShedder + LeastResourceUsageWithWeight +- UniformLoadShedder + LeastLongTermMessageRate + +These two options each have their own advantages and disadvantages, and users can choose one according to +their requirements. The following table summarizes the advantages and disadvantages of the two options: + +| Combination | heterogeneous environment | load jitter | over placement problem | over unloading problem | slow load balancing | +|---------------------------------------------|---------------------------|------------|-----------------------|-----------------------|---------------------| +| ThresholdShedder + LeastResourceUsageWithWeight | normal(1) | good | bad | bad | normal(3) | +| UniformLoadShedder + LeastLongTermMessageRate | bad(2) | bad | good | good | normal(4) | + +1. In terms of adapting to heterogeneous environments, `ThresholdShedder+LeastResourceUsageWithWeight` can +only be rated as `normal`. This is because `ThresholdShedder` is not fully adaptable to heterogeneous environments. 
+Although it does not misjudge overloaded brokers as underloaded, heterogeneous environments can still have a +significant impact on the load balancing effect of `ThresholdShedder`. +For example, there are three brokers in the current cluster with resource utilization rates of 10, 50, and 70, respectively. +Broker1 and broker2 are homogeneous. Though broker3 doesn't bear any load, its resource utilization rate has +reached 70 because other processes are deployed on the same machine. +At this point, we would like broker1 to share some of the pressure from broker2, but since the average load is +43.33 and 43.33+10>50, broker2 will not be judged as overloaded, and the overloaded broker3 has no traffic to +unload, leaving the load balancing algorithm unable to act. + +2. In the same scenario, if `UniformLoadShedder+LeastLongTermMessageRate` is used, the problem will be more +severe, as some of the load will be offloaded from broker2 to broker3. As a result, the topics served by broker3 +will suffer significant performance degradation. +Therefore, it is not recommended to run Pulsar in heterogeneous environments, as current load balancing algorithms +cannot adapt to them well. If it is unavoidable, it is recommended to choose `ThresholdShedder+LeastResourceUsageWithWeight`. + +3. In terms of load balancing speed, although `ThresholdShedder+LeastResourceUsageWithWeight` can unload the load +of all overloaded brokers at once, the historical scoring algorithm can seriously affect the accuracy of load +balancing decisions. Therefore, in reality, it also requires multiple load balancing executions to finally +stabilize. This is why the load balancing speed of `ThresholdShedder+LeastResourceUsageWithWeight` is rated as `normal`. + +4. 
In terms of load balancing speed, `UniformLoadShedder+LeastLongTermMessageRate` can only unload the load of one +overloaded broker at a time, so it takes a long time to complete load balancing when there are many brokers, +so it is also rated as `normal`. + + +# Motivation + +The current load balance algorithm has some serious problems, such as load jitter, heterogeneous environment, slow load balancing, etc. +This PIP aims to introduce a new load balance algorithm `AvgShedder` to solve these problems. + +# Goals + +Introduce a new load balance algorithm `AvgShedder` that can solve the problems of load jitter, heterogeneous environment, slow load balancing, etc. + + +# High Level Design + +## scoring criterion +First of all, to determine high load brokers, it is necessary to rate and sort them. +Currently, there are two scoring criteria: +- Resource utilization rate of broker +- The message rate and throughput of the broker +Based on the previous analysis, it can be seen that scoring based on message rate and throughput will face +the same problem as `UniformLoadShedder` in heterogeneous environments, while scoring based on resource utilization +rate will face the over placement problem like `LeastResourceUsageWithWeight`. + +**To solve the problem of heterogeneous environments, we use the resource utilization rate of the broker as the scoring criterion.** + + +## binding shedding and placement strategies +So how can we avoid the over placement problem? **The key is to bind the shedding and placement strategies together.** +If every bundle unloaded from the high load broker is assigned to the right low load broker in shedding strategy, +the over placement problem will be solved. 
+ +For example, if the broker scores of the current cluster are 20,30,52,80,80, and the shedding and placement strategies are decoupled, +bundles will be unloaded from the two brokers with a score of 80, and then all these bundles will be placed on the broker with a +score of 20, causing the over placement problem. + +If the shedding and placement strategies are coupled, one broker with a score of 80 can unload some bundles to the broker with a score of 20, +and the other broker with a score of 80 can unload bundles to the broker with a score of 30. In this way, we can avoid the over placement problem. + + +## evenly distributed traffic between the highest and lowest loaded brokers +We will first pick out the highest and lowest loaded brokers, and then evenly distribute the traffic between them. + +For example, suppose the broker scores of the current cluster are 20,30,52,70,80, the message rate of the highest loaded broker is 1000, +and the message rate of the lowest loaded broker is 500. We introduce a threshold to decide whether to trigger the bundle unload, for example, +a threshold of 40. As the difference between the scores of the highest and lowest loaded brokers is 80-20=60>40, +the shedding strategy will be triggered. + +To achieve the goal of evenly distributing the traffic between the highest and lowest loaded brokers, the shedding strategy will +try to make the message rate of the two brokers the same, which is (1000+500)/2=750. The shedding strategy will unload 250 message rate from the +highest loaded broker to the lowest loaded broker. After the shedding strategy is completed, the message rate of both brokers will be +the same, which is 750. + + +## improve the load balancing speed +As we mentioned earlier for `UniformLoadShedder`, if a strategy only handles one high load broker at a time, it will take a long time to +complete all load balancing tasks. Therefore, we further optimize it by matching multiple pairs of high and low load brokers in +a single shedding. 
After sorting the broker scores, the first and the last are paired, the second and the second to last are paired, +and so on. When the score difference between the two paired brokers is greater than the threshold, the load will be evenly distributed +between the two, which can solve the problem of slow speed. + +For example, if the broker scores of the current cluster are 20,30,52,70,80, we will pair 20 with 80 and 30 with 70. As the differences between +the paired brokers are 80-20=60 and 70-30=40, both of which reach the threshold of 40, the shedding strategy will be triggered. + + +## handle load jitter with multiple hits threshold +What about the historical weighting algorithm used in `ThresholdShedder`? It is used to solve the problem of load jitter, but previous +analysis and experiments have shown that it can bring serious negative effects, so we can no longer use this method to solve the +problem of load jitter. + +We mimic the way alarms are triggered: the threshold must be hit multiple times before the bundle unload is finally triggered. +For example, when the difference between a pair of brokers exceeds the threshold three times, load balancing is triggered. + +## high and low threshold +In situations such as a cluster rolling restart or expansion, there is often a significant load difference between +different brokers, and we hope to complete load balancing more quickly. 
+ +Therefore, we introduce two thresholds: +- loadBalancerAvgShedderLowThreshold, default value is 15 +- loadBalancerAvgShedderHighThreshold, default value is 40 + +Two thresholds correspond to two continuous hit count requirements: +- loadBalancerAvgShedderHitCountLowThreshold, default value is 8 +- loadBalancerAvgShedderHitCountHighThreshold, default value is 2 + +When the difference in scores between two paired brokers exceeds the `loadBalancerAvgShedderLowThreshold` by
Review Comment: When the threshold difference is 30, the hit count requirement should be `loadBalancerAvgShedderHitCountLowThreshold`.