Hi All, 

I am trying to use connsumer.prefetchSize on destination, I want to limit
the no of messages sent from broker to consumer on destination basis, but it
is not working for me, but when is set "<policyEntry queue="&gt;"
queuePrefetch="2" >" on activemq.xml this is working properly, Please find
the below program I am not acknowledging the messages in CLIENT_ACKNOWLEDGE
, in this case I am receiving more than 2 messages from TEST.QUEUE5 (the
queue has 3 messages), but the below program works fine when (queueprefetch
applied on broker level). Please help 

import javax.jms.Message; 
import javax.jms.MessageListener; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQMessageConsumer; 
import org.apache.activemq.ActiveMQPrefetchPolicy; 
import org.apache.activemq.ActiveMQSession; 
import org.apache.activemq.command.ActiveMQQueue; 


public class Test implements MessageListener{ 
        
        public static void main(String s[]) throws Exception 
        { 
                ActiveMQConnection connection =null; 
                ActiveMQSession session =null; 
                
                try 
                { 
                        Test test = new Test(); 
                        ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 
                        connection = (ActiveMQConnection)
factory.createConnection(); 
                        connection.start(); 
                        session = (ActiveMQSession)
connection.createSession(false,ActiveMQSession.CLIENT_ACKNOWLEDGE); 
                        ActiveMQQueue queue = new
ActiveMQQueue("TEST.QUEUE5?consumer.prefetchSize=2"); 
                        ActiveMQMessageConsumer acmgConsume =
(ActiveMQMessageConsumer) session.createConsumer(queue); 
                        System.out.println(acmgConsume.getPrefetchNumber()); 
                        //System.out.println(session.isAsyncDispatch()); 
                        acmgConsume.setMessageListener(test); 
                        
                        //System.out.println(acmgConsume.receive()); 
                        //System.out.println(acmgConsume.receive()); 
                        //System.out.println(acmgConsume.receive()); 
                        
                        //System.out.println(session.isAsyncDispatch()); 
                        while(true); 
                }catch(Exception e) 
                { 
                        e.printStackTrace(); 
                } finally 
                { 
                        session.close(); 
                        connection.stop(); 
                        connection.close(); 
                } 
                
        } 

        @Override 
        public void onMessage(Message arg0) { 
                // TODO Auto-generated method stub 
                
                System.out.println(arg0); 
        } 

} 

Output: 

2 
ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId =
ID:DA15060505-63172-1502968777734-3:1:1:1:1, originalDestination = null,
originalTransactionId = null, producerId =
ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
1502969052352, arrival = 0, brokerInTime = 1502969052355, brokerOutTime =
1502970437587, correlationId = , replyTo = null, persistent = false, type =
, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
org.apache.activemq.util.ByteSequence@282ba1e, dataStructure = null,
redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=1},
readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
Test Teja} 

ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId =
ID:DA15060505-63172-1502968777734-3:1:1:1:2, originalDestination = null,
originalTransactionId = null, producerId =
ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
1502969052352, arrival = 0, brokerInTime = 1502969052356, brokerOutTime =
1502970437587, correlationId = , replyTo = null, persistent = false, type =
, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
org.apache.activemq.util.ByteSequence@13b6d03, dataStructure = null,
redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=2},
readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
Test Teja} 

ActiveMQTextMessage {commandId = 7, responseRequired = false, messageId =
ID:DA15060505-63172-1502968777734-3:1:1:1:3, originalDestination = null,
originalTransactionId = null, producerId =
ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
1502969052352, arrival = 0, brokerInTime = 1502969052358, brokerOutTime =
1502970437601, correlationId = , replyTo = null, persistent = false, type =
, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
org.apache.activemq.util.ByteSequence@f5f2bb7, dataStructure = null,
redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=3},
readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
Test Teja} 



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/consumer-prefetchSize-option-on-destination-is-not-working-tp4729729.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to