We have noticed that the MemoryPercentageUsage attribute on some topics and
the CursorPercentageUsage on Queues within a broker increases until it
reaches 100 then the queues and topics no longer accept any more messages
(as you wouold expect), even through the queue size and number of inflight
messages is 0. 

   We are in a clustered environment with four brokers, two on each node in
the cluster providing one logical broker and two individual brokers. We are
also using JMSXgroupID to provide stickiness through the logical broker.

   We have run the system without clustering ActiveMQ, using the same
configuration but only on one node and found that the attributes above act
as expected, they increase when the queues start to back up but decrease
when the system processes all the messages, so when queue size is 0
MemoryPercentageUsage is 0. We have noticed that the "old gen" memory in the
JVM steadily increases when we see the percentage useage going up but it
remains constant at 0 when we are not clustered.

  It seems like something in clustering is holding onto a reference to the
message even though the message has been delivered causing a memory leak or
we have got the configuration wrong?

  We are using:
     active mq 5.4.2
     non persistent messaging
     java 1.6.0_20. 
     Active MQ is running in a stand alone JVM using activemq start
xbean:conf
     We are connecting via spring integration v 1.0.4 from a tomcat tc
server instance with tomcat 6.0.29

Here is simplified activemq configuration


<beans xmlns="http://www.springframework.org/schema/beans";
xmlns:amq="http://activemq.apache.org/schema/core";
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 

xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://activemq.apache.org/schema/core 
        http://activemq.apache.org/schema/core/activemq-core.xsd";>

        <!-- to start install and run "activemq xbean:activemq-config.xml" -->

        <!-- ActiveMQ Broker that is a logical broker shared across all cluster
nodes -->
        <broker xmlns="http://activemq.apache.org/schema/core"; useJmx="true"
persistent="false" brokerName="clustered">

                <destinationPolicy>
                        <policyMap>
                                <policyEntries>
                                        <policyEntry queue="entryQueue" 
producerFlowControl="true"
memoryLimit="50mb" enableAudit="false">
                                                <pendingQueuePolicy>
                                                        <vmQueueCursor />
                                                </pendingQueuePolicy>
                                        </policyEntry>
                                        <policyEntry topic="reportingTopic" 
producerFlowControl="true"
memoryLimit="50mb" enableAudit="false">
                                                <pendingSubscriberPolicy>
                                                        <vmCursor />
                                                </pendingSubscriberPolicy>
                                        </policyEntry>
                                </policyEntries>
                        </policyMap>
                </destinationPolicy>

                <destinations>
                        <queue physicalName="entryQueue" />
                        <topic physicalName="reportingTopic" />
                </destinations>

                <managementContext>
                        <managementContext createConnector="false" />
                </managementContext>

                <networkConnectors>
                        <networkConnector
uri="failover:(tcp://ip1:61616,tcp://ip2:61616)?jms.prefetchPolicy.all=1"
userName="user" password="password" prefetchSize="1" />
                </networkConnectors>

                <!-- Need authentication to stop creation of queues and topics 
on the fly
http://activemq.apache.org/how-do-i-create-new-destinations.html -->
                <plugins>
                        <simpleAuthenticationPlugin>
                                <users>
                                        <authenticationUser username="user" 
password="password" groups="users"
/>
                                </users>
                        </simpleAuthenticationPlugin>
                        <authorizationPlugin>
                                <map>
                                        <authorizationMap>
                                                <authorizationEntries>
                                                        <authorizationEntry 
queue="entryQueue" read="users" write="users" />
                                                        <authorizationEntry 
topic="reportingTopic" read="users" write="users"
/>
                                                        <!-- allow people to 
actually connect -->
                                                        <authorizationEntry 
topic="ActiveMQ.Advisory.>" read="users"
write="users" admin="users" />
                                                </authorizationEntries>

                                        </authorizationMap>
                                </map>
                        </authorizationPlugin>
                </plugins>

                <systemUsage>
            <systemUsage sendFailIfNoSpace="true">
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <!-- 
                <storeUsage>
                    <storeUsage limit="1 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
                 -->
            </systemUsage>
        </systemUsage>
                
                <transportConnectors>
                        <transportConnector uri="tcp://0.0.0.0:61616" />
                </transportConnectors>

        </broker>

        <!-- ActiveMQ Broker that is a stand alone broker accessed from this and
only this cluster node -->
        <broker xmlns="http://activemq.apache.org/schema/core"; useJmx="true"
