Hi
What version of Camel are you using?
The current codebase uses a pool size as the concurrent consumers parameter
int poolSize = endpoint.getConcurrentConsumers();
executor = ExecutorServiceHelper.newFixedThreadPool(poolSize,
endpoint.getEndpointUri(), true);
On Tue, Jul 14, 2009 at 3:57 AM, Ole Jørgensen<[email protected]> wrote:
>
> Concurrentconsumers not concurrent
>
> I made a test with concurrentconsumers. Problem is that I can get max 5
> realy concurrent threads. I log the id and name of the treads and it seems,
> that camel indeed is making the right number of threads but only 5 is realy
> working concurrent.
> I put 25 messages on a seda queue and specify concurrentconsumers. My
> consumer works for 2 seconds on each message. Concurrentconsumers works as
> expected for 1-5 cunsumers, but specifying more than 5 concurrentconsumers
> does not make more consumers work concurrent - max 5. I would expect all 25
> messages to be consumed and handled in 2 seconds with 25 concurrentconsumers
> or more. If I split the messages on 2 different queues I still get max 5
> realy concurrentconsumers.
> Same story when I use activemq.
>
> What am I doing wrong ?
>
>
> -------------------------
> <camel:camelContext id="camelContext">
> <camel:endpoint id="site0"
> uri="activemq:site0?concurrentConsumers=10"/>
> <camel:endpoint id="site1"
> uri="activemq:site1?concurrentConsumers=10"/>
> </camel:camelContext>
>
> <bean id="siteHandler0" class="SiteHandler"/>
> <bean id="siteHandler1" class="SiteHandler"/>
>
> ------------------------
>
> public class CamelRoutes extends RouteBuilder {
> �...@override
> public void configure() throws Exception {
> from("sites").to("bean:split");
> from("site0").to("bean:siteHandler0");
> from("site1").to("bean:siteHandler1");
> }
> }
> ---------------
>
> public Map onCheese(Map<String, Object> input) {
>
> Map<String, Object> res = new HashMap<String, Object>();
>
> Future[] futures = new Future[25];
> for (int i = 0; i < futures.length; i++) {
> Map<String, Object> newInput = new HashMap<String, Object>();
> newInput.putAll(input);
> newInput.put("i", i);
> futures[i] = producerTemplate.asyncRequestBody("site" + (i % 2),
> newInput);
> System.out.println("send = " + i);
> }
> System.out.println("------------------ Checking futures
> ------------------------");
> for (int i = 0; i < futures.length; i++) {
> Future future = futures[i];
> Map reply = producerTemplate.extractFutureBody(future, Map.class);
> res.putAll(reply);
> }
> return res;
> }
>
> -----------------------
>
> public class SiteHandler {
> public Map onSite(Map<String, Object> input) {
> System.out.println("siteHandler: " + input.get("i"));
>
> long start = System.currentTimeMillis();
> while (System.currentTimeMillis() < start + 2000);
>
> Map<String, Object> res = new HashMap<String, Object>();
> res.putAll(input);
> res.put(this.hashCode() + "", input.get("i"));
> res.put(Thread.currentThread().getId() + ": " +
> Thread.currentThread().getName(), input.get("i"));
>
> return res;
> }
> }
> ---------------------
> --
> View this message in context:
> http://www.nabble.com/Concurrentconsumers-not-concurrent-tp24472473p24472473.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>
--
Claus Ibsen
Apache Camel Committer
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus