An update.

I noticed in the stack trace that it seems to be the *production* of
messages that gets the exception "Unable to complete sending the message:
Only one thread may use a JMS Session at a time." - not the *consumption*.
I also only showed you the first part of the route (because I didn't think
the rest mattered). What I do is basically read XML messages from one
queue, transform them and then send them to another queue.

Moreover, one incoming message may end up creating more than one (in this
case 3) outgoing messages. I'm using a splitter to accomplish this. In
pseudocode the route looks like this:

  from(sjms:...).threads(10).process(converting...).split().to(sjms:...);

If I set consumerCount=10 on the consumer endpoint AND use threads(10) then
I can see that more threads are created but I get the "Only one thread may
use ..." exception. Now, if I also set the producerCount=10 on the producer
endpoint then this exception goes away and my route works.

I'm not exactly sure why this works. If anyone could explain the relationship
between consumer/producer count and threads I would appreciate it. A theory
of mine is that there must be at least as many consumers/producers as there
are threads or there is a risk that two threads will try to use the same
consumer/producer. If there is a one-to-one relationship between
consumer/producer and JMS session then this could explain the exception.
But this still sounds weird. If there is a pool of consumers/producers then
the thread should try to acquire one from the pool. If no consumer/producer
is available then the thread should wait until one is available - not try
to use one that is in use by another thread.
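
To make the behaviour I would expect concrete, here is a minimal sketch in
plain Java. It is NOT how sjms is implemented (I haven't checked that); the
names SessionPoolSketch and FakeSession are made up for illustration, and
FakeSession is only a stand-in that rejects concurrent use the way the
WebLogic session appears to. Threads borrow a session from a blocking pool
and wait if none is free, so two threads never share one.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class SessionPoolSketch {

    /** Hypothetical stand-in for a JMS session: rejects concurrent use. */
    static final class FakeSession {
        private final AtomicBoolean inUse = new AtomicBoolean(false);

        void send(String body, AtomicInteger sent) {
            if (!inUse.compareAndSet(false, true)) {
                throw new IllegalStateException(
                        "Only one thread may use a session at a time");
            }
            try {
                sent.incrementAndGet(); // pretend the message was sent
            } finally {
                inUse.set(false);
            }
        }
    }

    /**
     * Runs `threads` workers that each send `perThread` messages through a
     * pool of `poolSize` sessions. Returns {messages sent, violations}.
     */
    static int[] run(int threads, int poolSize, int perThread) {
        BlockingQueue<FakeSession> pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new FakeSession());
        }

        AtomicInteger sent = new AtomicInteger();
        AtomicInteger violations = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.execute(() -> {
                for (int m = 0; m < perThread; m++) {
                    FakeSession session;
                    try {
                        session = pool.take(); // blocks until a session is free
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        session.send("msg", sent);
                    } catch (IllegalStateException e) {
                        violations.incrementAndGet();
                    } finally {
                        pool.add(session); // hand the session back to the pool
                    }
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { sent.get(), violations.get() };
    }

    public static void main(String[] args) {
        int[] result = run(10, 3, 100);
        System.out.println("sent=" + result[0] + ", violations=" + result[1]);
    }
}
```

With 10 threads and only 3 sessions, all 1000 messages are sent and the
violation count stays at 0, because a thread that finds the pool empty waits
in take() instead of grabbing a session that is already in use. That is the
behaviour I would have expected with fewer producers than threads.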

Also, although I can get my route to work this way, I get no better
throughput than if I only use one thread. I get the exact same throughput
and my CPU is basically idling. Obviously I'm not doing this right.

Do I also need multiple connections to the JMS server? Could this affect
the concurrency?

/Bengt





2014-03-17 8:39 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:

> I've now tried just using consumerCount on the endpoint without using
> threads(). However, it doesn't make it multithreaded. Only one thread is
> started for the route.
>
> Any other ideas? Has someone used sjms with a multithreaded consumer?
>
> /Bengt
>
>
> 2014-03-14 17:42 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
>
>> Ok - thanks. I have tried it but only together with threads(). I didn't
>> realize that it might create threads on its own.
>>
>> /Bengt
>> On 14 Mar 2014 17:26, "Claus Ibsen" <claus.ib...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I think you should use the consumerCount option on the endpoint instead
>>>
>>> On Fri, Mar 14, 2014 at 5:10 PM, Bengt Rodehav <be...@rodehav.com>
>>> wrote:
>>> > I'm using Camel 2.12.3 and the sjms component for communicating with
>>> > Weblogic JMS.
>>> >
>>> > Everything works fine when I use a single thread. However, to increase
>>> > throughput I want multiple threads to read messages from the queue.
>>> >
>>> > I've done this by using the "threads()" method:
>>> >
>>> >   from(sjms:...).threads(10)....
>>> >
>>> > However I get an exception as follows:
>>> >
>>> > java.lang.Exception: Unable to complete sending the message: Only one
>>> > thread may use a JMS Session at a time.:Only one thread may use a JMS
>>> > Session at a time.
>>> > at org.apache.camel.component.sjms.producer.InOnlyProducer.sendMessage(InOnlyProducer.java:135)[129:org.apache.camel.camel-sjms:2.12.3]
>>> > at org.apache.camel.component.sjms.SjmsProducer.process(SjmsProducer.java:180)[129:org.apache.camel.camel-sjms:2.12.3]
>>> > at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:574)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:507)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:216)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.Splitter.process(Splitter.java:98)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.Pipeline$1.done(Pipeline.java:136)[106:org.apache.camel.camel-core:2.12.3]
>>> > at org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:83)[106:org.apache.camel.camel-core:2.12.3]
>>> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
>>> > at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)[:1.6.0_32]
>>> > at java.util.concurrent.FutureTask.run(FutureTask.java:138)[:1.6.0_32]
>>> > at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
>>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
>>> > at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]
>>> >
>>> > What exactly does this mean and how can I consume messages via sjms
>>> > using multiple threads?
>>> >
>>> > /Bengt
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> -----------------
>>> Red Hat, Inc.
>>> Email: cib...@redhat.com
>>> Twitter: davsclaus
>>> Blog: http://davsclaus.com
>>> Author of Camel in Action: http://www.manning.com/ibsen
>>> Make your Camel applications look hawt, try: http://hawt.io
>>>
>>
>
