Author: akarpe Date: Fri Apr 2 16:23:31 2010 New Revision: 930313 URL: http://svn.apache.org/viewvc?rev=930313&view=rev Log: CAMEL-2526 - Fix to add a check that limits concurrentConsumers to at most 500
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=930313&r1=930312&r2=930313&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Fri Apr 2 16:23:31 2010 @@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlocki import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultComponent; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * An implementation of the <a href="http://camel.apache.org/seda.html">SEDA components</a> @@ -33,8 +35,10 @@ import org.apache.camel.impl.DefaultComp */ public class SedaComponent extends DefaultComponent { + private static final transient Log LOG = LogFactory.getLog(SedaComponent.class); private final Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>(); - + private final int maxConcurrentConsumers = 500; + public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) { String key = getQueueKey(uri); @@ -58,6 +62,14 @@ public class SedaComponent extends Defau @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, 1); + boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true); + if (limitConcurrentConsumers) { + consumers = (consumers < maxConcurrentConsumers) ? 
consumers : maxConcurrentConsumers; + if (consumers == maxConcurrentConsumers) { + LOG.info("The limitConcurrentConsumers flag in set to true. Concurrent Consumers cannot be set at a value greater than " + maxConcurrentConsumers); + LOG.info("Concurrent Consumers set to " + maxConcurrentConsumers); + } + } SedaEndpoint answer = new SedaEndpoint(uri, this, createQueue(uri, parameters), consumers); answer.configureProperties(parameters); return answer;