Hi,

This is an interesting proposal, and it can certainly help in quite a few 
load-balancing cases.
My concern is that QPS and flow data are relatively transient and can 
change a lot, so it is easy to end up with bad splits, especially on 
startup. 
Any ideas on this?

Thanks,
Haiting

On 2022/07/25 14:42:01 lordcheng10 wrote:
> Hi Pulsar Community,
> 
> 
> This is a PIP discussion on how to support splitting bundles according to 
> flow or qps.
> 
> 
> The issue can be found at: https://github.com/apache/pulsar/issues/16782
> 
> 
> I copy the content here for convenience, any suggestions are welcome and 
> appreciated.
> 
> 
> 
> 
> ## Motivation
> As we all know, Bundle split has 3 algorithms:
> - range_equally_divide
> - topic_count_equally_divide
> - specified_positions_divide
> 
> 
> However, none of these algorithms can divide bundles according to flow or 
> qps, which may cause bundles to be split multiple times.
> 
> 
> ## Goal
> Our goal is to split bundles according to flow or QPS, so we propose a PIP to 
> introduce a split algorithm based on flow or QPS.
> The main idea is that we can get the flow or QPS information of each topic 
> contained in a bundle,
> and then split at the position where the flow or QPS is evenly divided.
> 
> 
> For example, there is a bundle with boundaries 0x00000000 to 0x00000200 and 
> six topics: t1, t2, t3, t4, t5, t6.
> 
> 
> **Step 1: Get their hash position and corresponding flow or QPS:**
> 
> 
> > t1 with hashcode 10 msgRate 100/s throughput 1M/s
> > 
> > t2 with hashcode 20 msgRate 200/s throughput 2M/s
> > 
> > t3 with hashcode 80 msgRate 300/s throughput 3M/s
> > 
> > t4 with hashcode 90 msgRate 400/s throughput 4M/s
> > 
> > t5 with hashcode 100 msgRate 500/s throughput 5M/s
> > 
> > t6 with hashcode 110 msgRate 2000/s throughput 190M/s
> 
> 
> 
> 
> **Step 2: Calculate the total flow and qps of the bundle:**
> 
> 
> > bundleMsgRate=3500
> > bundleThroughput=205M/s
> 
> 
> **Step 3: Calculate the flow and qps to split:**
> 
> 
> > splitBundleMsgRate=1750
> > splitBundleThroughput=102.5M/s
> 
> 
> **Step 4: Calculate the position to split and split:**
> 
> 
> > splitStartPosition=100
> > splitEndPosition=110
> > splitPosition=(100+110)/2=105
> 
> 
> ## API Changes
> 
> 
> 1. Add a split algorithm class based on flow or qps:
> 
> 
> `public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm`
> 
> 
> 2. Update the default configuration:
> ```
> private List<String> supportedNamespaceBundleSplitAlgorithms =
>     Lists.newArrayList("range_equally_divide", "topic_count_equally_divide",
>         "specified_positions_divide", "flow_count_equally_divide");
> ```
> 
> 
> ## Implementation
> The execution steps of the 
> FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary method are as 
> follows:
> 1. Get the hash position of each topic and the corresponding msgRate and 
> msgThroughput, and sort the topics by hash position:
> 
> 
> ```
> List<Long> topicNameHashList = new ArrayList<>(topics.size());
> Map<Long, Double> hashAndMsgMap = new HashMap<>();
> Map<Long, Double> hashAndThroughput = new HashMap<>();
> ```
> 
> 
> 2. Traverse the topic positions in ascending order to find the position that 
> roughly evenly divides the bundle's flow or QPS:
> 
> 
> ```
> double bundleMsgRateTmp = 0;
> double bundleThroughputTmp = 0;
> for (int i = 0; i < topicNameHashList.size(); i++) {
>     long topicHashCode = topicNameHashList.get(i);
>     bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
>     bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
> 
>     if (bundleMsgRateTmp > bundleMsgRate / 2
>             || bundleThroughputTmp > bundleThroughput / 2) {
>         long splitStart = i > 0 ? topicNameHashList.get(i - 1) : 0;
>         long splitEnd = topicHashCode;
>         long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
>         splitResults.add(splitMiddle);
>         break;
>     }
> }
> ```
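
For reference, the quoted steps can be put together into a small self-contained sketch and run against the six-topic example. This is only an illustration under stated assumptions, not the PIP's actual implementation: the names `FlowSplitSketch`, `TopicLoad`, `findSplitPosition` and the `lowerBound` parameter are hypothetical, and returning -1 when no split point is found is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class FlowSplitSketch {

    /** Hypothetical holder: a topic's hash position plus its current load. */
    public static final class TopicLoad {
        final long hash;
        final double msgRate;     // messages per second
        final double throughput;  // same unit as the per-topic figures in the example

        public TopicLoad(long hash, double msgRate, double throughput) {
            this.hash = hash;
            this.msgRate = msgRate;
            this.throughput = throughput;
        }
    }

    /**
     * Returns the midpoint between the topic at which the running msgRate or
     * throughput first exceeds half of the bundle total and the topic before it.
     * lowerBound (an assumption of this sketch) is used when the first topic
     * already exceeds half; the quoted snippet uses 0 there.
     * Returns -1 if no split point is found (empty list or zero load).
     */
    public static long findSplitPosition(List<TopicLoad> topics, long lowerBound) {
        // Step 1: sort topics by hash position.
        List<TopicLoad> sorted = new ArrayList<>(topics);
        sorted.sort(Comparator.comparingLong((TopicLoad t) -> t.hash));

        // Step 2: total load of the bundle.
        double totalRate = 0;
        double totalThroughput = 0;
        for (TopicLoad t : sorted) {
            totalRate += t.msgRate;
            totalThroughput += t.throughput;
        }

        // Steps 3 and 4: walk in ascending order until half of either total is exceeded.
        double rate = 0;
        double throughput = 0;
        for (int i = 0; i < sorted.size(); i++) {
            TopicLoad t = sorted.get(i);
            rate += t.msgRate;
            throughput += t.throughput;
            if (rate > totalRate / 2 || throughput > totalThroughput / 2) {
                long splitStart = i > 0 ? sorted.get(i - 1).hash : lowerBound;
                long splitEnd = t.hash;
                return splitStart + (splitEnd - splitStart) / 2;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The six topics from the example: hash, msgRate, throughput.
        List<TopicLoad> topics = Arrays.asList(
                new TopicLoad(10, 100, 1),
                new TopicLoad(20, 200, 2),
                new TopicLoad(80, 300, 3),
                new TopicLoad(90, 400, 4),
                new TopicLoad(100, 500, 5),
                new TopicLoad(110, 2000, 190));
        System.out.println(findSplitPosition(topics, 0L)); // prints 105
    }
}
```

With the example data, both running totals first exceed half of the bundle total at t6 (hash 110), so the split lands between t5 and t6 at 105, matching Step 4. The sketch reads the rates once at call time, so it does nothing to smooth the short-lived fluctuations raised at the top of this mail.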
