Hi Bengt,

What version of SJMS are you using?  In earlier versions I had code in that
reused a single session for multiple consumers.  This in effect made it
single threaded.  Later versions (2.13 & 2.12.3) have a patch that has
removed the session restriction and uses a session per consumer.


On Mon, Mar 17, 2014 at 10:34 AM, Bengt Rodehav <be...@rodehav.com> wrote:

> I've done some more research. It turns out that I was mistaken regarding
> the throughput. I had been watching the JMX attribute "MeanProcessingTime"
> which is the same regardless of the number of threads. However the actual
> time taken to process 1000 messages is 115 seconds using one thread but 38
> seconds using three threads. Using more threads than three does not
> increase throughput.
>
> I think it's interesting that I can get my throughput down by (almost)
> exactly a factor of three if I use three threads.
>
> Another interesting fact is that for every message I read from JMS I create
> exactly three messages that I write on another queue. The number three
> keeps coming up...
>
> Since I do about 1500 reads and 4500 writes from/to Weblogic JMS per minute
> I guess it is possible that Weblogic JMS is the bottleneck. It is about 100
> JMS operations/second.
>
> /Bengt
>
>
>
> 2014-03-17 13:55 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
>
> > Hello Charles,
> >
> > I looked at your unit test but I don't understand it all. It seems to use
> > an inout pattern and logs the threadId which I guess is a way to see that
> > different threads are used. I haven't logged that in my code yet.
> >
> > However, I do get multiple threads to "work" by using "consumerCount",
> > "producerCount" and "threads" - all set to the same value. But I don't
> get
> > any better throughput which leads me to believe that somehow the
> execution
> > is still serialized somewhere - i e there is a bottleneck somewhere.
> >
> > I think it should be a fairly common scenario to want to consumer JMS
> > message in a multithreaded fashion in order to increase throughput. But
> > perhaps no one has done this with sjms yet.
> >
> > /Bengt
> >
> >
> >
> >
> > 2014-03-17 11:27 GMT+01:00 Charles Moulliard <ch0...@gmail.com>:
> >
> > Commit available here :
> >>
> >>
> https://git-wip-us.apache.org/repos/asf?p=camel.git;a=commit;h=1a7b676e1b5c29b652e3faf16240ee3cc831a0c9
> >>
> >>
> >> On Mon, Mar 17, 2014 at 11:20 AM, Charles Moulliard <ch0...@gmail.com
> >> >wrote:
> >>
> >> > I will commit soon a unit test. That could help us to verify if
> >> something
> >> > goes wrong.
> >> >
> >> >
> >> > On Mon, Mar 17, 2014 at 9:38 AM, Bengt Rodehav <be...@rodehav.com>
> >> wrote:
> >> >
> >> >> An update.
> >> >>
> >> >> I noticed in the stack trace that it seems to be the *production* of
> >> >> messages that get the exception "Unable to complete sending the
> >> message:
> >> >> Only one thread may use a JMS Session at a time." - not the
> >> *consumption*.
> >> >> I also only showed you the first part of the route (cause I didn't
> >> think
> >> >> the rest mattered). What I do is basically reading xml messages from
> >> one
> >> >> queue, transforming them and the sending them to another queue.
> >> >>
> >> >> Moreover, one incoming message may end up creating more than one (in
> >> this
> >> >> case 3) out messages. I'm using a splitter to accomplish this. In
> >> pseudo
> >> >> code the route looks like this:
> >> >>
> >> >>
> >> from(sjms:...).threads(10).process(converting...).split().to(sjms:...);
> >> >>
> >> >> If I set consumerCount=10 on the consumer endpoint AND use
> threads(10)
> >> >> then
> >> >> I can see that more threads are created but I get the "Only one
> thread
> >> may
> >> >> use ..." exception. Now, if I also set the producerCount=10 on the
> >> >> producer
> >> >> endpoint then this exception goes away and my route works.
> >> >>
> >> >> Not exactly sure why this works. If anyone could explain the
> >> relationship
> >> >> between consumer/producer count and threads I would appreciate it. A
> >> >> theory
> >> >> of mine is that there must be at least as many consumers/producers as
> >> >> there
> >> >> are threads or there is a risk that two threads will try to use the
> >> same
> >> >> consumer/producer. If there is a one-to-one relationship between
> >> >> consumer/producer and JMS session then this could explain the
> >> exception.
> >> >> But this still sounds weird. If there is a pool of
> consumers/producers
> >> >> then
> >> >> the thread should try to acquire one from the pool. If no
> >> >> consumer/producer
> >> >> is available then the thread should wait until one is available - not
> >> try
> >> >> to use one that is in use by another thread.
> >> >>
> >> >> Also, although I can get my route to work this way, I get no better
> >> >> throughput than if I only use one thread. I get the exact same
> >> throughput
> >> >> and my CPU is basically idling. Obviously I'm not doing this right.
> >> >>
> >> >> Do I also need multiple connections to the JMS server? Could this
> >> affect
> >> >> the concurrency?
> >> >>
> >> >> /Bengt
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 2014-03-17 8:39 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> >> >>
> >> >> > I've now tried just using consumerCount on the endpoint without
> using
> >> >> > threads(). However, it doesn't make it multithreaded. Only one
> >> thread is
> >> >> > started for the route.
> >> >> >
> >> >> > Any other ideas? Has someone used sjms with a multithreaded
> consumer?
> >> >> >
> >> >> > /Bengt
> >> >> >
> >> >> >
> >> >> > 2014-03-14 17:42 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> >> >> >
> >> >> > Ok - thanks. I have tried it but only together with threads(). I
> >> didn't
> >> >> >> realize that it might create threads on its own.
> >> >> >>
> >> >> >> /Bengt
> >> >> >> Den 14 mar 2014 17:26 skrev "Claus Ibsen" <claus.ib...@gmail.com
> >:
> >> >> >>
> >> >> >> Hi
> >> >> >>>
> >> >> >>> I think you should use the consumerCount option on the endpoint
> >> >> instead
> >> >> >>>
> >> >> >>> On Fri, Mar 14, 2014 at 5:10 PM, Bengt Rodehav <
> be...@rodehav.com>
> >> >> >>> wrote:
> >> >> >>> > I'm using Camel 2.12.3 and the sjms component for communicating
> >> with
> >> >> >>> > Weblogic JMS.
> >> >> >>> >
> >> >> >>> > Everything works fine when I use a single thread. However, to
> >> >> increase
> >> >> >>> > throughput I want multiple threads to read messages from the
> >> queue.
> >> >> >>> >
> >> >> >>> > I've done this by using the "threads()" method:
> >> >> >>> >
> >> >> >>> >   from(sjms:...).threads(10)....
> >> >> >>> >
> >> >> >>> > However I get an exception as follows:
> >> >> >>> >
> >> >> >>> > java.lang.Exception: Unable to complete sending the message:
> Only
> >> >> one
> >> >> >>> > thread may use a JMS Session at a time.:Only one thread may
> use a
> >> >> JMS
> >> >> >>> > Session at a time.
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.component.sjms.producer.InOnlyProducer.sendMessage(InOnlyProducer.java:135)[129:org.apache.camel.camel-sjms:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.component.sjms.SjmsProducer.process(SjmsProducer.java:180)[129:org.apache.camel.camel-sjms:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:574)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:507)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:216)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.Splitter.process(Splitter.java:98)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.Pipeline$1.done(Pipeline.java:136)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:83)[106:org.apache.camel.camel-core:2.12.3]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)[:1.6.0_32]
> >> >> >>> > at
> >> >> java.util.concurrent.FutureTask.run(FutureTask.java:138)[:1.6.0_32]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
> >> >> >>> > at
> >> >> >>> >
> >> >> >>>
> >> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
> >> >> >>> > at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]
> >> >> >>> >
> >> >> >>> > What exactly does this mean and how can I consume messages via
> >> sjms
> >> >> >>> using
> >> >> >>> > multiple threads?
> >> >> >>> >
> >> >> >>> > /Bengt
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> --
> >> >> >>> Claus Ibsen
> >> >> >>> -----------------
> >> >> >>> Red Hat, Inc.
> >> >> >>> Email: cib...@redhat.com
> >> >> >>> Twitter: davsclaus
> >> >> >>> Blog: http://davsclaus.com
> >> >> >>> Author of Camel in Action: http://www.manning.com/ibsen
> >> >> >>> Make your Camel applications look hawt, try: http://hawt.io
> >> >> >>>
> >> >> >>
> >> >> >
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > Charles Moulliard
> >> > Apache Committer / Architect @RedHat
> >> > Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io
> >> >
> >> >
> >>
> >>
> >> --
> >> Charles Moulliard
> >> Apache Committer / Architect @RedHat
> >> Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io
> >>
> >
> >
>



-- 
-- 
Scott England-Sullivan
Apache Camel Committer
Principal Consultant / Sr. Architect | Red Hat, Inc.
FuseSource is now part of Red Hat
Web:     fusesource.com <http://www.fusesource.com> |
redhat.com<http://www.redhat.com>
Blog:     sully6768.blogspot.com
Twitter: sully6768

Reply via email to