Hi, all.
I am facing a problem similar to the one described in this
thread (http://www.nabble.com/Pending-Messages-are-shown-in-ActiveMQ-to20241332.html),
but in a different environment.

My PC:
ActiveMQ 5.1.0
Java SE 6 Update 10
Windows 2003 SP2
client api: activemq-cpp-2.2.1

Development tool:
Visual Studio 2005

I have a simple test:
A producer sends thousands of persistent messages (1 KB each) into a
queue asynchronously and then stops.
A consumer, which receives messages through a listener, then starts and tries
to drain the pending messages.

Sometimes the consumer stops receiving even though there are still many
messages pending in the queue. I have tried restarting the client, and it
reconnects to the queue successfully, but the messages still remain in the
queue (as I can see in the web console) and the onMessage handler is never
called until ActiveMQ is restarted.

Just as the thread above describes, the Number Of Pending Messages shown in
the web console is sometimes negative, which is incorrect. If I restart
ActiveMQ, the number becomes correct.

Here is my producer code.
ActiveMQMessageProducer is a wrapper class for the producer.
int ActiveMQMessageProducer::connect(char* cbrokerURI,char* cqueueName)
{
        ConnectionFactory* connectionFactory = NULL;

        try
        {
                string brokerURI=cbrokerURI;
                string queueName=cqueueName;
                
                // Create a ConnectionFactory
                connectionFactory = 
ConnectionFactory::createCMSConnectionFactory(
brokerURI);//brokerURI must contain UseAsyncSend=true

                // Create a Connection
                         connection = connectionFactory->createConnection();
                         ((Connection*)connection)->start();

                // free the factory, we are done with it.
                         delete connectionFactory;
                         connectionFactory = NULL;

                // Create an Auto_Acknowledge Session
                session = ((Connection*)connection)->createSession(
Session::AUTO_ACKNOWLEDGE );

                destination =((Session*) session)->createQueue( queueName );

                // Create a MessageProducer from the Session to the Topic or 
Queue
                         producer = ((Session*)session)->createProducer(
(Destination*)destination );
                         ((MessageProducer*)producer)->setDeliveryMode(
DeliveryMode::PERSISTENT );
        }
        catch( CMSException& e ) 
        {
                         if (connectionFactory)
                        delete connectionFactory;
                         connectionFactory = NULL;

                
                if (logCallbackFunc)//a function pointer
                {
                        logCallbackFunc((char*)e.getMessage().c_str());
                }

                return  -1;
        }
        return 0;

}

int ActiveMQMessageProducer::sendMessage(char *message,size_t sizeOfMessage)
{
        BytesMessage* bytesMessage=NULL;
        try
        {
                bytesMessage=((Session*)session)->createBytesMessage((unsigned
char*)message,sizeOfMessage);
                ((MessageProducer*)producer)->send(bytesMessage);

        }
        catch(CMSException& e)
        {
                
                if (logCallbackFunc)
                {
                        logCallbackFunc((char*)(e.getMessage().c_str()));
                }
                if (bytesMessage)
                        delete bytesMessage;

                bytesMessage=NULL;

                return  -1;
        }

        if (bytesMessage)
                delete bytesMessage;

        bytesMessage=NULL;

        return 0;
}

And below is the code for the consumer.
ActiveMQMessageConsumer is a wrapper class for the consumer.

int ActiveMQMessageConsumer::connect(char *cbrokerURI,char *cqueueName)
{
        ConnectionFactory* connectionFactory = NULL;

        try
        {
                string brokerURI=cbrokerURI;
                string queueName=cqueueName;
                
                // Create a ConnectionFactory
                connectionFactory = 
ConnectionFactory::createCMSConnectionFactory(
brokerURI);

                // Create a Connection
                         connection = connectionFactory->createConnection();
                        ((Connection*)connection)->start();

        
((Connection*)connection)->setExceptionListener(((ConsumerListener*)listener));

                // free the factory, we are done with it.
                        delete connectionFactory;
                        connectionFactory = NULL;

                // Create an Auto_Acknowledge Session
                session = ((Connection*)connection)->createSession(
Session::AUTO_ACKNOWLEDGE );

                destination =((Session*) session)->createQueue( queueName );

                // Create a MessageProducer from the Session to the Topic or 
Queue        
                         consumer = ((Session*) session)->createConsumer(
((Destination*)destination ));
                        
((MessageConsumer*)consumer)->setMessageListener(((ConsumerListener*)listener));
        }
        catch( CMSException& e ) 
        {
                        if (connectionFactory)
                        delete connectionFactory;
                        connectionFactory = NULL;

                
                if (logCallbackFunc)
                {
                        logCallbackFunc((char*)e.getMessage().c_str());
                }

                return  -1;
        }
        return 0;

}

ConsumerListener is the implementation of MessageListener and ExceptionListener:

class ConsumerListener: public ExceptionListener,
                public MessageListener
{
private:
        ActiveMQMessageConsumer* consumer;

public:
        ConsumerListener(ActiveMQMessageConsumer* messageConsumer)
        {
                consumer=messageConsumer;
        }

        virtual ~ConsumerListener()
        {
                consumer=NULL;
        }

        // Called from the consumer since this class is a registered
MessageListener.
    virtual void onMessage( const Message* message )
        {
                
                try
                {
                        const BytesMessage* bytesMessage =
                dynamic_cast< const BytesMessage* >( message );
                        
                        char* tempCharArray=new 
char(bytesMessage->getBodyLength());
                
memcpy(tempCharArray,(char*)(bytesMessage->getBodyBytes()),bytesMessage->getBodyLength());
                                       delete []tempCharArray;
                                       tempCharArray=NULL;
                                
                        
                }
                catch(CMSException& e)
                {
                        // do some stuff
                }

                return ;
        }

        // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException( const CMSException& ex )
        {
                try
                {                       
                        //do some error handler stuff
                        
                }
                catch(CMSException& e)
                {
                        //
                }
                return ;

    }
};

Could anyone help?
Thanks a lot.


-- 
View this message in context: 
http://www.nabble.com/Consumer-Listener-stop-receving-message-until-ActiveMQ-restart-tp20355247p20355247.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to