Hi! I use the latest version of Camel, 2.12.3.
/Bengt

On 17 Mar 2014 18:10, "Scott England-Sullivan" <sully6...@gmail.com> wrote:

> Hi Bengt,
>
> What version of SJMS are you using? Earlier versions had code that
> reused a single session for multiple consumers, which in effect made it
> single-threaded. Later versions (2.13 & 2.12.3) have a patch that
> removes the session restriction and uses a session per consumer.
>
> On Mon, Mar 17, 2014 at 10:34 AM, Bengt Rodehav <be...@rodehav.com> wrote:
>
> > I've done some more research. It turns out that I was mistaken
> > regarding the throughput. I had been watching the JMX attribute
> > "MeanProcessingTime", which is the same regardless of the number of
> > threads. However, the actual time taken to process 1000 messages is
> > 115 seconds using one thread but 38 seconds using three threads.
> > Using more than three threads does not increase throughput.
> >
> > I think it's interesting that I can cut my processing time by
> > (almost) exactly a factor of three if I use three threads.
> >
> > Another interesting fact is that for every message I read from JMS I
> > create exactly three messages that I write to another queue. The
> > number three keeps coming up...
> >
> > Since I do about 1500 reads and 4500 writes from/to Weblogic JMS per
> > minute, I guess it is possible that Weblogic JMS is the bottleneck.
> > That is about 100 JMS operations/second.
> >
> > /Bengt
> >
> > 2014-03-17 13:55 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> >
> > > Hello Charles,
> > >
> > > I looked at your unit test but I don't understand all of it. It
> > > seems to use an inout pattern and logs the threadId, which I guess
> > > is a way to see that different threads are used. I haven't logged
> > > that in my code yet.
> > >
> > > However, I do get multiple threads to "work" by using
> > > "consumerCount", "producerCount" and "threads" - all set to the
> > > same value. But I don't get any better throughput, which leads me
> > > to believe that the execution is still serialized somewhere, i.e.
> > > there is a bottleneck somewhere.
> > >
> > > I think it should be a fairly common scenario to want to consume
> > > JMS messages in a multithreaded fashion in order to increase
> > > throughput. But perhaps no one has done this with sjms yet.
> > >
> > > /Bengt
> > >
> > > 2014-03-17 11:27 GMT+01:00 Charles Moulliard <ch0...@gmail.com>:
> > >
> > >> Commit available here:
> > >>
> > >> https://git-wip-us.apache.org/repos/asf?p=camel.git;a=commit;h=1a7b676e1b5c29b652e3faf16240ee3cc831a0c9
> > >>
> > >> On Mon, Mar 17, 2014 at 11:20 AM, Charles Moulliard <ch0...@gmail.com> wrote:
> > >>
> > >> > I will commit a unit test soon. That could help us to verify
> > >> > if something goes wrong.
> > >> >
> > >> > On Mon, Mar 17, 2014 at 9:38 AM, Bengt Rodehav <be...@rodehav.com> wrote:
> > >> >
> > >> >> An update.
> > >> >>
> > >> >> I noticed in the stack trace that it seems to be the
> > >> >> *production* of messages that gets the exception "Unable to
> > >> >> complete sending the message: Only one thread may use a JMS
> > >> >> Session at a time." - not the *consumption*. I also only
> > >> >> showed you the first part of the route (because I didn't
> > >> >> think the rest mattered). What I do is basically read xml
> > >> >> messages from one queue, transform them and then send them
> > >> >> to another queue.
> > >> >>
> > >> >> Moreover, one incoming message may end up creating more than
> > >> >> one (in this case 3) outgoing messages. I'm using a splitter
> > >> >> to accomplish this. In pseudo code the route looks like this:
> > >> >>
> > >> >> from(sjms:...).threads(10).process(converting...).split().to(sjms:...);
> > >> >>
> > >> >> If I set consumerCount=10 on the consumer endpoint AND use
> > >> >> threads(10), then I can see that more threads are created but
> > >> >> I get the "Only one thread may use ..." exception. Now, if I
> > >> >> also set producerCount=10 on the producer endpoint, then this
> > >> >> exception goes away and my route works.
> > >> >>
> > >> >> I'm not exactly sure why this works. If anyone could explain
> > >> >> the relationship between consumer/producer count and threads
> > >> >> I would appreciate it. A theory of mine is that there must be
> > >> >> at least as many consumers/producers as there are threads, or
> > >> >> there is a risk that two threads will try to use the same
> > >> >> consumer/producer. If there is a one-to-one relationship
> > >> >> between consumer/producer and JMS session then this could
> > >> >> explain the exception. But this still sounds weird. If there
> > >> >> is a pool of consumers/producers then the thread should try
> > >> >> to acquire one from the pool. If no consumer/producer is
> > >> >> available then the thread should wait until one is available
> > >> >> - not try to use one that is in use by another thread.
> > >> >>
> > >> >> Also, although I can get my route to work this way, I get no
> > >> >> better throughput than if I only use one thread. I get the
> > >> >> exact same throughput and my CPU is basically idling.
> > >> >> Obviously I'm not doing this right.
> > >> >>
> > >> >> Do I also need multiple connections to the JMS server? Could
> > >> >> this affect the concurrency?
> > >> >>
> > >> >> /Bengt
> > >> >>
> > >> >> 2014-03-17 8:39 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> > >> >>
> > >> >> > I've now tried just using consumerCount on the endpoint
> > >> >> > without using threads(). However, it doesn't make it
> > >> >> > multithreaded. Only one thread is started for the route.
> > >> >> >
> > >> >> > Any other ideas? Has someone used sjms with a multithreaded
> > >> >> > consumer?
> > >> >> >
> > >> >> > /Bengt
> > >> >> >
> > >> >> > 2014-03-14 17:42 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> > >> >> >
> > >> >> >> Ok - thanks. I have tried it but only together with
> > >> >> >> threads(). I didn't realize that it might create threads
> > >> >> >> on its own.
> > >> >> >>
> > >> >> >> /Bengt
> > >> >> >>
> > >> >> >> On 14 Mar 2014 17:26, "Claus Ibsen" <claus.ib...@gmail.com> wrote:
> > >> >> >>
> > >> >> >>> Hi
> > >> >> >>>
> > >> >> >>> I think you should use the consumerCount option on the
> > >> >> >>> endpoint instead
> > >> >> >>>
> > >> >> >>> On Fri, Mar 14, 2014 at 5:10 PM, Bengt Rodehav <be...@rodehav.com> wrote:
> > >> >> >>> > I'm using Camel 2.12.3 and the sjms component for
> > >> >> >>> > communicating with Weblogic JMS.
> > >> >> >>> >
> > >> >> >>> > Everything works fine when I use a single thread.
> > >> >> >>> > However, to increase throughput I want multiple threads
> > >> >> >>> > to read messages from the queue.
> > >> >> >>> >
> > >> >> >>> > I've done this by using the "threads()" method:
> > >> >> >>> >
> > >> >> >>> > from(sjms:...).threads(10)....
> > >> >> >>> >
> > >> >> >>> > However I get an exception as follows:
> > >> >> >>> >
> > >> >> >>> > java.lang.Exception: Unable to complete sending the message: Only one thread may use a JMS Session at a time.:Only one thread may use a JMS Session at a time.
> > >> >> >>> > at org.apache.camel.component.sjms.producer.InOnlyProducer.sendMessage(InOnlyProducer.java:135)[129:org.apache.camel.camel-sjms:2.12.3]
> > >> >> >>> > at org.apache.camel.component.sjms.SjmsProducer.process(SjmsProducer.java:180)[129:org.apache.camel.camel-sjms:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:574)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:507)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:216)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.Splitter.process(Splitter.java:98)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.Pipeline$1.done(Pipeline.java:136)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:83)[106:org.apache.camel.camel-core:2.12.3]
> > >> >> >>> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
> > >> >> >>> > at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)[:1.6.0_32]
> > >> >> >>> > at java.util.concurrent.FutureTask.run(FutureTask.java:138)[:1.6.0_32]
> > >> >> >>> > at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
> > >> >> >>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
> > >> >> >>> > at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]
> > >> >> >>> >
> > >> >> >>> > What exactly does this mean and how can I consume
> > >> >> >>> > messages via sjms using multiple threads?
> > >> >> >>> >
> > >> >> >>> > /Bengt
> > >> >> >>>
> > >> >> >>> --
> > >> >> >>> Claus Ibsen
> > >> >> >>> -----------------
> > >> >> >>> Red Hat, Inc.
> > >> >> >>> Email: cib...@redhat.com
> > >> >> >>> Twitter: davsclaus
> > >> >> >>> Blog: http://davsclaus.com
> > >> >> >>> Author of Camel in Action: http://www.manning.com/ibsen
> > >> >> >>> Make your Camel applications look hawt, try: http://hawt.io
> > >> >
> > >> > --
> > >> > Charles Moulliard
> > >> > Apache Committer / Architect @RedHat
> > >> > Twitter : @cmoulliard | Blog : http://cmoulliard.github.io
> > >>
> > >> --
> > >> Charles Moulliard
> > >> Apache Committer / Architect @RedHat
> > >> Twitter : @cmoulliard | Blog : http://cmoulliard.github.io
>
> --
> Scott England-Sullivan
> Apache Camel Committer
> Principal Consultant / Sr. Architect | Red Hat, Inc.
> FuseSource is now part of Red Hat
> Web: fusesource.com <http://www.fusesource.com> | redhat.com <http://www.redhat.com>
> Blog: sully6768.blogspot.com
> Twitter: sully6768
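
For reference, here is a minimal sketch of the pooling behaviour discussed in the thread: a worker thread acquires a session from a pool and waits if none is free, so no session is ever used by two threads at once. This is plain Java (8 or later) with no Camel or JMS dependency. `FakeSession` and `SessionPoolSketch` are hypothetical names invented for illustration; this is not how camel-sjms is actually implemented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a JMS Session, which must only be used by one thread at a time.
final class FakeSession {
    private final AtomicInteger users = new AtomicInteger();
    private final AtomicInteger maxUsers = new AtomicInteger();

    void send(String body) throws InterruptedException {
        int now = users.incrementAndGet();
        maxUsers.accumulateAndGet(now, Math::max); // remember the highest overlap seen
        Thread.sleep(1);                           // simulate a blocking send
        users.decrementAndGet();
    }

    int maxConcurrentUsers() {
        return maxUsers.get();
    }
}

final class SessionPoolSketch {
    // Sends `messages` messages from `threads` worker threads through a pool of
    // `poolSize` sessions and returns the highest number of threads that were
    // ever inside the same session at the same time.
    static int maxConcurrentUsePerSession(int poolSize, int threads, int messages) {
        BlockingQueue<FakeSession> pool = new ArrayBlockingQueue<>(poolSize);
        FakeSession[] all = new FakeSession[poolSize];
        for (int i = 0; i < poolSize; i++) {
            all[i] = new FakeSession();
            pool.add(all[i]);
        }

        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            final String body = "message-" + i;
            workers.submit(() -> {
                FakeSession session = pool.take(); // blocks until a session is free
                try {
                    session.send(body);
                } finally {
                    pool.add(session);             // hand the session back to the pool
                }
                return null;                       // makes this a Callable, so take() may throw
            });
        }

        workers.shutdown();
        try {
            if (!workers.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        int max = 0;
        for (FakeSession s : all) {
            max = Math.max(max, s.maxConcurrentUsers());
        }
        return max;
    }

    public static void main(String[] args) {
        // 10 worker threads share 3 sessions, mirroring threads(10) with fewer producers.
        System.out.println("max threads per session: " + maxConcurrentUsePerSession(3, 10, 200));
    }
}
```

With 10 threads and only 3 sessions the reported maximum is 1, because the extra threads block in `take()` instead of sharing a session. The trade-off is that throughput is then capped by the pool size rather than by the thread count.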