Hi,All. I am facing a simliar problem like this thread(http://www.nabble.com/Pending-Messages-are-shown-in-ActiveMQ-to20241332.html) but in a different environment.
My PC: ActiveMQ 5.1.0 Java SE 6 Update 10 Windows 2003 SP2 client api: activemq-cpp-2.2.1 Development tool: Visual Studio 2005 I have a simple test: A producer sent thousands of persistent messages (each 1k bytes )into a queue in async way. and then stopped. A consumer received message with listener start and tried to run up of the pending messages. Sometimes the consumer stopped to received even there were still many messages pending in the queue. I have tried to start the client and reconnect to the queue successfully. But the messages still remained in the queue (from webconsole I can see)and the onMessage handler was never called until the ActiveMQ was restarted. Just like the above thread saying, sometimes the Number Of Pending Message in webconsole is negative and not proper. If I restart ActiveMQ , the number become correct. here is my code of producer: ActiveMQMessageProducer is a producer wrapper class. int ActiveMQMessageProducer::connect(char* cbrokerURI,char* cqueueName) { ConnectionFactory* connectionFactory = NULL; try { string brokerURI=cbrokerURI; string queueName=cqueueName; // Create a ConnectionFactory connectionFactory = ConnectionFactory::createCMSConnectionFactory( brokerURI);//brokerURI must contain UseAsyncSend=true // Create a Connection connection = connectionFactory->createConnection(); ((Connection*)connection)->start(); // free the factory, we are done with it. delete connectionFactory; connectionFactory = NULL; // Create an Auto_Acknowledge Session session = ((Connection*)connection)->createSession( Session::AUTO_ACKNOWLEDGE ); destination =((Session*) session)->createQueue( queueName ); // Create a MessageProducer from the Session to the Topic or Queue producer = ((Session*)session)->createProducer( (Destination*)destination ); ((MessageProducer*)producer)->setDeliveryMode( DeliveryMode::PERSISTENT ); } catch( CMSException& e ) { if (connectionFactory) delete connectionFactory; connectionFactory = NULL; if (logCallbackFunc)//a function pointer { logCallbackFunc((char*)e.getMessage().c_str()); } return -1; } return 0; } int ActiveMQMessageProducer::sendMessage(char *message,size_t sizeOfMessage) { BytesMessage* bytesMessage=NULL; try { bytesMessage=((Session*)session)->createBytesMessage((unsigned char*)message,sizeOfMessage); ((MessageProducer*)producer)->send(bytesMessage); } catch(CMSException& e) { if (logCallbackFunc) { logCallbackFunc((char*)(e.getMessage().c_str())); } if (bytesMessage) delete bytesMessage; bytesMessage=NULL; return -1; } if (bytesMessage) delete bytesMessage; bytesMessage=NULL; return 0; } And below is code for consumer: ActiveMQConsumer is a wrapper class for consumer int ActiveMQMessageConsumer::connect(char *cbrokerURI,char *cqueueName) { ConnectionFactory* connectionFactory = NULL; try { string brokerURI=cbrokerURI; string queueName=cqueueName; // Create a ConnectionFactory connectionFactory = ConnectionFactory::createCMSConnectionFactory( brokerURI); // Create a Connection connection = connectionFactory->createConnection(); ((Connection*)connection)->start(); ((Connection*)connection)->setExceptionListener(((ConsumerListener*)listener)); // free the factory, we are done with it. delete connectionFactory; connectionFactory = NULL; // Create an Auto_Acknowledge Session session = ((Connection*)connection)->createSession( Session::AUTO_ACKNOWLEDGE ); destination =((Session*) session)->createQueue( queueName ); // Create a MessageProducer from the Session to the Topic or Queue consumer = ((Session*) session)->createConsumer( ((Destination*)destination )); ((MessageConsumer*)consumer)->setMessageListener(((ConsumerListener*)listener)); } catch( CMSException& e ) { if (connectionFactory) delete connectionFactory; connectionFactory = NULL; if (logCallbackFunc) { logCallbackFunc((char*)e.getMessage().c_str()); } return -1; } return 0; } ConsumerListener is implementation for MessageListener and ExceptionListener class ConsumerListener: public ExceptionListener, public MessageListener { private: ActiveMQMessageConsumer* consumer; public: ConsumerListener(ActiveMQMessageConsumer* messageConsumer) { consumer=messageConsumer; } virtual ~ConsumerListener() { consumer=NULL; } // Called from the consumer since this class is a registered MessageListener. virtual void onMessage( const Message* message ) { try { const BytesMessage* bytesMessage = dynamic_cast< const BytesMessage* >( message ); char* tempCharArray=new char(bytesMessage->getBodyLength()); memcpy(tempCharArray,(char*)(bytesMessage->getBodyBytes()),bytesMessage->getBodyLength()); delete []tempCharArray; tempCharArray=NULL; } catch(CMSException& e) { // do some stuff } return ; } // If something bad happens you see it here as this class is also been // registered as an ExceptionListener with the connection. virtual void onException( const CMSException& ex ) { try { //do some error handler stuff } catch(CMSException& e) { // } return ; } }; Anyone could help ? Thanks a lot. -- View this message in context: http://www.nabble.com/Consumer-Listener-stop-receving-message-until-ActiveMQ-restart-tp20355247p20355247.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.