Author: akarpe
Date: Tue Feb 28 15:33:10 2012
New Revision: 1294693

URL: http://svn.apache.org/viewvc?rev=1294693&view=rev
Log:
Fixed CAMEL-5039 Make WeightedRandomLoadBalancer really random. Many Thanks to 
Xavier Fournet  for the bug identification, submission of this patch and well 
documented test cases.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java?rev=1294693&r1=1294692&r2=1294693&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
 Tue Feb 28 15:33:10 2012
@@ -23,34 +23,49 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
 public class WeightedRandomLoadBalancer extends WeightedLoadBalancer {
-    private int randomCounter;
+    private final Random rnd = new Random();
+    private final int distributionRatioSum;
+    private int runtimeRatioSum;
     
     public WeightedRandomLoadBalancer(List<Integer> distributionRatioList) {
         super(distributionRatioList);
+        int sum = 0;
+        for (Integer distributionRatio : distributionRatioList) {
+            sum += distributionRatio;
+        }
+        distributionRatioSum = sum;
+        runtimeRatioSum = distributionRatioSum;
     }
     
     @Override
-    protected Processor chooseProcessor(List<Processor> processors, Exchange 
exchange) {
-        boolean found = false;
-        while (!found) {
-            if (getRuntimeRatios().isEmpty())  {
-                loadRuntimeRatios(getDistributionRatioList());
+    protected Processor chooseProcessor(List<Processor> processors, Exchange 
exchange) {        
+        int selectedProcessorIndex = selectProcessIndex();
+        return processors.get(selectedProcessorIndex);
+    }
+    
+    public int selectProcessIndex() {
+        if (runtimeRatioSum == 0) { // every processor is exhausted, reload 
for a new distribution round
+            for (DistributionRatio distributionRatio : getRuntimeRatios()) {
+                int weight = distributionRatio.getDistributionWeight();
+                distributionRatio.setRuntimeWeight(weight);
             }
-            
-            randomCounter = 0;
-            if (getRuntimeRatios().size() > 0) {
-                randomCounter = new 
Random().nextInt(getRuntimeRatios().size());
-            } 
-                
-            if (getRuntimeRatios().get(randomCounter).getRuntimeWeight() > 0) {
-                
getRuntimeRatios().get(randomCounter).setRuntimeWeight((getRuntimeRatios().get(randomCounter).getRuntimeWeight())
 - 1);
-                found = true;
-            } else {
-                getRuntimeRatios().remove(randomCounter);
+            runtimeRatioSum = distributionRatioSum;
+        }
+
+        DistributionRatio selected = null;
+        int randomWeight = rnd.nextInt(runtimeRatioSum);
+        int choiceWeight = 0;
+        for (DistributionRatio distributionRatio : getRuntimeRatios()) {
+            choiceWeight += distributionRatio.getRuntimeWeight();
+            if (randomWeight < choiceWeight) {
+                selected = distributionRatio;
+                break;
             }
         }
+        
+        selected.setRuntimeWeight(selected.getRuntimeWeight() - 1);
+        runtimeRatioSum--;
 
-        return 
processors.get(getRuntimeRatios().get(randomCounter).getProcessorPosition());
+        return selected.getProcessorPosition();
     }
-    
 }


Reply via email to