Hi David,

Your application should be I/O bound with a prefetch of 0 (effectively a pull of every message from the broker) - so something else is going on.
Could you send a full stack trace?

thanks,

Rob
On Dec 20, 2007, at 6:55 AM, David Sitsky wrote:

Hi,

Simplistically speaking, I have an application using ActiveMQ 5 which has a queue called work-items.

A master JVM process puts work items onto the queue (this process also runs the embedded JMS broker), and external worker JVM processes read these items using receive() with a timeout of 2 seconds.

I have 4 quad-core machines running this application. One runs the master program as described, and the other three each run a worker JVM process. All machines are connected with gigabit ethernet and are running Vista and Java 1.6.

I have 4 separate threads within each worker JVM processing data, each thread with two connections to the broker (one for sending data, one for receiving data), which at the time seemed to give the best results. Each connection has its own associated session object to avoid any "contention".

I am using a prefetch size of 0, as I have found this gives the best performance in this application, since each work item's payload can take an unpredictable amount of processing time. Data is sent and received using transactions, and the messages are non-persistent.
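
To illustrate the reasoning behind a prefetch of 0 (each worker pulls one item at a time, so a slow item never holds up others queued behind it on the same consumer), here is a minimal sketch. It is only a stand-in built on the JDK's BlockingQueue, not the ActiveMQ or JMS API; the class name PullWorkerSketch, the methods runWorker and runWorkers, and the use of sleep to model processing time are all hypothetical, and the timeout is shortened from the 2 seconds mentioned above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only model of pull-style consumption; NOT the ActiveMQ API.
public class PullWorkerSketch {

    // Pull one item at a time, waiting up to timeoutMs for each.
    // Returns the number of items processed once a pull times out.
    static int runWorker(BlockingQueue<Integer> queue, long timeoutMs)
            throws InterruptedException {
        int processed = 0;
        while (true) {
            Integer costMs = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (costMs == null) {
                return processed;      // nothing arrived within the timeout
            }
            Thread.sleep(costMs);      // stand-in for unpredictable processing time
            processed++;
        }
    }

    // Fill a queue with items (each value is its processing cost in ms),
    // start the workers, and return how many items each worker handled.
    static int[] runWorkers(int[] costsMs, int workers, long timeoutMs)
            throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int cost : costsMs) {
            queue.add(cost);
        }
        int[] counts = new int[workers];
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                try {
                    counts[id] = runWorker(queue, timeoutMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();                  // join makes each worker's count visible here
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        // One expensive item followed by several cheap ones, two workers.
        int[] counts = runWorkers(new int[] {200, 5, 5, 5, 5, 5}, 2, 500);
        System.out.println("worker 0: " + counts[0] + ", worker 1: " + counts[1]);
    }
}
```

In this model the worker that picks up the expensive item simply handles fewer items overall, while the other keeps draining the queue. With a larger prefetch, cheap items could instead sit buffered behind the expensive one.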

Using JMX, I often see the queue size grow to around 10,000 - 20,000 elements, which is as expected, but I also often see the worker threads sitting idle, waiting for messages to be dispatched to them (verified using thread dumps). The master process is also consuming a lot of CPU, which perhaps explains why it is unable to keep up.

I've read various documents on the wiki, and have tried a lot of different settings, but it seems that the broker is unable to deliver these messages fast enough to the consumers, or something else is holding it back.

Here are some relevant thread snapshots when the broker is grinding... if anybody has some ideas on what I might be doing wrong, I'd love some suggestions. Have I gotten carried away with creating too many connections?

I am a bit worried by PooledTaskRunner iterating over this large number of messages often...

"ActiveMQ Transport: tcp:///192.168.222.75:49197" daemon prio=4 RUNNABLE
        at org.apache.activemq.broker.region.RegionBroker.getBrokerService(RegionBroker.java:625)
        at org.apache.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
        at org.apache.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
        at org.apache.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
        at org.apache.activemq.broker.MutableBrokerFilter.getBrokerService(MutableBrokerFilter.java:261)
        at org.apache.activemq.broker.ConnectionContext.isSlave(ConnectionContext.java:265)
        at org.apache.activemq.broker.region.AbstractSubscription.isSlave(AbstractSubscription.java:118)
        at org.apache.activemq.broker.region.PrefetchSubscription.isFull(PrefetchSubscription.java:361)
        at org.apache.activemq.broker.region.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:452)
        at org.apache.activemq.broker.region.PrefetchSubscription.pullMessage(PrefetchSubscription.java:90)
        at org.apache.activemq.broker.region.AbstractRegion.messagePull(AbstractRegion.java:348)
        at org.apache.activemq.broker.region.RegionBroker.messagePull(RegionBroker.java:434)
        at org.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
        at org.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
        at org.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
        at org.apache.activemq.broker.MutableBrokerFilter.messagePull(MutableBrokerFilter.java:245)
        at org.apache.activemq.broker.TransportConnection.processMessagePull(TransportConnection.java:444)
        at org.apache.activemq.command.MessagePull.visit(MessagePull.java:43)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:280)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:177)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:150)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
        at java.lang.Thread.run(Unknown Source)

"ActiveMQ Task" daemon prio=5 RUNNABLE
        at org.apache.activemq.broker.region.PrefetchSubscription.isFull(PrefetchSubscription.java:361)
        at org.apache.activemq.broker.region.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:452)
        at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:141)
        at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:70)
        at org.apache.activemq.broker.region.Queue.doDispatch(Queue.java:1059)
        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1072)
        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:945)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:118)
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:42)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

"ActiveMQ Transport: tcp:///192.168.222.74:49196" daemon prio=4 BLOCKED
        at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:449)
        at org.apache.activemq.broker.region.Queue.send(Queue.java:441)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:328)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:402)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:224)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:433)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:623)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:280)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:177)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:150)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
        at java.lang.Thread.run(Unknown Source)


--
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699 Web: http://www.nuix.com Fax: +61 2 9212 6902
