Hi all!

I am struggeling to understand/make sense of a situation in ActiveMQ:

* a high-volume producer delivers messages to a queue (several thousand 
messages per second)
* one or several high-volume consumers consume messages from the same queue 
(CEF in the example below)
* as long as the consumers consume messages in an orderly fashion, the producer 
flow control will make sure that the queue is not flooded.
* however, if the consumers go offline/away for a short while, the producers 
are not limited, and will quickly fill the queue up to its resource limits
* When the queue is full, the producers hang on send() to ActiveMQ, which makes 
sense. If enabling sendFailIfNoSpaceAfterTimeout, the producers are interrupted 
on send() if waiting more than 5000 millis. This is ok.
* However, the _consumers_ do not receive any messages, hanging on 
receive(wait) for the specified time, and then returning null. 
* So, the queue is full with messages, but the consumers are unable to consume 
them!

I have tried disconnecting/restarting the consumers, producers, without luck.
I am using memoryPeristenceAdapter to provide max throughput, so restarting 
activemq clears the queues, but then the problem starts all over again….
Using LevelDB persistence adapter causes the same behaviour, only that 
restarting activemq brings back the queue with all the messages still there, 
and the consumers are still not able to consume them!

I the producer is stopped before the queue reaches resource limits, the 
consumers are able to consume the messages in the queue when they get back 
online.

What am I not understanding?
I have tried playing around with per-destination-policies 
(storeUsageHighWaterMark, cursorMemoryHighWaterMark, memoryLimit, useCache), 
systemUsage limits, and different persistence adapters, but I am unable to get 
this to work properly.

My current broker test config is shown below. I am using version 5.9.1.

Any suggestions are welcome.

Best regards
Joakim von Brandis

--

<broker xmlns="http://activemq.apache.org/schema/core"; brokerName="router1" 
dataDirectory="${activemq.data}" useJmx="true">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue="STATS" producerFlowControl="true" 
storeUsageHighWaterMark="70"> </policyEntry>
                <policyEntry queue="AGGR" producerFlowControl="true" 
storeUsageHighWaterMark="70"> </policyEntry>
                <policyEntry queue="CEF" producerFlowControl="true" 
storeUsageHighWaterMark="50" cursorMemoryHighWaterMark="50"> </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" 
storeUsageHighWaterMark="50"> </policyEntry>
                <policyEntry topic=">" producerFlowControl="true" 
storeUsageHighWaterMark="50"> </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <persistenceAdapter>
         <!-- <levelDB directory="activemq-data"/> -->
            <memoryPersistenceAdapter/>
        </persistenceAdapter>


          <systemUsage>
            <systemUsage sendFailIfNoSpaceAfterTimeout="5000">
                <memoryUsage>
                    <memoryUsage limit="1000 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1000 mb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <transportConnectors>
            <transportConnector name="listener" 
uri="tcp://0.0.0.0:4001?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
        </transportConnectors>

        <networkConnectors>
        </networkConnectors>

        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans"; 
class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

Reply via email to