Hi, take a look at Producer Flow Control
http://activemq.apache.org/producer-flow-control.html

Cheers
--
Dejan Bosanac

Open Source Integration - http://fusesource.com/
ActiveMQ in Action - http://www.manning.com/snyder/
Blog - http://www.nighttale.net

On Wed, Jul 22, 2009 at 5:59 AM, Scott Su <[email protected]> wrote:

>
> Hi,
>
> We are using ActiveMQ 5.2 to develop our application and have encountered
> some strange problems.
> During performance testing, ActiveMQ suddenly stops.
>
> The test scenario is:
> 1. Create about 500 producers, and send 100 messages each to the same queue
>    with a single consumer.
> 2. The consumer will create 500 queues based on the identity of the
>    producer and send about 9 messages.
> 3. With or without consumers in the queues, the MQ will hang.
>
> In jconsole, we can see there are 500 threads, and some of them are
> blocked by a thread whose status is waiting.
>
> If we restart ActiveMQ, it starts, but our application fails to create
> the connection unless we delete the files under /data/journal.
>
> In activemq.log, there are lots of exceptions like:
>
> 2009-07-18 15:27:32,625 [/127.0.0.1:1344] ERROR Service - Async error occurred: java.lang.IllegalStateException: Cannot remove a consumer from a session that had not been registered: 47289b75-0627-4ce6-947b-3450164727e0:1
> java.lang.IllegalStateException: Cannot remove a consumer from a session that had not been registered: 47289b75-0627-4ce6-947b-3450164727e0:1
>     at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:557)
>     at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:64)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>     at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>     at java.lang.Thread.run(Unknown Source)
>
> And we get a warning when restarting MQ:
>
> 2009-07-18 18:25:56,000 [erSimpleAppMain] INFO AMQPersistenceAdapter - AMQStore starting using directory: D:\Program Files\Trend Micro\OfficeScan\Addon\TMSM\apache-activemq\bin\win32\..\..\data
> 2009-07-18 18:25:56,046 [erSimpleAppMain] INFO KahaStore - Kaha Store using data directory D:\Program Files\Trend Micro\OfficeScan\Addon\TMSM\apache-activemq\bin\win32\..\..\data\kr-store\state
> 2009-07-18 18:25:56,125 [erSimpleAppMain] INFO AMQPersistenceAdapter - Active data files: [2]
> 2009-07-18 18:25:56,203 [erSimpleAppMain] INFO AMQPersistenceAdapter - Aquired lock for AMQ StoreD:\Program Files\Trend Micro\OfficeScan\Addon\TMSM\apache-activemq\bin\win32\..\..\data
> 2009-07-18 18:25:56,203 [erSimpleAppMain] INFO BrokerService - ActiveMQ 5.2.0 JMS Message Broker (localhost) is starting
> 2009-07-18 18:25:56,203 [erSimpleAppMain] INFO BrokerService - For help or more information please see: http://activemq.apache.org/
> 2009-07-18 18:25:56,421 [erSimpleAppMain] WARN AdvisoryBroker - Failed to fire message master broker advisory
> 2009-07-18 18:25:56,437 [erSimpleAppMain] INFO KahaStore - Kaha Store using data directory D:\Program Files\Trend Micro\OfficeScan\Addon\TMSM\apache-activemq\bin\win32\..\..\data\kr-store\data
>
>
> We tried optimizedDispatch, but the situation got worse.
> Could someone help us see what we can do to solve the problem? Thanks a
> lot.
>
> Below are the settings in activemq.xml:
>
> --
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
>   http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core
>   http://activemq.apache.org/schema/core/activemq-core.xsd
>   http://activemq.apache.org/camel/schema/spring
>   http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
>
>   <!-- Allows us to use system properties as variables in this configuration file -->
>   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>     <property name="locations">
>       <value>file:///${activemq.base}/conf/credentials.properties</value>
>     </property>
>   </bean>
>
>   <broker xmlns="http://activemq.apache.org/schema/core"
>     brokerName="localhost" persistent ="true"
>     dataDirectory="${activemq.base}/data" useShutdownHook="false">
>
>     <!-- Destination specific policies using destination names or wildcards -->
>     <destinationPolicy>
>       <policyMap>
>         <policyEntries>
>           <policyEntry queue=">" memoryLimit="5mb"/>
>           <policyEntry topic=">" memoryLimit="5mb">
>             <!-- you can add other policies too such as these
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>             -->
>           </policyEntry>
>         </policyEntries>
>       </policyMap>
>     </destinationPolicy>
>
>     <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
>     <managementContext>
>       <managementContext createConnector="false"/>
>     </managementContext>
>
>     <!-- The store and forward broker networks ActiveMQ will listen to -->
>     <networkConnectors>
>       <!-- Example of a static configuration:
>       <networkConnector name="default-nc" uri="multicast://default"/>
>       <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
>       -->
>     </networkConnectors>
>
>     <persistenceAdapter>
>       <amqPersistenceAdapter maxCheckpointMessageAddSize="4 kb"
>         archiveDataLogs="false" syncOnWrite="false"
>         directory="${activemq.base}/data" maxFileLength="32 mb"/>
>     </persistenceAdapter>
>
>     <sslContext>
>       <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
>         keyStorePassword="password"
>         trustStore="file:${activemq.base}/conf/broker.ts"
>         trustStorePassword="password"/>
>     </sslContext>
>
>     <!-- The maximum amount of space the broker will use before slowing down producers -->
>     <systemUsage>
>       <systemUsage>
>         <memoryUsage>
>           <memoryUsage limit="500 mb"/>
>         </memoryUsage>
>         <storeUsage>
>           <storeUsage limit="1 gb" name="foo"/>
>         </storeUsage>
>         <tempUsage>
>           <tempUsage limit="100 mb"/>
>         </tempUsage>
>       </systemUsage>
>     </systemUsage>
>
>
>     <!-- The transport connectors ActiveMQ will listen to -->
>     <transportConnectors>
>       <transportConnector name="openwire" uri="tcp://localhost:61616"/>
>       <transportConnector name="ssl" uri="ssl://localhost:61617"/>
>     </transportConnectors>
>
>   </broker>
>
>   <!-- An embedded servlet engine for serving up the Admin console -->
>   <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
>     <connectors>
>       <nioConnector port="8161"/>
>     </connectors>
>
>     <handlers>
>       <webAppContext contextPath="/admin"
>         resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
>       <webAppContext contextPath="/demo"
>         resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
>       <webAppContext contextPath="/fileserver"
>         resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
>     </handlers>
>   </jetty>
>
> </beans>
>
>
> --
> View this message in context:
> http://www.nabble.com/Strange-ActiveMQ-hang-problem-tp24599723p24599723.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
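
For reference, here is a minimal sketch of the kind of per-destination setting the Producer Flow Control page covers, applied to the destinationPolicy from the quoted activemq.xml. It assumes the producerFlowControl attribute on policyEntry is available in the broker version in use. Turning it off is only one possible adjustment (raising memoryLimit is another), and whether either one resolves the hang described above is not established in this thread.

```xml
<!-- Illustrative variant of the quoted destinationPolicy, not a confirmed fix.
     Assumption: producerFlowControl="false" stops the broker from blocking
     producers when a destination reaches its memoryLimit. -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" producerFlowControl="false" memoryLimit="5mb"/>
      <policyEntry topic=">" producerFlowControl="false" memoryLimit="5mb"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```

With 500 producers sharing destinations limited to 5mb each, blocked producer threads are what flow control would be expected to produce, so this is a reasonable first thing to test.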
