Using the Spring listeners, we've actually got about 10-20 threads on
each stage of the pipeline, all consuming messages. The problem is that
we'd like to be able to deal with a situation where there are
potentially millions of messages waiting to be processed without
having to concede ever more RAM to the broker (or ever more threads to
the consumers). New messages should just get in line to be processed,
whether that's 100 milliseconds or 100 minutes from the time they
arrive. I thought persistence would do this for us, as it writes the
messages to disk instead of keeping them in RAM, but the broker is still
running out of RAM.

It seems as though, for some reason, it's keeping something in memory
for every message in a persistent queue, and once it hits 100% memory
usage it's game over.


What about creating clusters of consumers for each of your processing
stages to better handle the load spikes? The messages will get
load-balanced across the consumers in a cluster.
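
For what it's worth, a minimal sketch of what such a consumer cluster might
look like with Spring's DefaultMessageListenerContainer, reusing the
"jmsFactory" and "A" beans from the config quoted below. The bean names
"stage2Container" and "stage2Listener" and the consumer counts are made up
for illustration, not taken from the actual setup:

```xml
<!-- Sketch only: "stage2Container" and "stage2Listener" are hypothetical names. -->
<bean id="stage2Container"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <!-- Connection factory and queue as defined in the quoted config -->
  <property name="connectionFactory" ref="jmsFactory" />
  <property name="destination" ref="A" />
  <!-- Assumed to be a bean implementing javax.jms.MessageListener -->
  <property name="messageListener" ref="stage2Listener" />
  <!-- Start with 10 consumers on queue A and allow scaling up to 20 (assumed values) -->
  <property name="concurrentConsumers" value="10" />
  <property name="maxConcurrentConsumers" value="20" />
</bean>
```

This only spreads the messages on one queue over more consumers; it does not
by itself limit how much memory the broker uses for a backlog.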

Maarten Dirkse-2 wrote:
> An update:
> I've now tried it with the standard amqpersistence config:
>     <persistenceAdapter>
>       <amqPersistenceAdapter directory="/var/filterworks/amq/store"
> maxFileLength="32mb"/>
>     </persistenceAdapter>
> And it still results in the same problem. When I give the broker 128mb
> of memory, everything runs fine until it has processed about 126,000 
> messages. Then the memory usage hits 100%, and everything slows to a 
> crawl.
> Anyone have any ideas as to what's going on?
> Hi,
> I've spent quite a bit of time trying to find a solution to this 
> problem, but nothing that I've come across in the documentation or the
> mailing list seems to help, so here goes:
> I've got ActiveMQ set up to provide messaging between a chain of four 
> components, connected by three queues like so:
> Component 1 --> queue A --> component 2 --> queue B --> component 3 
> --> queue C --> component 4
> Component 1 gets things going by producing messages and putting them
> on queue A.
> Component 2 then processes these, and, for every message received, 
> puts a message on queue B. Component 3 then does the same thing as
> 2, generating an equal number of messages for component 4. Besides the 
> queue pipeline, components also communicate with each other using a
> topic which produces 2 messages for every queue message that goes 
> through the pipeline.
> All queues are (pure JDBC) persistent, topics are non-persistent.
> This setup works very well for low volumes of messages. At a certain 
> point though, the first component has to start producing messages at 
> rates of several thousand per minute. Component 2 can't consume the 
> messages at the rate they're produced, so the size of queue A slowly 
> grows into the thousands.
> The problem is that, once the size of queue A gets large enough, the 
> broker runs out of memory and everything screeches to a halt (i.e. 
> messages are processed at a rate of 1 per minute or something). At
> around 256mb of broker memory for example, it reaches 100% usage when 
> queue A contains around 16,000 messages. The number of messages that 
> queue A can contain before activemq becomes unusably slow grows and 
> shrinks according to the amount of memory the broker has.
> What I don't get is why the broker slowly uses up all its memory for a
> queue that persists its messages to the database (this actually 
> happens, the database does fill up). I've read a number of times in 
> the documentation and mailing list that AMQ is capable of dealing with
> queues that contain millions of messages, but in my setup it grinds to
> a halt at a number far below that.
> Can anyone tell me what I'm doing wrong?
> Below is the setup used for the broker. Queues are defined in Spring 
> using <amq:queue> and producers and consumers use the Spring JMS 
> template and JMS Listeners respectively. I'm using AMQ 5.1 with 
> Postgresql for persistence and Spring 2.5.3 to configure everything.
> Broker config:
> <broker id="broker" useJmx="true" brokerName="${brokerA.name}"
> xmlns="http://activemq.apache.org/schema/core"
> dataDirectory="${activemq.data}">
>   <persistenceAdapter>
>     <jdbcPersistenceAdapter dataDirectory="${activemq.data}/jdbc"
> dataSource="#postgres-ds" createTablesOnStartup="true" />
>   </persistenceAdapter>
>   <systemUsage>
>     <systemUsage>
>       <memoryUsage>
>         <memoryUsage limit="128mb" />
>       </memoryUsage>
>     </systemUsage>
>   </systemUsage>
>   <transportConnectors>
>     <transportConnector uri="tcp://localhost:61616" />
>   </transportConnectors>
> </broker>
> Template and queue/topic config:
> <beans>
>   <amq:connectionFactory brokerURL="${broker.jmsUrl}" id="jmsFactory"
> dispatchAsync="true" />
>   <bean id="scf"
> class="org.springframework.jms.connection.SingleConnectionFactory"
> p:targetConnectionFactory-ref="jmsFactory" />
>   <bean class="org.springframework.jms.core.JmsTemplate"
> id="jmsTemplate" p:receiveTimeout="${jms.receivetimeout}"
> p:connectionFactory-ref="scf" />
>   <bean class="org.springframework.jms.core.JmsTemplate"
> id="commandTemplate" p:receiveTimeout="${jms.receivetimeout}"
> p:explicitQosEnabled="true"
>     p:priority="4" p:deliveryMode="2" p:timeToLive="120000"
> p:connectionFactory-ref="scf" p:defaultDestination-ref="commandTopic" 
> />
>   <amq:queue id="A" physicalName="queue.A" />
>   <amq:queue id="B" physicalName="queue.B" />
>   <amq:queue id="C" physicalName="queue.C" />
>   <amq:topic id="commandTopic" physicalName="command.topic" /> 
> </beans>
