Author: indika Date: Mon Jun 1 08:50:02 2009 New Revision: 37391 URL: http://wso2.org/svn/browse/wso2?view=rev&revision=37391
Log: throttle update Modified: branches/synapse/1.3-wso2v1/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java branches/synapse/1.3-wso2v1/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java branches/synapse/1.3-wso2v1/pom.xml Modified: branches/synapse/1.3-wso2v1/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java?rev=37391&r1=37390&r2=37391&view=diff ============================================================================== --- branches/synapse/1.3-wso2v1/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java (original) +++ branches/synapse/1.3-wso2v1/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java Mon Jun 1 08:50:02 2009 @@ -19,30 +19,29 @@ package org.apache.synapse.mediators.throttle; import org.apache.axiom.om.OMElement; +import org.apache.axis2.clustering.ClusteringAgent; +import org.apache.axis2.clustering.ClusteringFault; +import org.apache.axis2.clustering.state.Replicator; +import org.apache.axis2.context.ConfigurationContext; import org.apache.neethi.PolicyEngine; -import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.Mediator; import org.apache.synapse.MessageContext; +import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.SynapseLog; -import org.apache.synapse.transport.nhttp.NhttpConstants; import org.apache.synapse.config.Entry; -import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.mediators.AbstractMediator; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.clustering.state.Replicator; -import org.apache.axis2.clustering.ClusteringFault; -import org.apache.axis2.clustering.ClusteringAgent; +import org.apache.synapse.transport.nhttp.NhttpConstants; import org.wso2.throttle.*; - /** - * The Mediator for the throttling - Throtting will occur according to the ws-policy + * The Mediator for the throttling - Throttling will occur according to the ws-policy * which is specified as the key for lookup from the registry or the inline policy - * Only support IP based throttling- Throotling can manage per IP using the throttle policy + * Only support IP based throttling- Throttling can manage per IP using the throttle policy */ -public class ThrottleMediator extends AbstractMediator implements ManagedLifecycle { +public class ThrottleMediator extends AbstractMediator { /* The key for getting the throttling policy - key refers to a/an [registry] entry */ private String policyKey = null; @@ -78,19 +77,19 @@ public void init(SynapseEnvironment se) { if (onAcceptMediator instanceof ManagedLifecycle) { - ((ManagedLifecycle)onAcceptMediator).init(se); + ((ManagedLifecycle) onAcceptMediator).init(se); } if (onRejectMediator instanceof ManagedLifecycle) { - ((ManagedLifecycle)onRejectMediator).init(se); + ((ManagedLifecycle) onRejectMediator).init(se); } } public void destroy() { if (onAcceptMediator instanceof ManagedLifecycle) { - ((ManagedLifecycle)onAcceptMediator).destroy(); + ((ManagedLifecycle) onAcceptMediator).destroy(); } if (onRejectMediator instanceof ManagedLifecycle) { - ((ManagedLifecycle)onRejectMediator).destroy(); + ((ManagedLifecycle) onRejectMediator).destroy(); } } @@ -108,7 +107,7 @@ synLog.traceTrace("Message : " + synCtx.getEnvelope()); } } - // To ensure the creation of throttle is thread safe – It is possible create same throttle + // To ensure the creation of throttle is thread safe – It is possible create same throttle // object multiple times by multiple threads. synchronized (throttleLock) { @@ -119,10 +118,10 @@ //To ensure check for clustering environment only happens one time if ((throttle == null && !isResponse) || (isResponse - && concurrentAccessController == null)) { + && concurrentAccessController == null)) { ClusteringAgent clusteringAgent = cc.getAxisConfiguration().getClusteringAgent(); if (clusteringAgent != null && - clusteringAgent.getStateManager() != null) { + clusteringAgent.getStateManager() != null) { isClusteringEnable = true; } } @@ -133,7 +132,7 @@ //if this is a clustered environment if (isClusteringEnable) { concurrentAccessController = - (ConcurrentAccessController) cc.getProperty(key); + (ConcurrentAccessController) cc.getProperty(key); } // for request messages, read the policy for throttling and initialize if (inLinePolicy != null) { @@ -142,12 +141,12 @@ if (synLog.isTraceTraceEnabled()) { synLog.traceTrace("Initializing using static throttling policy : " - + inLinePolicy); + + inLinePolicy); } try { // process the policy - throttle = ThrottlePolicyProcessor.processPolicy( - PolicyEngine.getPolicy(inLinePolicy)); + throttle = ThrottleFactory.createMediatorThrottle( + PolicyEngine.getPolicy(inLinePolicy)); //At this point concurrent access controller definitely 'null' // f the clustering is disable. @@ -156,7 +155,7 @@ // that message mediation has occurred through this mediator. if (throttle != null && concurrentAccessController == null) { concurrentAccessController = - throttle.getConcurrentAccessController(); + throttle.getConcurrentAccessController(); if (concurrentAccessController != null) { cc.setProperty(key, concurrentAccessController); } @@ -174,7 +173,7 @@ Entry entry = synCtx.getConfiguration().getEntryDefinition(policyKey); if (entry == null) { handleException("Cannot find throttling policy using key : " - + policyKey, synCtx); + + policyKey, synCtx); } else { boolean reCreate = false; @@ -188,37 +187,37 @@ Object entryValue = synCtx.getEntry(policyKey); if (entryValue == null) { handleException( - "Null throttling policy returned by Entry : " - + policyKey, synCtx); + "Null throttling policy returned by Entry : " + + policyKey, synCtx); } else { if (!(entryValue instanceof OMElement)) { handleException("Policy returned from key : " + policyKey + - " is not an OMElement", synCtx); + " is not an OMElement", synCtx); } else { - //Check for reload in a cluster environment – + //Check for reload in a cluster environment – // For clustered environment ,if the concurrent access controller // is not null and throttle is not null , then must reload. if (isClusteringEnable && concurrentAccessController != null - && throttle != null) { + && throttle != null) { concurrentAccessController = null; // set null , // because need reload } try { // Creates the throttle from the policy - throttle = ThrottlePolicyProcessor.processPolicy( - PolicyEngine.getPolicy((OMElement) entryValue)); + throttle = ThrottleFactory.createMediatorThrottle( + PolicyEngine.getPolicy((OMElement) entryValue)); //For non-clustered environment , must re-initiates //For clustered environment, //concurrent access controller is null , //then must re-initiates if (throttle != null && (concurrentAccessController == null - || !isClusteringEnable)) { + || !isClusteringEnable)) { concurrentAccessController = - throttle.getConcurrentAccessController(); + throttle.getConcurrentAccessController(); if (concurrentAccessController != null) { cc.setProperty(key, concurrentAccessController); } else { @@ -227,7 +226,7 @@ } } catch (ThrottleException e) { handleException("Error processing the throttling policy", - e, synCtx); + e, synCtx); } } } @@ -238,7 +237,7 @@ // if the message flow path is OUT , then must lookp from ConfigurationContext - // never create ,just get the existing one concurrentAccessController = - (ConcurrentAccessController) cc.getProperty(key); + (ConcurrentAccessController) cc.getProperty(key); } } //perform concurrency throttling @@ -256,12 +255,12 @@ try { if (synLog.isTraceOrDebugEnabled()) { synLog.traceOrDebug("Going to replicates the " + - "states of the ConcurrentAccessController with key : " + key); + "states of the ConcurrentAccessController with key : " + key); } Replicator.replicate(cc); } catch (ClusteringFault clusteringFault) { handleException("Error during the replicating states ", - clusteringFault, synCtx); + clusteringFault, synCtx); } } } @@ -272,7 +271,7 @@ return mediator.mediate(synCtx); } else { handleException("Unable to find onAccept sequence with key : " - + onAcceptSeqKey, synCtx); + + onAcceptSeqKey, synCtx); } } else if (onAcceptMediator != null) { return onAcceptMediator.mediate(synCtx); @@ -287,7 +286,7 @@ return mediator.mediate(synCtx); } else { handleException("Unable to find onReject sequence with key : " - + onRejectSeqKey, synCtx); + + onRejectSeqKey, synCtx); } } else if (onRejectMediator != null) { return onRejectMediator.mediate(synCtx); @@ -303,8 +302,8 @@ /** * Helper method that handles the concurrent access through throttle * - * @param isResponse Current Message is response or not - * @param synLog the Synapse log to use + * @param isResponse Current Message is response or not + * @param synLog the Synapse log to use * @return true if the caller can access ,o.w. false */ private boolean doThrottleByConcurrency(boolean isResponse, SynapseLog synLog) { @@ -314,7 +313,7 @@ int concurrentLimit = concurrentAccessController.getLimit(); if (synLog.isTraceOrDebugEnabled()) { synLog.traceOrDebug("Concurrent access controller for ID : " + id + - " allows : " + concurrentLimit + " concurrent accesses"); + " allows : " + concurrentLimit + " concurrent accesses"); } int available; if (!isResponse) { @@ -322,14 +321,14 @@ canAcess = available > 0; if (synLog.isTraceOrDebugEnabled()) { synLog.traceOrDebug("Concurrency Throttle : Access " + - (canAcess ? "allowed" : "denied") + " :: " + available - + " of available of " + concurrentLimit + " connections"); + (canAcess ? "allowed" : "denied") + " :: " + available + + " of available of " + concurrentLimit + " connections"); } } else { available = concurrentAccessController.incrementAndGet(); if (synLog.isTraceOrDebugEnabled()) { synLog.traceOrDebug("Concurrency Throttle : Connection returned" + " :: " + - available + " of available of " + concurrentLimit + " connections"); + available + " of available of " + concurrentLimit + " connections"); } } } @@ -339,19 +338,22 @@ /** * Helper method that handles the access-rate based throttling * - * @param synCtx MessageContext(Synapse) - * @param axisMC MessageContext(Axis2) - * @param cc ConfigurationContext - * @param synLog the Synapse log to use + * @param synCtx MessageContext(Synapse) + * @param axisMC MessageContext(Axis2) + * @param cc ConfigurationContext + * @param synLog the Synapse log to use * @return ue if the caller can access ,o.w. false */ - private boolean throttleByAccessRate(MessageContext synCtx, org.apache.axis2.context.MessageContext axisMC, ConfigurationContext cc, SynapseLog synLog) { + private boolean throttleByAccessRate(MessageContext synCtx, + org.apache.axis2.context.MessageContext axisMC, + ConfigurationContext cc, + SynapseLog synLog) { String callerId = null; boolean canAccess = true; //remote ip of the caller String remoteIP = (String) axisMC.getPropertyNonReplicable( - org.apache.axis2.context.MessageContext.REMOTE_ADDR); + org.apache.axis2.context.MessageContext.REMOTE_ADDR); //domain name of the caller String domainName = (String) axisMC.getPropertyNonReplicable(NhttpConstants.REMOTE_HOST); @@ -364,7 +366,7 @@ } // loads the DomainBasedThrottleContext ThrottleContext context - = throttle.getThrottleContext(ThrottleConstants.DOMAIN_BASED_THROTTLE_KEY); + = throttle.getThrottleContext(ThrottleConstants.DOMAIN_BASED_THROTTLE_KEY); if (context != null) { //loads the DomainBasedThrottleConfiguration ThrottleConfiguration config = context.getThrottleConfiguration(); @@ -381,12 +383,13 @@ try { //Checks for access state - canAccess = accessControler.canAccess(context, - callerId, ThrottleConstants.DOMAIN_BASE); + AccessInformation accessInformation = accessControler.canAccess(context, + callerId, ThrottleConstants.DOMAIN_BASE); + canAccess = accessInformation.isAccessAllowed(); if (synLog.isTraceOrDebugEnabled()) { synLog.traceOrDebug("Access " + (canAccess ? "allowed" : "denied") - + " for Domain Name : " + domainName); + + " for Domain Name : " + domainName); } //In the case of both of concurrency throttling and @@ -415,7 +418,9 @@ if (callerId == null) { //do the IP-based throttling if (remoteIP == null) { - synLog.traceOrDebug("The IP address of the caller cannot be found"); + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("The IP address of the caller cannot be found"); + } canAccess = true; } else { @@ -425,7 +430,7 @@ try { // Loads the IPBasedThrottleContext ThrottleContext context = - throttle.getThrottleContext(ThrottleConstants.IP_BASED_THROTTLE_KEY); + throttle.getThrottleContext(ThrottleConstants.IP_BASED_THROTTLE_KEY); if (context != null) { //Loads the IPBasedThrottleConfiguration ThrottleConfiguration config = context.getThrottleConfiguration(); @@ -440,13 +445,16 @@ context.setThrottleId(id); } //Checks access state - canAccess = accessControler.canAccess(context, - callerId, ThrottleConstants.IP_BASE); + AccessInformation accessInformation = accessControler.canAccess( + context, + callerId, + ThrottleConstants.IP_BASE); + canAccess = accessInformation.isAccessAllowed(); if (synLog.isTraceOrDebugEnabled()) { synLog.traceOrDebug("Access " + - (canAccess ? "allowed" : "denied") - + " for IP : " + remoteIP); + (canAccess ? "allowed" : "denied") + + " for IP : " + remoteIP); } //In the case of both of concurrency throttling and //rate based throttling have enabled , Modified: branches/synapse/1.3-wso2v1/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java?rev=37391&r1=37390&r2=37391&view=diff ============================================================================== --- branches/synapse/1.3-wso2v1/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java (original) +++ branches/synapse/1.3-wso2v1/modules/extensions/src/test/java/org/apache/synapse/mediators/throttle/ThrottleMediatorTest.java Mon Jun 1 08:50:02 2009 @@ -27,21 +27,23 @@ import org.apache.neethi.PolicyEngine; import org.apache.synapse.MessageContext; import org.apache.synapse.SynapseException; -import org.apache.synapse.config.SynapseConfiguration; import org.apache.synapse.config.Entry; -import org.apache.synapse.config.SynapseConfigUtils; +import org.apache.synapse.config.SynapseConfiguration; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.core.axis2.Axis2MessageContext; import org.apache.synapse.core.axis2.Axis2SynapseEnvironment; import org.apache.synapse.mediators.AbstractMediator; import org.wso2.throttle.*; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; import java.io.ByteArrayInputStream; +import java.io.StringReader; /** * Throttle Mediator Test - This class test throttling when policy has specified as both of * InLine and a registry key - * */ public class ThrottleMediatorTest extends TestCase { @@ -50,7 +52,7 @@ private static final String POLICY = " <wsp:Policy xmlns:wsp=\"http://schemas.xmlsoap.org/ws/2004/09/policy\"\n" + " xmlns:throttle=\"http://www.wso2.org/products/wso2commons/throttle\">\n" + " <throttle:ThrottleAssertion>\n" + - " <throttle:MaximumConcurrentAccess>10</throttle:MaximumConcurrentAccess>\n" + + " <throttle:MaximumConcurrentAccess>10</throttle:MaximumConcurrentAccess>\n" + " <wsp:All>\n" + " <throttle:ID throttle:type=\"IP\">other</throttle:ID>\n" + " <wsp:ExactlyOne>\n" + @@ -115,12 +117,24 @@ OMAbstractFactory.getSOAP11Factory().createOMDocument(); omDoc.addChild(envelope); - envelope.getBody().addChild(SynapseConfigUtils.stringToOM(payload)); + envelope.getBody().addChild(createOMElement(payload)); synMc.setEnvelope(envelope); return synMc; } + public static OMElement createOMElement(String xml) { + try { + XMLStreamReader reader = XMLInputFactory + .newInstance().createXMLStreamReader(new StringReader(xml)); + StAXOMBuilder builder = new StAXOMBuilder(reader); + return builder.getDocumentElement(); + } + catch (XMLStreamException e) { + throw new RuntimeException(e); + } + } + public void testMediate() throws Exception { ByteArrayInputStream in = new ByteArrayInputStream(POLICY.getBytes()); StAXOMBuilder builde = new StAXOMBuilder(in); @@ -155,35 +169,36 @@ } } + public void testMediateWithInLineXML() throws Exception { - ByteArrayInputStream in = new ByteArrayInputStream(POLICY.getBytes()); - StAXOMBuilder build = new StAXOMBuilder(in); - ThrottleTestMediator throttleMediator = new ThrottleTestMediator(); - throttleMediator.setInLinePolicy(build.getDocumentElement()); - MessageContext synCtx = createLightweightSynapseMessageContext("<empty/>"); - synCtx.setProperty(REMOTE_ADDR, "192.168.8.212"); - SynapseConfiguration synCfg = new SynapseConfiguration(); - synCtx.setConfiguration(synCfg); - for (int i = 0; i < 6; i++) { - try { - throttleMediator.mediate(synCtx); - Thread.sleep(1000); - } - catch (Exception e) { - - if (i == 3) { - assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0); - } - if (i == 4) { - assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0); - } - if (i == 5) { - assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0); - } - } - } + ByteArrayInputStream in = new ByteArrayInputStream(POLICY.getBytes()); + StAXOMBuilder build = new StAXOMBuilder(in); + ThrottleTestMediator throttleMediator = new ThrottleTestMediator(); + throttleMediator.setInLinePolicy(build.getDocumentElement()); + MessageContext synCtx = createLightweightSynapseMessageContext("<empty/>"); + synCtx.setProperty(REMOTE_ADDR, "192.168.8.212"); + SynapseConfiguration synCfg = new SynapseConfiguration(); + synCtx.setConfiguration(synCfg); + for (int i = 0; i < 6; i++) { + try { + throttleMediator.mediate(synCtx); + Thread.sleep(1000); + } + catch (Exception e) { + + if (i == 3) { + assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0); + } + if (i == 4) { + assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0); + } + if (i == 5) { + assertTrue(e.getMessage().lastIndexOf("IP_BASE") > 0); + } + } + } - } + } public class ThrottleTestMediator extends AbstractMediator { @@ -213,7 +228,7 @@ } //IP based throttling - String remoteIP = (String)synContext.getProperty(REMOTE_ADDR); + String remoteIP = (String) synContext.getProperty(REMOTE_ADDR); if (remoteIP == null) { throw new ThrottleException("IP address of the caller can not find - Currently only support caller-IP base access control" + "- Thottling will not happen "); @@ -227,8 +242,9 @@ } AccessRateController accessControler; try { - accessControler =new AccessRateController(); - boolean canAccess = accessControler.canAccess(throttleContext, remoteIP,ThrottleConstants.IP_BASE); + accessControler = new AccessRateController(); + boolean canAccess = accessControler.canAccess( + throttleContext, remoteIP, ThrottleConstants.IP_BASE).isAccessAllowed(); if (!canAccess) { throw new SynapseException("Access has currently been denied by the IP_BASE throttle for IP :\t" + remoteIP); } @@ -260,7 +276,7 @@ reCreate = true; } policyOmElement = (OMElement) entryValue; - } else if (inLinePolicy != null){ + } else if (inLinePolicy != null) { policyOmElement = inLinePolicy; } if (policyOmElement == null) { @@ -280,8 +296,7 @@ protected void createThrottleMetaData(OMElement policyOmElement) { try { - throttle = ThrottlePolicyProcessor - .processPolicy(PolicyEngine.getPolicy(policyOmElement)); + throttle = ThrottleFactory.createMediatorThrottle(PolicyEngine.getPolicy(policyOmElement)); } catch (ThrottleException e) { Modified: branches/synapse/1.3-wso2v1/pom.xml URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/pom.xml?rev=37391&r1=37390&r2=37391&view=diff ============================================================================== --- branches/synapse/1.3-wso2v1/pom.xml (original) +++ branches/synapse/1.3-wso2v1/pom.xml Mon Jun 1 08:50:02 2009 @@ -854,7 +854,7 @@ <!-- dependencies of Synapse extensions module --> <wso2caching.version>3.1-SNAPSHOT</wso2caching.version> - <wso2throttle.version>1.6</wso2throttle.version> + <wso2throttle.version>3.1-SNAPSHOT</wso2throttle.version> <wso2eventing-api.version>SNAPSHOT</wso2eventing-api.version> <xbean.version>2.2.0</xbean.version> <bsf.version>3.0-beta2</bsf.version> _______________________________________________ Esb-java-dev mailing list [email protected] https://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev
