heesung-sn opened a new issue, #18215:
URL: https://github.com/apache/pulsar/issues/18215

   ### Motivation
   
   
   1. As PIP-192 enables the `bundle transfer protocol`, we can implement a new shedding strategy that specifies the new owner broker.
   2. Fix edge cases in the current shedders:
   
   - More aggressive load-balance strategy: https://docs.google.com/document/d/1nXaiSK39E10awqinnDUX5Sial79FSTczFCmMdc9q8o8/ (reported)
   We need to revisit the current load-balance strategy to balance the load more evenly and more frequently. With the current static-threshold approach, load balancing can appear to do nothing when (1) the threshold is set too high while overall usage is relatively low, or (2) new brokers are added. Both cases are illustrated in the sketch after this list.
     - Case 1) broker usages [12%, 2%, 1%, 1%], 10% default threshold, avg_usage = 4%: every broker's usage is below avg_usage + threshold (14%), so no shedding happens.
     - Case 2) broker usages [50%, 50%, 50%, 50%, 50%, 0% (new broker)], 10% default threshold, avg_usage = 41.6%: every broker's usage is below avg_usage + threshold (51.6%), so no shedding happens and the new broker stays idle.
   Even with sub-optimal initial broker assignments, the load-balance logic should be aggressive enough to guarantee load balance over time.
   
   - Repeated shedding due to highly weighted historical load: https://github.com/apache/pulsar/issues/18173 (reported)
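
   A minimal sketch of the static-threshold check behind both cases, assuming the simplified rule "shed only when a broker's usage exceeds avg_usage + threshold" (an illustration of the behavior described above, not the actual Pulsar shedder code):

   ```java
   import java.util.List;

   public class ThresholdCheckDemo {
       // Simplified model: a broker is shed only when its usage exceeds
       // the cluster average plus a static threshold.
       static boolean wouldShed(double usage, List<Double> allUsages, double threshold) {
           double avg = allUsages.stream().mapToDouble(Double::doubleValue).average().orElse(0);
           return usage > avg + threshold;
       }

       public static void main(String[] args) {
           double threshold = 0.10; // 10% default
           // Case 1: avg = 4%, avg + threshold = 14% -> the 12% broker is never shed
           List<Double> case1 = List.of(0.12, 0.02, 0.01, 0.01);
           case1.forEach(u -> System.out.println("case1 shed " + u + ": " + wouldShed(u, case1, threshold)));
           // Case 2: avg = 41.6%, avg + threshold = 51.6% -> the 50% brokers are never
           // shed, so the new (0%) broker receives no load
           List<Double> case2 = List.of(0.5, 0.5, 0.5, 0.5, 0.5, 0.0);
           case2.forEach(u -> System.out.println("case2 shed " + u + ": " + wouldShed(u, case2, threshold)));
       }
   }
   ```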
   
   
   
   
   ### Goal
   
   1. Create a new shedding algorithm that transfers bundles to a specific broker via the `bundle transfer` protocol introduced in https://github.com/apache/pulsar/issues/16691.
   2. Improve the shedding algorithm in the following areas:
    - add a bundle msg-throughput signal when computing broker resource usage
    - unload more aggressively to new brokers
    - minimize the number of configs required for tuning
    - improve the accuracy of broker load-data normalization (clean up lingering load data after transfers)
    - minimize the number of bundle unloads required to balance load in the cluster
    - clarify the global load-balance optimization target (the epoch error function)
   3. This algorithm will only be used in the new broker load balancer introduced in PIP-192.
   
   ### API Changes
   
   No.
   
   ### Implementation
   
   ### Pseudocode
   The idea is straightforward: keep unloading bundles from the most loaded broker to the least loaded broker until the standard deviation of the broker load distribution falls below our target.

   The following is the pseudocode.
   
   ```
   // compute load data for each broker
   for (broker_load_data in active_brokers) {

     // skip load data sampled less than x secs after the last transfer:
     // it may still reflect the pre-transfer assignment, and each broker
     // needs time to recompute its load after transfers
     if (broker_load_data.timestamp - last_transfer_timestamp < x secs) {
       continue;
     }

     // max(cpu, memory, direct_memory, network_in, network_out,
     //     msg_throughput_in, msg_throughput_out)
     cur_load = compute_load(broker_load_data)
     load = normalize(cur_load)
     load_map.put(broker, load)
     top_k_min_load_brokers.add(broker, load)
     top_k_max_load_brokers.add(broker, load)
   }

   // compute std (offload_map is empty at the start of a cycle)
   std = standard_deviation(load_map, offload_map)

   // keep transferring while std is above the target;
   // also force-unload if the min-loaded broker is a new (idle) broker
   for (int i = 0; i < max_transfer_cnt
          && (std > std_threshold || top_k_min_load_brokers.peek().msg_throughput == 0); i++) {
       (dst_broker, dst_load) = top_k_min_load_brokers.pop()
       (src_broker, src_load) = top_k_max_load_brokers.pop()
       if (dst_broker == null || src_broker == null || dst_broker == src_broker)
           return;

       // move half of the load gap; this percentage could be adjusted
       // by other threshold configs
       offload_percent = (src_load - dst_load) / 2
       offload_throughput = offload_percent * src_broker.throughput

       // transfer bundles, from highest loaded to lowest, from src_broker to
       // dst_broker until sum(bundle.throughput) >= offload_throughput
       ...

       // record the offloaded load (in normalized-load units) so the next
       // std computation accounts for the pending transfers
       offload_map.put(dst_broker, -offload_percent)
       offload_map.put(src_broker, offload_percent)
       transferred_brokers.add(dst_broker)
       transferred_brokers.add(src_broker)

       // recompute std considering the pending offloads
       std = standard_deviation(load_map, offload_map)
   }

   // clean load caches:
   // we need fresh load data for transferred brokers to avoid repeated transfers
   offload_map.clear()
   for (broker : transferred_brokers) {
       load_map.remove(broker)
   }
   // mark the timestamp at the end of the transfer
   last_transfer_timestamp = now()

   normalize(cur_load) {
     // exponential-moving-average smoothing of the per-broker load
     // (prev_load is the broker's previously normalized load);
     // the normalization method could be made configurable
     return historical_load_weight * prev_load + (1 - historical_load_weight) * cur_load
   }

   standard_deviation(load_map, offload_map) {
       // recompute each broker's load considering the pending offloads
       return std(load = load_map.get(broker) - offload_map.getOrDefault(broker, 0))
   }
   ```
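
   A hedged Java sketch of the exponential-moving-average `normalize` step above; the class and method names are illustrative, not the final implementation:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class EmaLoadNormalizer {
       private final double historicalLoadWeight; // e.g. 0.9 weights history heavily
       private final Map<String, Double> prevLoad = new HashMap<>();

       EmaLoadNormalizer(double historicalLoadWeight) {
           this.historicalLoadWeight = historicalLoadWeight;
       }

       // EMA: load = w * prev + (1 - w) * current; the first sample is used as-is
       double normalize(String broker, double curLoad) {
           return prevLoad.merge(broker, curLoad,
                   (prev, cur) -> historicalLoadWeight * prev + (1 - historicalLoadWeight) * cur);
       }

       // Dropping the history after a transfer mirrors the pseudocode's
       // load_map.remove(broker): stale, heavily weighted samples would keep a
       // transferred broker looking overloaded (the repeated-shedding problem in #18173).
       void reset(String broker) {
           prevLoad.remove(broker);
       }
   }
   ```

   With `historicalLoadWeight` close to 1, a single load spike decays slowly, which is why the pseudocode clears the cached load of transferred brokers instead of waiting for the moving average to catch up.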
   
   * Default Configurable Parameters
     - `max_transfer_cnt` = 3 // max number of transfers per unload cycle (1 min by default)
     - `std_threshold` = 15 // load standard deviation threshold (the target load distribution)
   
   
   
   ### Theoretical Load Balance Epochs (Transfer Counts) per Cluster Size for Target std=15
   * A load-balance epoch is one execution of a transfer run; the epoch count is the number of runs required to reach the target global load distribution (the load standard deviation threshold).

   This data shows how many `epochs` are required and helps tune the `max_transfer_cnt` config.

   In general, the number of required unload cycles (minutes) to reach the target std=15 is `epochs` / `max_transfer_cnt`.
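
   For example, with 100 brokers the table below lists 19 epochs, so with `max_transfer_cnt` = 3 the cluster needs roughly 19 / 3 ≈ 7 one-minute unload cycles to converge. The sketch below shows how such epoch counts can be estimated; it assumes each epoch moves half of the max-min load gap (per the `offload_percent = (src_load - dst_load) / 2` formula above) and is an illustration, not the exact methodology behind the table:

   ```java
   import java.util.Arrays;
   import java.util.Random;

   public class EpochEstimator {
       static double std(double[] loads) {
           double mean = Arrays.stream(loads).average().orElse(0);
           double var = Arrays.stream(loads).map(l -> (l - mean) * (l - mean)).average().orElse(0);
           return Math.sqrt(var);
       }

       // Count how many max->min transfers are needed before the load
       // standard deviation drops below the target.
       static int epochsToTarget(double[] loads, double targetStd) {
           int epochs = 0;
           while (std(loads) > targetStd) {
               int max = 0, min = 0;
               for (int i = 0; i < loads.length; i++) {
                   if (loads[i] > loads[max]) max = i;
                   if (loads[i] < loads[min]) min = i;
               }
               // move half of the max-min gap, mirroring offload_percent
               double move = (loads[max] - loads[min]) / 2;
               loads[max] -= move;
               loads[min] += move;
               epochs++;
           }
           return epochs;
       }

       public static void main(String[] args) {
           // 100 brokers with loads drawn uniformly from [0%, 100%)
           double[] loads = new Random(0).doubles(100, 0, 100).toArray();
           System.out.println("epochs until std <= 15: " + epochsToTarget(loads, 15));
       }
   }
   ```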
   
   
   
   <img width="727" alt="Screen Shot 2022-10-26 at 6 54 13 PM" src="https://user-images.githubusercontent.com/103456639/198172688-7ff93a67-741e-422f-a43c-2dead538b862.png">
   
   
   Cluster Size (number of brokers) | Epochs
   -- | --
   10 | 3
   20 | 5
   40 | 8
   80 | 15
   100 | 19
   200 | 36
   300 | 54
   400 | 72
   500 | 89
   600 | 107
   700 | 125
   800 | 143
   900 | 160
   1000 | 178
   
   
   
   
   
   
   ### Alternatives
   
   N/A
   
   ### Anything else?
   
   _No response_

