## Motivation
Pulsar currently supports three bundle split algorithms:
- range_equally_divide
- topic_count_equally_divide
- specified_positions_divide

However, none of these algorithms can split a bundle according to its flow (throughput) or
QPS (message rate), so a hot bundle may need to be split multiple times before its load is
brought under control.
## Goal
Our goal is to split bundles according to flow or QPS, so we propose a PIP that introduces
a split algorithm based on these two metrics.
The main idea is to obtain the flow and QPS of each topic contained in a bundle, and then
split the bundle according to the loadBalancerNamespaceBundleMaxMsgRate or
loadBalancerNamespaceBundleMaxBandwidthMbytes configuration.
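Both limits are existing broker settings; for reference, a broker.conf sketch with
illustrative values (the example below uses smaller values to keep the arithmetic simple):
```
loadBalancerNamespaceBundleMaxMsgRate=30000
loadBalancerNamespaceBundleMaxBandwidthMbytes=100
```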
For example, suppose a bundle with boundaries 0x00000000 to 0x00000200 contains six
topics: t1, t2, t3, t4, t5, t6, and the broker is configured with
loadBalancerNamespaceBundleMaxMsgRate=1100 and
loadBalancerNamespaceBundleMaxBandwidthMbytes=110.
**Step 1: Get each topic's hash position and its flow and QPS:**
> t1 with hash position 10, msgRate 100 msg/s, throughput 10 MB/s
>
> t2 with hash position 20, msgRate 200 msg/s, throughput 20 MB/s
>
> t3 with hash position 80, msgRate 300 msg/s, throughput 30 MB/s
>
> t4 with hash position 90, msgRate 400 msg/s, throughput 40 MB/s
>
> t5 with hash position 100, msgRate 500 msg/s, throughput 50 MB/s
>
> t6 with hash position 110, msgRate 600 msg/s, throughput 60 MB/s
**Step 2: Calculate the total flow and QPS of the bundle:**
> bundleMsgRate = 100 + 200 + 300 + 400 + 500 + 600 = 2100 msg/s
>
> bundleThroughput = 10 + 20 + 30 + 40 + 50 + 60 = 210 MB/s
**Step 3: Calculate the split position and split:**
> QPS: (100 + 200 + 300 + 400) < loadBalancerNamespaceBundleMaxMsgRate=1100 and
> (100 + 200 + 300 + 400 + 500) > loadBalancerNamespaceBundleMaxMsgRate=1100
>
> Flow: (10 + 20 + 30 + 40) < loadBalancerNamespaceBundleMaxBandwidthMbytes=110 and
> (10 + 20 + 30 + 40 + 50) > loadBalancerNamespaceBundleMaxBandwidthMbytes=110
>
> So the bundle is split between t4 and t5:
>
> splitStartPosition = 90
>
> splitEndPosition = 100
>
> splitPosition = (90 + 100) / 2 = 95
## API Changes
1. Add a split algorithm class based on flow or QPS:
`public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm`
2. Update the default configuration:
```
private List<String> supportedNamespaceBundleSplitAlgorithms =
        Lists.newArrayList("range_equally_divide", "topic_count_equally_divide",
                "specified_positions_divide", "flow_count_equally_divide");
```
3. Add a new configuration option (a broker.conf sketch follows this list):
```
@FieldContext(
    dynamic = true,
    category = CATEGORY_LOAD_BALANCER,
    doc = "Acceptable percentage difference between QPS and"
            + " loadBalancerNamespaceBundleMaxMsgRate, or between flow and"
            + " loadBalancerNamespaceBundleMaxBandwidthMbytes"
)
private int flowOrQpsDifferenceThresholdPercentage = 10;
```
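Once registered, the new algorithm and threshold could be configured like the existing
split settings. A hypothetical broker.conf fragment, using the names proposed above
(values are only illustrative):
```
defaultNamespaceBundleSplitAlgorithm=flow_count_equally_divide
flowOrQpsDifferenceThresholdPercentage=10
```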
## Implementation
The execution steps of the FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary
method are as follows:
1. Get the hash position of each topic and its corresponding msgRate and msgThroughput,
and sort the topics by hash position (a population sketch follows the declarations below):
```
// Topic hash positions, sorted ascending after step 1.
List<Long> topicNameHashList = new ArrayList<>(topics.size());
// Hash position -> msgRate of the topic at that position.
Map<Long, Double> hashAndMsgMap = new HashMap<>();
// Hash position -> msgThroughput of the topic at that position.
Map<Long, Double> hashAndThroughput = new HashMap<>();
```
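As a sketch of how these collections might be populated and sorted (msgRateByTopic,
throughputByTopic, and hashFunc are hypothetical placeholders for the broker's per-topic
load data and the bundle hash function, not actual Pulsar APIs):
```
// Hypothetical inputs: per-topic msgRate / msgThroughput maps and a function
// that maps a topic name to its hash position inside the bundle.
for (Map.Entry<String, Double> entry : msgRateByTopic.entrySet()) {
    long hash = hashFunc.applyAsLong(entry.getKey());
    topicNameHashList.add(hash);
    hashAndMsgMap.put(hash, entry.getValue());
    hashAndThroughput.put(hash, throughputByTopic.get(entry.getKey()));
}
// Sort by hash position so step 2 can traverse topics from small to large.
Collections.sort(topicNameHashList);
```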
2. Traverse the topics by hash position from small to large, and record a split position
whenever the accumulated load exceeds the configured loadBalancerNamespaceBundleMaxMsgRate
or loadBalancerNamespaceBundleMaxBandwidthMbytes (a self-contained replay of the
Motivation example follows the code):
```
// Split positions found so far.
List<Long> splitResults = new ArrayList<>();
double bundleMsgRateTmp = 0;
double bundleThroughputTmp = 0;
for (int i = 0; i < topicNameHashList.size(); i++) {
    long topicHashCode = topicNameHashList.get(i);
    bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
    bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
    // loadBalancerNamespaceBundleMaxBandwidthBytes is
    // loadBalancerNamespaceBundleMaxBandwidthMbytes converted to bytes.
    if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
            || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes) {
        // Split halfway between the previous topic's hash position and the
        // current one (or between the first two topics if the very first
        // topic already exceeds a limit).
        long splitStart = i > 0 ? topicNameHashList.get(i - 1) : topicHashCode;
        long splitEnd = i > 0 ? topicHashCode : topicNameHashList.get(i + 1);
        long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
        splitResults.add(splitMiddle);
        // Start accumulating the next segment from the current topic.
        bundleMsgRateTmp = hashAndMsgMap.get(topicHashCode);
        bundleThroughputTmp = hashAndThroughput.get(topicHashCode);
    }
}
```
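To make the loop concrete, the following minimal, self-contained sketch replays the
Motivation example through the same logic. The class and variable names are hypothetical,
and the throughput limit is kept in MB/s to match the example numbers:
```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlowOrQpsSplitExample {
    public static void main(String[] args) {
        double maxMsgRate = 1100;    // loadBalancerNamespaceBundleMaxMsgRate
        double maxThroughput = 110;  // loadBalancerNamespaceBundleMaxBandwidthMbytes, in MB/s

        // Topics t1..t6 from the Motivation example, sorted by hash position.
        List<Long> hashes = Arrays.asList(10L, 20L, 80L, 90L, 100L, 110L);
        double[] msgRates = {100, 200, 300, 400, 500, 600};
        double[] throughputs = {10, 20, 30, 40, 50, 60};

        List<Long> splitResults = new ArrayList<>();
        double msgRateTmp = 0;
        double throughputTmp = 0;
        for (int i = 0; i < hashes.size(); i++) {
            msgRateTmp += msgRates[i];
            throughputTmp += throughputs[i];
            if (msgRateTmp > maxMsgRate || throughputTmp > maxThroughput) {
                long splitStart = i > 0 ? hashes.get(i - 1) : hashes.get(i);
                long splitEnd = i > 0 ? hashes.get(i) : hashes.get(i + 1);
                splitResults.add(splitStart + (splitEnd - splitStart) / 2);
                msgRateTmp = msgRates[i];
                throughputTmp = throughputs[i];
            }
        }
        System.out.println(splitResults); // prints [95]
    }
}
```
Running this prints `[95]`, the same split position derived in Step 3 of the Motivation.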