if the default session is auto ack, then with prefetch=1, a second
message will be dispatched as soon as the first is acked, which will
be before the listener is invoked.
Using client ack mode will avoid this dispatch till the ack is received.
An alternative is the use receive(timeout) in a loop and prefetch=0

On 19 April 2013 23:44, SledgeHammer <gro...@firstam.com> wrote:
> Here is my ActiveMQ.xml:
>
>
>
> <beans
>   xmlns="http://www.springframework.org/schema/beans";
>   xmlns:amq="http://activemq.apache.org/schema/core";
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd";>
>
>
>     <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.conf}/credentials.properties</value>
>         </property>
>     </bean>
>
>
>     <broker xmlns="http://activemq.apache.org/schema/core";
> brokerName="localhost" dataDirectory="${activemq.data}">
>
>
>
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry topic=">" producerFlowControl="true">
>
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                 </policyEntry>
>                 <policyEntry queue=">" producerFlowControl="false"
> memoryLimit="1mb">
>
>                 </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>
>
>
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>
>
>         <persistenceAdapter>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>         </persistenceAdapter>
>
>
>
>           <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="64 mb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="100 gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="50 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>
>
>         <transportConnectors>
>
>             <transportConnector name="openwire"
> uri="tcp://0.0.0.0:25055?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
>             <transportConnector name="amqp"
> uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
>         </transportConnectors>
>
>
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans";
> class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>
>     </broker>
>
>
>     <import resource="jetty.xml"/>
>
> </beans>
>
>
> I have already included the producer and consumer uri flags I am using. I am
> using the async message listeners if that has anything to do with it.
>
> I have tried deleting and re-creating the Queue.
>
> My producer set up code looks like:
>
>                         Uri connectUri = new
> Uri(String.Format("activemq:tcp://{0}:{1}?wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=true&jms.useCompression=true",
> server, port));
>
>                         if ((object)connectUri == null)
>                                 throw new ArgumentNullException("connectUri");
>
>                         IConnectionFactory factory = new 
> NMSConnectionFactory(connectUri);
>
>                         if ((object)factory == null)
>                                 throw new Exception("Unable to create 
> connection factory.");
>
>                         _connection = factory.CreateConnection();
>
>                         if ((object)_connection == null)
>                                 throw new Exception("Unable to create 
> connection.");
>
>                         _session = _connection.CreateSession();
>
>                         if ((object)_session == null)
>                                 throw new Exception("Unable to create 
> session.");
>
>                         string[] arrQueues = Enum.GetNames(typeof(Queue));
>
>                         if (arrQueues.Length > 0)
>                         {
>                                 _destinations = new 
> IDestination[arrQueues.Length];
>
>                                 for (int nIndex = 0; nIndex < 
> arrQueues.Length; nIndex++)
>                                 {
>                                         _destinations[nIndex] = 
> SessionUtil.GetDestination(_session,
> String.Format("queue://{0}", arrQueues[nIndex]));
>
>                                         if ((object)_destinations[nIndex] == 
> null)
>                                                 throw new Exception("Unable 
> to get destination.");
>                                 }
>
>                                 _producers = new 
> IMessageProducer[arrQueues.Length];
>                         }
>
>                         _tempQueue = _session.CreateTemporaryQueue();
>
>                         if ((object)_tempQueue == null)
>                                 throw new Exception("Unable to create 
> response queue.");
>
>                         _consumer = _session.CreateConsumer(_tempQueue);
>
>                         if ((object)_consumer == null)
>                                 throw new Exception("Unable to create 
> consumer.");
>                         else
>                                 _consumer.Listener += consumer_Listener;
>
>                         _connection.Start();
>
>
> So its just newing up the standard objects and keeping them standard. I
> create a single temporary queue per producer to get responses from the
> clients.
>
> Creating a message is:
>
>                         Guid guid = Guid.NewGuid();
>
>                         if ((object)payload == null)
>                                 throw new ArgumentNullException("payload");
>
>                         IMessageProducer producer = _producers[(int)queue];
>
>                         if ((object)producer == null)
>                         {
>                                 _producers[(int)queue] =
> _session.CreateProducer(_destinations[(int)queue]);
>
>                                 if ((object)_producers[(int)queue] == null)
>                                         throw new Exception("Unable to create 
> producer.");
>
>                                 producer = _producers[(int)queue];
>                         }
>
>                         IBytesMessage message =
> _session.CreateBytesMessage(Util.ObjectToByteArray(payload));
>
>                         message.NMSCorrelationID = guid.ToString();
>                         message.NMSReplyTo = _tempQueue;
>
>                         return new ActiveMQMessage(null, producer, null, 
> message);
>
>
> The consumer set up is:
>
>                         Uri connectUri = new
> Uri(String.Format("activemq:tcp://{0}:{1}?wireFormat.tightEncodingEnabled=true&nms.PrefetchPolicy.QueuePrefetch=1",
> _strServer, _nPort));
>
>                         if ((object)connectUri == null)
>                                 throw new ArgumentNullException("connectUri");
>
>                         IConnectionFactory factory = new 
> NMSConnectionFactory(connectUri);
>
>                         using (IConnection connection = 
> factory.CreateConnection())
>                         {
>                                 using (ISession session = 
> connection.CreateSession())
>                                 {
>                                         IDestination destination = 
> SessionUtil.GetDestination(session,
> String.Format("queue://{0}", queue.ToString()));
>
>                                         using (IMessageConsumer consumer = 
> session.CreateConsumer(destination))
>                                         {
>                                                 MessageListener handler = (x) 
> => { consumer_Listener(session, x); };
>                                                 consumer.Listener += handler;
>                                                 connection.Start();
>                                                 
> cancellationToken.WaitHandle.WaitOne();
>                                                 consumer.Listener -= handler;
>                                         }
>                                 }
>                         }
>
> Let me know if you want to see any additional code :)
>
>
>
>
> --
> View this message in context: 
> http://activemq.2283324.n4.nabble.com/Multiple-consumers-not-consuming-tp4666078p4666087.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.



-- 
http://redhat.com
http://blog.garytully.com

Reply via email to