Hi David,
Your application should be i/o bound with a pre-fetch of 0
(effectively pul every message from the broker) - so something else is
going on
Could you send a full stack trace ?
thanks,
Rob
On Dec 20, 2007, at 6:55 AM, David Sitsky wrote:
Hi,
I have simplistically speaking, an application using ActiveMQ 5
which has a queue called work-items.
A master JVM process puts work items onto the queue (this process
also runs the embedded JMS broker), and external worker JVM
processes read these items using receive() with a timeout of 2
seconds.
I have 4 quad-cores machines running this application, one runs the
master program as described, and the other three quad-core machines
each runs a worker JVM process, all connected with gigabit ethernet,
all machines are running Vista and Java 1.6.
I have 4 separate threads within each worker JVM processing data,
each thread with two connections to the broker (one for sending
data, one for receiving data) which at the time seemed to give the
best results. Each connection has their own associated session
object to avoid any "contention".
I am using a prefetch size of 0, as have found this gives the best
performance in this application, as each work item's payload can
have an unpredictable amount of processing time. Data is sent and
received using transactions, and the messages are non-persistent.
Using JMX, I often see the queue size can easily grow to around
10,000 - 20,000 elements, which is as expected, but often, I see the
worker threads sitting waiting for messages to be dispatched to them
(verified using thread dumps). I also see the master process is
consuming a lot of CPU, which perhaps explains why it is unable to
keep up.
I've read various documents on the wiki, and have tried a lot of
different settings, but it seems that the broker is unable to
deliver these messages fast enough to the consumers, or something
else is holding it back.
Here are some relevant thread snapshots when the broker is
grinding... if anybody has some ideas on what I might be doing
wrong, I'd love some suggestions. I have gotten carried away with
creating too many connections?
I am a bit worried by PooledTaskRunner iterating over this large
number of messages often...
"ActiveMQ Transport: tcp:///192.168.222.75:49197" daemon prio=4
RUNNABLE
at
org
.apache
.activemq
.broker.region.RegionBroker.getBrokerService(RegionBroker.java:625)
at
org
.apache
.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
at
org
.apache
.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
at
org
.apache
.activemq.broker.BrokerFilter.getBrokerService(BrokerFilter.java:245)
at
org
.apache
.activemq
.broker
.MutableBrokerFilter.getBrokerService(MutableBrokerFilter.java:261)
at
org
.apache
.activemq.broker.ConnectionContext.isSlave(ConnectionContext.java:265)
at
org
.apache
.activemq
.broker
.region.AbstractSubscription.isSlave(AbstractSubscription.java:118)
at
org
.apache
.activemq
.broker.region.PrefetchSubscription.isFull(PrefetchSubscription.java:
361)
at
org
.apache
.activemq
.broker
.region
.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:452)
at
org
.apache
.activemq
.broker
.region.PrefetchSubscription.pullMessage(PrefetchSubscription.java:90)
at
org
.apache
.activemq
.broker.region.AbstractRegion.messagePull(AbstractRegion.java:348)
at
org
.apache
.activemq.broker.region.RegionBroker.messagePull(RegionBroker.java:
434)
at
org
.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
at
org
.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
at
org
.apache.activemq.broker.BrokerFilter.messagePull(BrokerFilter.java:77)
at
org
.apache
.activemq
.broker.MutableBrokerFilter.messagePull(MutableBrokerFilter.java:245)
at
org
.apache
.activemq
.broker
.TransportConnection.processMessagePull(TransportConnection.java:444)
at org.apache.activemq.command.MessagePull.visit(MessagePull.java:43)
at
org
.apache
.activemq
.broker.TransportConnection.service(TransportConnection.java:280)
at org.apache.activemq.broker.TransportConnection
$1.onCommand(TransportConnection.java:177)
at
org
.apache
.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at
org
.apache
.activemq
.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:
134)
at
org
.apache
.activemq
.transport.InactivityMonitor.onCommand(InactivityMonitor.java:150)
at
org
.apache
.activemq.transport.TransportSupport.doConsume(TransportSupport.java:
83)
at
org
.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:
185)
at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:
172)
at java.lang.Thread.run(Unknown Source)
"ActiveMQ Task" daemon prio=5 RUNNABLE
at
org
.apache
.activemq
.broker.region.PrefetchSubscription.isFull(PrefetchSubscription.java:
361)
at
org
.apache
.activemq
.broker
.region
.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:452)
at
org
.apache
.activemq
.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:141)
at
org
.apache
.activemq
.broker
.region
.policy
.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:70)
at org.apache.activemq.broker.region.Queue.doDispatch(Queue.java:
1059)
at
org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:
1072)
at org.apache.activemq.broker.region.Queue.iterate(Queue.java:945)
at
org
.apache
.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:118)
at org.apache.activemq.thread.PooledTaskRunner
$1.run(PooledTaskRunner.java:42)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
"ActiveMQ Transport: tcp:///192.168.222.74:49196" daemon prio=4
BLOCKED
at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:
449)
at org.apache.activemq.broker.region.Queue.send(Queue.java:441)
at
org
.apache
.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:328)
at
org
.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:
402)
at
org
.apache
.activemq.broker.TransactionBroker.send(TransactionBroker.java:224)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:
125)
at
org
.apache
.activemq
.broker
.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
at
org
.apache
.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:
135)
at
org
.apache
.activemq
.broker.TransportConnection.processMessage(TransportConnection.java:
433)
at
org
.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:
623)
at
org
.apache
.activemq
.broker.TransportConnection.service(TransportConnection.java:280)
at org.apache.activemq.broker.TransportConnection
$1.onCommand(TransportConnection.java:177)
at
org
.apache
.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at
org
.apache
.activemq
.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:
134)
at
org
.apache
.activemq
.transport.InactivityMonitor.onCommand(InactivityMonitor.java:150)
at
org
.apache
.activemq.transport.TransportSupport.doConsume(TransportSupport.java:
83)
at
org
.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:
185)
at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:
172)
at java.lang.Thread.run(Unknown Source)
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
0699
Web: http://www.nuix.com Fax: +61 2 9212
6902