Author: supun
Date: Fri May 14 10:56:06 2010
New Revision: 944203

URL: http://svn.apache.org/viewvc?rev=944203&view=rev
Log:
adding weighted load balance algorithm

Added:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java?rev=944203&r1=944202&r2=944203&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
 Fri May 14 10:56:06 2010
@@ -94,6 +94,7 @@ public final class LoadbalanceEndpointFa
                 algorithm =
                         LoadbalanceAlgorithmFactory.
                                 createLoadbalanceAlgorithm(loadbalanceElement, 
endpoints);
+                algorithm.setLoadBalanceEndpoint(loadbalanceEndpoint);
             } else if (loadbalanceElement.getFirstChildWithName(MEMBER) != 
null) {
                 if(loadbalanceElement.
                         
getChildrenWithName((XMLConfigConstants.ENDPOINT_ELT)).hasNext()){

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java?rev=944203&r1=944202&r2=944203&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
 Fri May 14 10:56:06 2010
@@ -22,7 +22,6 @@ package org.apache.synapse.endpoints;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.clustering.Member;
-import org.apache.http.protocol.HTTP;
 import org.apache.synapse.*;
 import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
 import org.apache.synapse.core.axis2.Axis2MessageContext;

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java?rev=944203&r1=944202&r2=944203&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
 Fri May 14 10:56:06 2010
@@ -46,6 +46,13 @@ public interface LoadbalanceAlgorithm {
     void setEndpoints(List<Endpoint> endpoints);
 
     /**
+     * Set the loadbalance endpoint
+     * 
+     * @param endpoint the endpoint which uses this algorithm
+     */
+    void setLoadBalanceEndpoint(Endpoint endpoint);
+
+    /**
      * This method returns the next node according to the algorithm 
implementation.
      *
      * @param synapseMessageContext SynapseMessageContext of the current 
message

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java?rev=944203&r1=944202&r2=944203&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
 Fri May 14 10:56:06 2010
@@ -58,6 +58,8 @@ public class RoundRobin implements Loadb
         this.endpoints = endpoints;
     }
 
+    public void setLoadBalanceEndpoint(Endpoint endpoint) {}
+
     /**
      * Choose an active endpoint using the round robin algorithm. If there are 
no active endpoints
      * available, returns null.

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java?rev=944203&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
 Fri May 14 10:56:06 2010
@@ -0,0 +1,293 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.endpoints.algorithms;
+
+import org.apache.axis2.clustering.Member;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.PropertyInclude;
+import org.apache.synapse.mediators.MediatorProperty;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * This algorithm sends messages based on the weights of the endpoints. For 
example we may
+ * have 3 endpoints with following weights.</p>
+ * <ul>
+ * <li>Epr 1: 5</li>
+ * <li>Epr 2: 3</li>
+ * <li>Epr 3: 2</li>
+ * </ul>
+ * <p> This algorithm will send the first 5 messages through Epr1, next 3 
messages through
+ * Epr2 and next 2 messages with Epr3. Then algorithm moves again to the first 
endpoint
+ * and cycle continues.</p>  
+ */
+public class WeightedRoundRobin implements LoadbalanceAlgorithm, 
ManagedLifecycle {
+    private static final Log log = LogFactory.getLog(WeightedRoundRobin.class);
+
+    /** We keep a sorted array of endpoint states, first state will point to 
the
+     * endpoint with the highest weight */
+    private EndpointState[] endpointStates = null;
+
+    /** Endpoint list */
+    private List<Endpoint> endpoints;
+
+    private Endpoint loadBalanceEndpoint;
+
+    /** Keep track of the current poistion we are operating on the 
endpointStates array */
+    private int endpointCursor = 0;
+
+    /** If a weight is not specified by the user, we use the default as 1 */
+    private static final int DEFAULT_WEIGHT = 1;
+
+    /** Configuration key used by the endpoints for indicating their weight */
+    private static final String LOADBALANCE_WEIGHT = "loadbalance.weight";
+
+    /** Configuration key used by the endpoints for indicating their weight */
+    private static final String LOADBALANCE_ThEADLOCAL = 
"loadbalance.threadLocal";
+
+    private boolean isThreadLocal = false;
+
+    private AlgorithmThreadLocal threadedAlgorithm = null;
+
+    /** we are not supporting members */
+    public void setApplicationMembers(List<Member> members) {
+        throw new UnsupportedOperationException("This algorithm doesn't 
operate on Members");
+    }
+
+    public void setEndpoints(List<Endpoint> endpoints) {
+        this.endpoints = endpoints;
+    }
+
+    public void setLoadBalanceEndpoint(Endpoint endpoint) {
+        this.loadBalanceEndpoint = endpoint;   
+    }
+
+    public Endpoint getNextEndpoint(MessageContext synapseMessageContext,
+                                    AlgorithmContext algorithmContext) {
+        if (!isThreadLocal) {
+            synchronized (this) {
+                EndpointState state = endpointStates[endpointCursor];
+                if (state.getCurrentWeight() == 0) {
+                    // reset the current state
+                    state.reset();
+
+                    // go to the next enpoint
+                    if (endpointCursor == endpointStates.length - 1) {
+                        endpointCursor = 0;
+                    } else {
+                        ++endpointCursor;
+                    }
+
+                    state = endpointStates[endpointCursor];
+                }
+
+                // we are about to use this endpoint, so decrement its current 
count
+                state.decrementCurrentWeight();
+
+                // return the endpoint corresponfing to the current poistion
+                return endpoints.get(state.getEndpointPosition());
+            }
+        } else {
+            if (threadedAlgorithm != null) {
+                Algorithm algo = threadedAlgorithm.get();
+
+                int position = algo.getNextEndpoint();
+
+                return endpoints.get(position);
+            } else {
+                String msg = "Algorithm: WeightedRoundRobin algorithm not 
initialized properly";
+                log.error(msg);
+                throw new SynapseException(msg);
+            }
+        }
+    }        
+
+    public Member getNextApplicationMember(AlgorithmContext algorithmContext) {
+        throw new UnsupportedOperationException("This algorithm doesn't 
operate on Members");
+    }
+
+    public void reset(AlgorithmContext algorithmContext) {
+        for (EndpointState state : endpointStates) {
+            state.reset();
+        }
+
+        endpointCursor = 0;
+    }
+
+    public String getName() {
+        return WeightedRoundRobin.class.getName();
+    }
+
+    public void init(SynapseEnvironment se) {
+        if (endpoints == null) {
+            String msg = "Endpoints are not set, cannot initialize the 
algorithm";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+
+        endpointStates = new EndpointState[endpoints.size()];
+
+        for (int i = 0; i < endpoints.size(); i++) {
+            Endpoint endpoint = endpoints.get(i);
+            if (!(endpoint instanceof PropertyInclude)) {
+                EndpointState state = new EndpointState(i, DEFAULT_WEIGHT);
+                endpointStates[i] = state;
+            } else {
+                MediatorProperty property =
+                        
((PropertyInclude)endpoint).getProperty(LOADBALANCE_WEIGHT);
+                EndpointState state;
+                if (property != null) {
+                    int weight = Integer.parseInt(property.getValue());
+
+                    if (weight <= 0) {
+                        String msg = "Weight must be greater than zero";
+                        log.error(msg);
+                        throw new SynapseException(msg);
+                    }
+
+                    state = new EndpointState(i, weight);
+                } else {
+                    state = new EndpointState(i, DEFAULT_WEIGHT);
+                }
+
+                endpointStates[i] = state;
+            }
+        }
+
+        // now we are going to sort
+        Arrays.sort(endpointStates, new Comparator<EndpointState>() {
+            public int compare(EndpointState o1, EndpointState o2) {
+                return o2.getWeight() - o1.getWeight();
+            }
+        });
+
+        if (loadBalanceEndpoint instanceof PropertyInclude) {
+            MediatorProperty threadLocalProperty = ((PropertyInclude) 
loadBalanceEndpoint).
+                    getProperty(LOADBALANCE_ThEADLOCAL);
+
+            if (threadLocalProperty != null && 
threadLocalProperty.getValue().equals("true")) {
+                isThreadLocal = true;
+            }
+        }
+    }
+
+    public void destroy() {}
+
+    private class AlgorithmThreadLocal extends ThreadLocal<Algorithm> {
+        @Override
+        protected Algorithm initialValue() {
+            return new Algorithm(endpointStates);
+        }
+    }
+
+    private class Algorithm {
+        /**
+         * We keep a sorted array of endpoint states, first state will point 
to the
+         * endpoint with the highest weight
+         */
+        private EndpointState[] threadLocalEndpointStates = null;
+
+        /**
+         * Keep track of the current poistion we are operating on the 
endpointStates array
+         */
+        private int threadLocalEndpointCursor = 0;
+
+        public Algorithm(EndpointState[] states) {
+            threadLocalEndpointStates = new EndpointState[states.length];
+            for (int i = 0; i < states.length; i++) {
+                threadLocalEndpointStates[i] = new 
EndpointState(states[i].getEndpointPosition(),
+                        states[i].getWeight());
+            }
+        }
+
+        public int getNextEndpoint() {
+            EndpointState state = 
threadLocalEndpointStates[threadLocalEndpointCursor];
+            if (state.getCurrentWeight() == 0) {
+                // reset the current state
+                state.reset();
+
+                // go to the next enpoint
+                if (threadLocalEndpointCursor == 
threadLocalEndpointStates.length - 1) {
+                    threadLocalEndpointCursor = 0;
+                } else {
+                    ++threadLocalEndpointCursor;
+                }
+
+                state = threadLocalEndpointStates[threadLocalEndpointCursor];
+            }
+
+            // we are about to use this endpoint, so decrement its current 
count
+            state.decrementCurrentWeight();
+
+            // return the endpoint corresponfing to the current poistion
+            return state.getEndpointPosition();
+        }
+    }
+
+
+    /**
+     * Simple class for holding the states about the endpoints. 
+     */
+    private class EndpointState {
+        private int endpointPosition = 0;
+
+        private int weight = 0;
+
+        private int currentWeight = 0;
+
+        public EndpointState(int endpointPosition, int weight) {
+            this.endpointPosition = endpointPosition;
+            this.weight = weight;
+            this.currentWeight = weight;
+        }
+
+        public int getEndpointPosition() {
+            return endpointPosition;
+        }
+
+        public int getWeight() {
+            return weight;
+        }
+
+        public int getCurrentWeight() {
+            return currentWeight;
+        }
+
+        public void setCurrentWeight(int currentWeight) {
+            this.currentWeight = currentWeight;
+        }
+
+        public void decrementCurrentWeight() {
+            --currentWeight;
+        }
+
+        public void reset() {
+            currentWeight = weight;
+        }
+    }
+}


Reply via email to