Here is my ActiveMQ.xml:
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                               http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>

        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

            <destinationPolicy>
                <policyMap>
                    <policyEntries>
                        <policyEntry topic=">" producerFlowControl="true">
                            <pendingMessageLimitStrategy>
                                <constantPendingMessageLimitStrategy limit="1000"/>
                            </pendingMessageLimitStrategy>
                        </policyEntry>
                        <policyEntry queue=">" producerFlowControl="false" memoryLimit="1mb">
                        </policyEntry>
                    </policyEntries>
                </policyMap>
            </destinationPolicy>

            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>

            <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
            </persistenceAdapter>

            <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage limit="64 mb"/>
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>

            <transportConnectors>
                <transportConnector name="openwire" uri="tcp://0.0.0.0:25055?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
            </transportConnectors>

            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>

        </broker>

        <import resource="jetty.xml"/>

    </beans>

I have already included the producer and consumer uri flags I
am using. I am using the async message listeners, if that has anything to do with it. I have tried deleting and re-creating the queue.

My producer setup code looks like:

    Uri connectUri = new Uri(String.Format("activemq:tcp://{0}:{1}?wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=true&jms.useCompression=true", server, port));
    if ((object)connectUri == null)
        throw new ArgumentNullException("connectUri");

    IConnectionFactory factory = new NMSConnectionFactory(connectUri);
    if ((object)factory == null)
        throw new Exception("Unable to create connection factory.");

    _connection = factory.CreateConnection();
    if ((object)_connection == null)
        throw new Exception("Unable to create connection.");

    _session = _connection.CreateSession();
    if ((object)_session == null)
        throw new Exception("Unable to create session.");

    string[] arrQueues = Enum.GetNames(typeof(Queue));
    if (arrQueues.Length > 0)
    {
        _destinations = new IDestination[arrQueues.Length];
        for (int nIndex = 0; nIndex < arrQueues.Length; nIndex++)
        {
            _destinations[nIndex] = SessionUtil.GetDestination(_session, String.Format("queue://{0}", arrQueues[nIndex]));
            if ((object)_destinations[nIndex] == null)
                throw new Exception("Unable to get destination.");
        }
        _producers = new IMessageProducer[arrQueues.Length];
    }

    _tempQueue = _session.CreateTemporaryQueue();
    if ((object)_tempQueue == null)
        throw new Exception("Unable to create response queue.");

    _consumer = _session.CreateConsumer(_tempQueue);
    if ((object)_consumer == null)
        throw new Exception("Unable to create consumer.");
    else
        _consumer.Listener += consumer_Listener;

    _connection.Start();

So it's just newing up the standard objects and keeping them standard. I create a single temporary queue per producer to get responses from the clients.
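
The reply side of that temporary-queue arrangement is not shown in this post. Purely as an illustration, a handler on the consuming end could answer a request roughly as sketched below. `ReplySketch`, `Reply` and `responseBody` are hypothetical names introduced for this example and are not the actual handler code; only the standard Apache.NMS members (`NMSReplyTo`, `NMSCorrelationID`, `CreateProducer`, `CreateBytesMessage`, `Send`) are assumed.

```csharp
using System;
using Apache.NMS;

// Hypothetical helper for illustration only; not part of the code in this post.
public static class ReplySketch
{
    // Sends responseBody back to the destination named in the request's
    // NMSReplyTo header (here, the producer's temporary queue).
    public static void Reply(ISession session, IMessage request, byte[] responseBody)
    {
        // No reply destination means the sender did not ask for a response.
        if (request.NMSReplyTo == null)
            return;

        // A short-lived producer bound to the reply destination.
        using (IMessageProducer replyProducer = session.CreateProducer(request.NMSReplyTo))
        {
            IBytesMessage reply = session.CreateBytesMessage(responseBody);

            // Copy the correlation ID so the requester can match reply to request.
            reply.NMSCorrelationID = request.NMSCorrelationID;

            replyProducer.Send(reply);
        }
    }
}
```

Called from a listener as `ReplySketch.Reply(session, x, bytes)`, this reuses the session the message arrived on, which matters because an NMS session is meant to be used from one thread at a time.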

Creating a message is:

    Guid guid = Guid.NewGuid();
    if ((object)payload == null)
        throw new ArgumentNullException("payload");

    IMessageProducer producer = _producers[(int)queue];
    if ((object)producer == null)
    {
        _producers[(int)queue] = _session.CreateProducer(_destinations[(int)queue]);
        if ((object)_producers[(int)queue] == null)
            throw new Exception("Unable to create producer.");
        producer = _producers[(int)queue];
    }

    IBytesMessage message = _session.CreateBytesMessage(Util.ObjectToByteArray(payload));
    message.NMSCorrelationID = guid.ToString();
    message.NMSReplyTo = _tempQueue;

    return new ActiveMQMessage(null, producer, null, message);

The consumer setup is:

    Uri connectUri = new Uri(String.Format("activemq:tcp://{0}:{1}?wireFormat.tightEncodingEnabled=true&nms.PrefetchPolicy.QueuePrefetch=1", _strServer, _nPort));
    if ((object)connectUri == null)
        throw new ArgumentNullException("connectUri");

    IConnectionFactory factory = new NMSConnectionFactory(connectUri);
    using (IConnection connection = factory.CreateConnection())
    {
        using (ISession session = connection.CreateSession())
        {
            IDestination destination = SessionUtil.GetDestination(session, String.Format("queue://{0}", queue.ToString()));
            using (IMessageConsumer consumer = session.CreateConsumer(destination))
            {
                MessageListener handler = (x) => { consumer_Listener(session, x); };
                consumer.Listener += handler;

                connection.Start();
                cancellationToken.WaitHandle.WaitOne();

                consumer.Listener -= handler;
            }
        }
    }

Let me know if you want to see any additional code :)