persistent="false" brokerName="individual">
        
                <destinationPolicy>
                        <policyMap>
                                <policyEntries>
                                        <policyEntry queue="updateQueue" 
producerFlowControl="true"
enableAudit="false">
                                                <pendingQueuePolicy>
                                                        <vmQueueCursor />
                                                </pendingQueuePolicy>
                                        </policyEntry>
                                </policyEntries>
                        </policyMap>
                </destinationPolicy>

                <destinations>
                        <queue physicalName="updateQueue" />
                </destinations>

                <managementContext>
                        <managementContext createConnector="false" />
                </managementContext>

                <!-- Need authentication to stop creation of queues and topics 
on the fly
http://activemq.apache.org/how-do-i-create-new-destinations.html -->
                <plugins>
                        <simpleAuthenticationPlugin>
                                <users>
                                        <authenticationUser username="user" 
password="password" groups="users"
/>
                                </users>
                        </simpleAuthenticationPlugin>
                        <authorizationPlugin>
                                <map>
                                        <authorizationMap>
                                                <authorizationEntries>
                                                        <authorizationEntry 
queue="updateQueue" read="users" write="users" />
                                                </authorizationEntries>

                                        </authorizationMap>
                                </map>
                        </authorizationPlugin>
                </plugins>

                <transportConnectors>
                        <transportConnector uri="tcp://0.0.0.0:61618" />
                </transportConnectors>

        </broker>

</beans>


spring connection details


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans";
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd";>

        <bean id="clusteredConnectionFactoryBare"
class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"
                
value="failover:(tcp://ip1:61616,tcp://ip2:61616)?jms.prefetchPolicy.all=1"
/>
                <property name="userName" value="user" />
                <property name="password" value="password" />
                <property name="alwaysSyncSend" value="true" /> 
                <!-- <property name="dispatchAsync" 
value="${async.dispatch:true}" /> -->
        </bean>
        
        <bean id="clusteredConnectionFactory"
                
class="org.springframework.jms.connection.CachingConnectionFactory">
                <property name="targetConnectionFactory"
ref="clusteredConnectionFactoryBare" />
                <property name="sessionCacheSize" value="10" />
        </bean>
        
        <bean id="individualConnectionFactoryBare"
class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"
                        
value="${activemq.connection.factory.broker.url.individual}" />
                <property name="userName" value="user" />
                <property name="password" value="password" />
                <property name="alwaysSyncSend" value="true" /> 
                <!-- <property name="dispatchAsync" 
value="${async.dispatch:true}" /> -->
        </bean>
        
        <bean id="individualConnectionFactory"
                
class="org.springframework.jms.connection.CachingConnectionFactory">
                <property name="targetConnectionFactory"
ref="individualConnectionFactoryBare" />
                <property name="sessionCacheSize" value="10" />
        </bean>

        <bean id="updateQueue" 
class="org.apache.activemq.command.ActiveMQQueue">
                <constructor-arg value="updateQueue" />
        </bean>

        <bean id="entryQueue" class="org.apache.activemq.command.ActiveMQQueue">
                <constructor-arg value="entryQueue" />
        </bean>

        <bean id="reportingTopic"
class="org.apache.activemq.command.ActiveMQTopic">
                <constructor-arg value="reportingTopic" />
        </bean>

        <bean id="clusteredJmsTemplateParent"
class="org.springframework.jms.core.JmsTemplate"
                abstract="true">
                <property name="connectionFactory" 
ref="clusteredConnectionFactory" />
                <property name="deliveryPersistent"
                        
value="${activemq.jmsTemplateParent.deliveryPersistent}" />
        </bean>

        <bean id="entryQueueJmsTemplate" parent="clusteredJmsTemplateParent">
                <property name="defaultDestination" ref="entryQueue" />
        </bean>

        <bean id="individualJmsTemplateParent"
class="org.springframework.jms.core.JmsTemplate"
                abstract="true">
                <property name="connectionFactory" 
ref="individualConnectionFactory" />
                <property name="deliveryPersistent"
                        
value="${activemq.jmsTemplateParent.deliveryPersistent}" />
        </bean>

        <bean id="updateQueueJmsTemplate" parent="individualJmsTemplateParent">
                <property name="defaultDestination" ref="updateQueue" />
        </bean>

</beans>

-- 
View this message in context: 
http://activemq.2283324.n4.nabble.com/Network-of-Brokers-vmQueueCursor-has-memory-leak-tp3217582p3217582.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to