I've done some more research. It turns out that I was mistaken regarding
the throughput. I had been watching the JMX attribute "MeanProcessingTime"
which is the same regardless of the number of threads. However, the actual
time taken to process 1000 messages is 115 seconds using one thread but 38
seconds using three threads. Using more than three threads does not
increase throughput any further.
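
To illustrate why a per-message mean can stay flat while the total elapsed
time drops, here is a minimal plain-Java sketch. It does not use Camel or JMS
at all; the class name, the sleep-based "work" and the task counts are
assumptions made up for the illustration, and the real "MeanProcessingTime"
attribute may be computed differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MeanVersusWallClock {

    /** Outcome of one run: mean time per task and total elapsed time. */
    public static final class Result {
        public final double meanTaskMillis;
        public final long elapsedMillis;

        Result(double meanTaskMillis, long elapsedMillis) {
            this.meanTaskMillis = meanTaskMillis;
            this.elapsedMillis = elapsedMillis;
        }
    }

    /** Runs `tasks` simulated messages, each taking about `taskMillis`, on `threads` threads. */
    public static Result run(int threads, int tasks, final long taskMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Long>> work = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                work.add(() -> {
                    long start = System.nanoTime();
                    Thread.sleep(taskMillis); // stand-in for processing one message
                    return System.nanoTime() - start;
                });
            }
            long begin = System.nanoTime();
            long totalTaskNanos = 0;
            // invokeAll blocks until every task has completed
            for (Future<Long> done : pool.invokeAll(work)) {
                totalTaskNanos += done.get();
            }
            long elapsedNanos = System.nanoTime() - begin;
            return new Result(totalTaskNanos / 1e6 / tasks, elapsedNanos / 1_000_000);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (int threads : new int[] {1, 3}) {
            Result r = run(threads, 30, 20);
            System.out.printf("threads=%d mean per task=%.1f ms, total elapsed=%d ms%n",
                    threads, r.meanTaskMillis, r.elapsedMillis);
        }
    }
}
```

The mean per task should stay close to the simulated 20 ms in both runs,
while the total elapsed time shrinks as threads are added, which matches what
I saw with the JMX attribute versus the stopwatch.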

I think it's interesting that the total processing time drops by (almost)
exactly a factor of three (115 s / 38 s is about 3.0) when I use three threads.

Another interesting fact is that for every message I read from JMS I create
exactly three messages that I write on another queue. The number three
keeps coming up...

Since I do about 1500 reads and 4500 writes from/to Weblogic JMS per minute,
it is possible that Weblogic JMS is the bottleneck. That is about 6000 JMS
operations per minute, or roughly 100 JMS operations/second.
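
The arithmetic behind these figures can be checked with a small sketch. The
class and method names are made up for the illustration; the inputs are the
numbers quoted above (1000 messages, 115 s, 38 s, 1500 reads and 4500 writes
per minute).

```java
public class JmsLoadEstimate {

    /** Messages handled per second for a batch that took the given time. */
    public static double messagesPerSecond(int messages, double seconds) {
        return messages / seconds;
    }

    /** Ratio between the single-threaded and the multi-threaded elapsed time. */
    public static double speedup(double singleThreadSeconds, double multiThreadSeconds) {
        return singleThreadSeconds / multiThreadSeconds;
    }

    /** Total JMS operations per second from per-minute read and write counts. */
    public static double operationsPerSecond(int readsPerMinute, int writesPerMinute) {
        return (readsPerMinute + writesPerMinute) / 60.0;
    }

    public static void main(String[] args) {
        System.out.printf("1 thread : %.1f msg/s%n", messagesPerSecond(1000, 115));
        System.out.printf("3 threads: %.1f msg/s%n", messagesPerSecond(1000, 38));
        System.out.printf("speedup  : %.2f%n", speedup(115, 38));
        System.out.printf("JMS load : %.0f ops/s%n", operationsPerSecond(1500, 4500));
    }
}
```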

/Bengt



2014-03-17 13:55 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:

> Hello Charles,
>
> I looked at your unit test but I don't understand it all. It seems to use
> an inout pattern and logs the threadId which I guess is a way to see that
> different threads are used. I haven't logged that in my code yet.
>
> However, I do get multiple threads to "work" by using "consumerCount",
> "producerCount" and "threads" - all set to the same value. But I don't get
> any better throughput, which leads me to believe that the execution is
> still serialized somewhere, i.e. there is a bottleneck.
>
> I think it should be a fairly common scenario to want to consume JMS
> messages in a multithreaded fashion in order to increase throughput. But
> perhaps no one has done this with sjms yet.
>
> /Bengt
>
>
>
>
> 2014-03-17 11:27 GMT+01:00 Charles Moulliard <ch0...@gmail.com>:
>
> Commit available here :
>>
>> https://git-wip-us.apache.org/repos/asf?p=camel.git;a=commit;h=1a7b676e1b5c29b652e3faf16240ee3cc831a0c9
>>
>>
>> On Mon, Mar 17, 2014 at 11:20 AM, Charles Moulliard <ch0...@gmail.com> wrote:
>>
>> > I will commit a unit test soon. That could help us verify whether
>> > something goes wrong.
>> >
>> >
>> > On Mon, Mar 17, 2014 at 9:38 AM, Bengt Rodehav <be...@rodehav.com> wrote:
>> >
>> >> An update.
>> >>
>> >> I noticed in the stack trace that it seems to be the *production* of
>> >> messages that gets the exception "Unable to complete sending the
>> >> message: Only one thread may use a JMS Session at a time." - not the
>> >> *consumption*. I also only showed you the first part of the route
>> >> (because I didn't think the rest mattered). What I do is basically
>> >> read XML messages from one queue, transform them and then send them
>> >> to another queue.
>> >>
>> >> Moreover, one incoming message may end up creating more than one (in
>> >> this case 3) outgoing messages. I'm using a splitter to accomplish
>> >> this. In pseudo code the route looks like this:
>> >>
>> >>   from(sjms:...).threads(10).process(converting...).split().to(sjms:...);
>> >>
>> >> If I set consumerCount=10 on the consumer endpoint AND use threads(10)
>> >> then I can see that more threads are created but I get the "Only one
>> >> thread may use ..." exception. Now, if I also set producerCount=10 on
>> >> the producer endpoint then this exception goes away and my route works.
>> >>
>> >> I'm not exactly sure why this works. If anyone could explain the
>> >> relationship between consumer/producer count and threads I would
>> >> appreciate it. A theory of mine is that there must be at least as many
>> >> consumers/producers as there are threads, or there is a risk that two
>> >> threads will try to use the same consumer/producer. If there is a
>> >> one-to-one relationship between consumer/producer and JMS session then
>> >> this could explain the exception. But this still sounds weird. If there
>> >> is a pool of consumers/producers then a thread should try to acquire
>> >> one from the pool. If no consumer/producer is available then the thread
>> >> should wait until one is available - not try to use one that is in use
>> >> by another thread.
>> >>
>> >> Also, although I can get my route to work this way, I get no better
>> >> throughput than if I only use one thread. I get the exact same
>> >> throughput and my CPU is basically idling. Obviously I'm not doing this
>> >> right.
>> >>
>> >> Do I also need multiple connections to the JMS server? Could this
>> >> affect the concurrency?
>> >>
>> >> /Bengt
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 2014-03-17 8:39 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
>> >>
>> >> > I've now tried just using consumerCount on the endpoint without using
>> >> > threads(). However, it doesn't make it multithreaded. Only one
>> >> > thread is started for the route.
>> >> >
>> >> > Any other ideas? Has someone used sjms with a multithreaded consumer?
>> >> >
>> >> > /Bengt
>> >> >
>> >> >
>> >> > 2014-03-14 17:42 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
>> >> >
>> >> >> Ok - thanks. I have tried it but only together with threads(). I
>> >> >> didn't realize that it might create threads on its own.
>> >> >>
>> >> >> /Bengt
>> >> >> On 14 Mar 2014 at 17:26, "Claus Ibsen" <claus.ib...@gmail.com> wrote:
>> >> >>
>> >> >> Hi
>> >> >>>
>> >> >>> I think you should use the consumerCount option on the endpoint
>> >> >>> instead
>> >> >>>
>> >> >>> On Fri, Mar 14, 2014 at 5:10 PM, Bengt Rodehav <be...@rodehav.com> wrote:
>> >> >>> > I'm using Camel 2.12.3 and the sjms component for communicating
>> >> >>> > with Weblogic JMS.
>> >> >>> >
>> >> >>> > Everything works fine when I use a single thread. However, to
>> >> >>> > increase throughput I want multiple threads to read messages from
>> >> >>> > the queue.
>> >> >>> >
>> >> >>> > I've done this by using the "threads()" method:
>> >> >>> >
>> >> >>> >   from(sjms:...).threads(10)....
>> >> >>> >
>> >> >>> > However I get an exception as follows:
>> >> >>> >
>> >> >>> > java.lang.Exception: Unable to complete sending the message: Only one thread may use a JMS Session at a time.:Only one thread may use a JMS Session at a time.
>> >> >>> > at org.apache.camel.component.sjms.producer.InOnlyProducer.sendMessage(InOnlyProducer.java:135)[129:org.apache.camel.camel-sjms:2.12.3]
>> >> >>> > at org.apache.camel.component.sjms.SjmsProducer.process(SjmsProducer.java:180)[129:org.apache.camel.camel-sjms:2.12.3]
>> >> >>> > at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:574)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:507)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:216)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.Splitter.process(Splitter.java:98)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.Pipeline$1.done(Pipeline.java:136)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:83)[106:org.apache.camel.camel-core:2.12.3]
>> >> >>> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
>> >> >>> > at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)[:1.6.0_32]
>> >> >>> > at java.util.concurrent.FutureTask.run(FutureTask.java:138)[:1.6.0_32]
>> >> >>> > at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
>> >> >>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
>> >> >>> > at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]
>> >> >>> >
>> >> >>> > What exactly does this mean and how can I consume messages via
>> >> >>> > sjms using multiple threads?
>> >> >>> >
>> >> >>> > /Bengt
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> --
>> >> >>> Claus Ibsen
>> >> >>> -----------------
>> >> >>> Red Hat, Inc.
>> >> >>> Email: cib...@redhat.com
>> >> >>> Twitter: davsclaus
>> >> >>> Blog: http://davsclaus.com
>> >> >>> Author of Camel in Action: http://www.manning.com/ibsen
>> >> >>> Make your Camel applications look hawt, try: http://hawt.io
>> >> >>>
>> >> >>
>> >> >
>> >>
>> >
>> >
>> >
>> > --
>> > Charles Moulliard
>> > Apache Committer / Architect @RedHat
>> > Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io
>> >
>> >
>
>
