Hi everybody,

First, sorry for the long mail, and thanks for the great work on ActiveMQ and
ActiveMQ-CPP.

I'm testing some failure cases with ActiveMQ and ActiveMQ-CPP.

In my test, an unexpected exception is thrown if the broker is stopped
while a consumer is in a transaction.

Environment:
ActiveMQ 5.4.2
ActiveMQ-CPP 3.2.4
Ubuntu 10.04
JRE 1.6.0_20-b02


I get the following message:

terminate called after throwing an instance of 'cms::CMSException'
  what():  Failover timeout of 1000 ms reached.
Aborted (core dumped)

core backtrace:
#0  0x001ef422 in __kernel_vsyscall ()
#1  0x00c5a651 in *__GI_raise (sig=6) at
../nptl/sysdeps/unix/sysv/linux/raise.c:64
#2  0x00c5da82 in *__GI_abort () at abort.c:92
#3  0x00bf952f in __gnu_cxx::__verbose_terminate_handler() () from
/usr/lib/libstdc++.so.6
#4  0x00bf7465 in ?? () from /usr/lib/libstdc++.so.6
#5  0x00bf74a2 in std::terminate() () from /usr/lib/libstdc++.so.6
#6  0x00bf74c5 in ?? () from /usr/lib/libstdc++.so.6
#7  0x00bf6915 in __cxa_call_unexpected () from /usr/lib/libstdc++.so.6
#8  0x0052f8ae in
activemq::core::TransactionSynhcronization::beforeEnd (this=0x93ac548)
at activemq/core/ActiveMQConsumer.cpp:84
#9  0x00550588 in
activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x93b3e48)
at activemq/core/ActiveMQTransactionContext.cpp:192
#10 0x00550d63 in activemq::core::ActiveMQTransactionContext::commit
(this=0x93b3e48) at activemq/core/ActiveMQTransactionContext.cpp:127
#11 0x0053e460 in activemq::core::ActiveMQSession::commit
(this=0x93b3c10) at activemq/core/ActiveMQSession.cpp:189
#12 0x0042c1a0 in activemq::cmsutil::PooledSession::commit
(this=0x93b3fd8) at activemq/cmsutil/PooledSession.h:87
#13 0x0804c0fb in Consumer::onMessage (this=0x93a8c30,
message=0x93b49b0) at main.cpp:455
#14 0x0804bdc8 in Consumer::consumeLoop (this=0x93a8c30) at main.cpp:415
#15 0x0804bb56 in Consumer::run (this=0x93a8c30) at main.cpp:381
#16 0x00823cf1 in decaf::lang::ThreadProperties::runCallback
(properties=0x93abff0) at decaf/lang/Thread.cpp:135
#17 0x00822847 in threadWorker (arg=0x93abff0) at decaf/lang/Thread.cpp:188
#18 0x0015a96e in start_thread (arg=0xb6f8bb70) at pthread_create.c:300
#19 0x00cfda4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130


Exception specification for method
'activemq::core::TransactionSynhcronization::beforeEnd' says: 'throw(
exceptions::ActiveMQException )' but 'ActiveMQConsumer::acknowledge'
throws 'cms::CMSException', so 'std::unexpected' is called and 'abort'
occurs. 
(http://www.linuxprogrammingblog.com/cpp-exception-specifications-are-evil)


When 'MessageListener' is used, the commit freezes, probably in thread 5:

Thread 1:
#0  0x00cfd422 in __kernel_vsyscall ()
#1  0x00ddcb5d in pthread_join (threadid=3061869424,
thread_return=0xbf964bdc) at pthread_join.c:89
#2  0x00a923c0 in decaf::lang::Thread::join (this=0x8eba574) at
decaf/lang/Thread.cpp:421
#3  0x0804c9f1 in AppTest::joinEndPoins (this=0xbf964c64) at main.cpp:586
#4  0x0804c678 in AppTest::run (this=0xbf964c64) at main.cpp:555
#5  0x0804a33c in main (argc=4, argv=0xbf964d84) at main.cpp:633

Thread 2:
#0  0x00cfd422 in __kernel_vsyscall ()
#1  0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at
../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122
#2  0x0034f9dd in __pthread_cond_wait (cond=0x8eb0e38,
mutex=0x8eb0e08) at forward.c:139
#3  0x00a606a9 in
decaf::internal::util::concurrent::ConditionImpl::wait
(condition=0x8eb0e38) at
decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94
#4  0x00ad1353 in decaf::util::concurrent::Mutex::wait
(this=0x8eb0d6c) at decaf/util/concurrent/Mutex.cpp:95
#5  0x007fc9c5 in activemq::threads::CompositeTaskRunner::run
(this=0x8eb0d48) at activemq/threads/CompositeTaskRunner.cpp:118
#6  0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
(properties=0x8eb0e90) at decaf/lang/Thread.cpp:135
#7  0x00a92847 in threadWorker (arg=0x8eb0e90) at decaf/lang/Thread.cpp:188
#8  0x00ddb96e in start_thread (arg=0xb7808b70) at pthread_create.c:300
#9  0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130

Thread 3:
#0  0x00cfd422 in __kernel_vsyscall ()
#1  0x00de2af9 in __lll_lock_wait () at
../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142
#2  0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0
#3  0x00dddf61 in __pthread_mutex_lock (mutex=0x8ebb3a8) at
pthread_mutex_lock.c:61
#4  0x0034fba6 in pthread_mutex_lock (mutex=0x8ebb3a8) at forward.c:182
#5  0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock
(handle=0x8ebb3a8) at
decaf/internal/util/concurrent/unix/MutexImpl.cpp:71
#6  0x00ad1412 in decaf::util::concurrent::Mutex::lock
(this=0x8ebb2bc) at decaf/util/concurrent/Mutex.cpp:75
#7  0x007c5770 in
decaf::util::StlQueue<decaf::lang::Pointer<activemq::commands::MessageDispatch,
decaf::util::concurrent::atomic::AtomicRefCounter> >::lock
(this=0x8ebb2a8)
    at ./decaf/util/StlQueue.h:253
#8  activemq::core::MessageDispatchChannel::lock (this=0x8ebb2a8) at
activemq/core/MessageDispatchChannel.h:153
#9  0x00ad0ab5 in decaf::util::concurrent::Lock::lock
(this=0xb7006fe4) at decaf/util/concurrent/Lock.cpp:54
#10 0x00ad0c08 in Lock (this=0xfffffe00, object=0x8ebb3a8,
intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32
#11 0x0078e102 in
activemq::core::ActiveMQConsumer::clearMessagesInProgress
(this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:1112
#12 0x007af15c in
activemq::core::ActiveMQSession::clearMessagesInProgress
(this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:239
#13 0x00778543 in
activemq::core::ActiveMQConnection::transportInterrupted
(this=0x8eb1210) at activemq/core/ActiveMQConnection.cpp:704
#14 0x00803b34 in
activemq::transport::TransportFilter::transportInterrupted
(this=0x8eb11c0) at activemq/transport/TransportFilter.cpp:67
#15 0x008187ae in
activemq::transport::failover::FailoverTransport::handleTransportFailure
(this=0x8eb0a38, error=...) at
activemq/transport/failover/FailoverTransport.cpp:476
#16 0x0082406c in
activemq::transport::failover::FailoverTransportListener::onException
(this=0x8eb0878, ex=...) at
activemq/transport/failover/FailoverTransportListener.cpp:97
#17 0x00803c0b in activemq::transport::TransportFilter::fire
(this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:49
#18 0x00803c64 in activemq::transport::TransportFilter::onException
(this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:41
#19 0x00803c0b in activemq::transport::TransportFilter::fire
(this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:49
#20 0x00803c64 in activemq::transport::TransportFilter::onException
(this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:41
#21 0x00801b13 in activemq::transport::IOTransport::fire
(this=0x8eb27a0, ex=...) at activemq/transport/IOTransport.cpp:73
#22 0x008023bf in activemq::transport::IOTransport::run
(this=0x8eb27a0) at activemq/transport/IOTransport.cpp:246
#23 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
(properties=0x8eb2ea0) at decaf/lang/Thread.cpp:135
#24 0x00a92847 in threadWorker (arg=0x8eb2ea0) at decaf/lang/Thread.cpp:188
#25 0x00ddb96e in start_thread (arg=0xb7007b70) at pthread_create.c:300
#26 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130

Thread 4:
#0  0x00cfd422 in __kernel_vsyscall ()
#1  0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at
../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122
#2  0x0034f9dd in __pthread_cond_wait (cond=0x8eba6c8,
mutex=0x8eba698) at forward.c:139
#3  0x00a606a9 in
decaf::internal::util::concurrent::ConditionImpl::wait
(condition=0x8eba6c8) at
decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94
#4  0x00ad1353 in decaf::util::concurrent::Mutex::wait
(this=0x8eba584) at decaf/util/concurrent/Mutex.cpp:95
#5  0x00ad029a in decaf::util::concurrent::CountDownLatch::await
(this=0x8eba57c) at decaf/util/concurrent/CountDownLatch.cpp:56
#6  0x0804af84 in JMSEndPointThread::stopCheck (this=0x8eba574,
timeOut=-1) at main.cpp:226
#7  0x0804bb46 in Consumer::run (this=0x8eba570) at main.cpp:379
#8  0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
(properties=0x8eba5b0) at decaf/lang/Thread.cpp:135
#9  0x00a92847 in threadWorker (arg=0x8eba5b0) at decaf/lang/Thread.cpp:188
#10 0x00ddb96e in start_thread (arg=0xb6806b70) at pthread_create.c:300
#11 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130

Thread 5:
#0  0x00cfd422 in __kernel_vsyscall ()
#1  0x00de2af9 in __lll_lock_wait () at
../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142
#2  0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0
#3  0x00dddf61 in __pthread_mutex_lock (mutex=0x8eb05e0) at
pthread_mutex_lock.c:61
#4  0x0034fba6 in pthread_mutex_lock (mutex=0x8eb05e0) at forward.c:182
#5  0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock
(handle=0x8eb05e0) at
decaf/internal/util/concurrent/unix/MutexImpl.cpp:71
#6  0x00ad1412 in decaf::util::concurrent::Mutex::lock
(this=0x8eb0a88) at decaf/util/concurrent/Mutex.cpp:75
#7  0x00ad0ab5 in decaf::util::concurrent::Lock::lock
(this=0xb6004ce0) at decaf/util/concurrent/Lock.cpp:54
#8  0x00ad0c08 in Lock (this=0xfffffe00, object=0x8eb05e0,
intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32
#9  0x0081b8fd in
activemq::transport::failover::FailoverTransport::oneway
(this=0x8eb0a38, command=...) at
activemq/transport/failover/FailoverTransport.cpp:186
#10 0x00807f9f in
activemq::transport::correlator::ResponseCorrelator::oneway
(this=0x8eb11c0, command=...) at
activemq/transport/correlator/ResponseCorrelator.cpp:82
#11 0x007713be in activemq::core::ActiveMQConnection::oneway
(this=0x8eb1210, command=...) at
activemq/core/ActiveMQConnection.cpp:741
#12 0x007b0a4f in activemq::core::ActiveMQSession::oneway
(this=0x8ebab90, command=...) at activemq/core/ActiveMQSession.cpp:903
#13 0x00795a1c in activemq::core::ActiveMQConsumer::acknowledge
(this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:860
#14 0x0079f885 in
activemq::core::TransactionSynhcronization::beforeEnd (this=0x8ebb9e8)
at activemq/core/ActiveMQConsumer.cpp:85
#15 0x007c0588 in
activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x8ebadc8)
at activemq/core/ActiveMQTransactionContext.cpp:192
#16 0x007c0d63 in activemq::core::ActiveMQTransactionContext::commit
(this=0x8ebadc8) at activemq/core/ActiveMQTransactionContext.cpp:127
#17 0x007ae460 in activemq::core::ActiveMQSession::commit
(this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:189
#18 0x0069c1a0 in activemq::cmsutil::PooledSession::commit
(this=0x8ebaf38) at activemq/cmsutil/PooledSession.h:87
#19 0x0804c0fb in Consumer::onMessage (this=0x8eba570,
message=0x8ebbec0) at main.cpp:455
#20 0x0079972f in activemq::core::ActiveMQConsumer::dispatch
(this=0x8ebb270, dispatch=...) at
activemq/core/ActiveMQConsumer.cpp:1018
#21 0x007bd5c3 in activemq::core::ActiveMQSessionExecutor::dispatch
(this=0x8ebae78, dispatch=...) at
activemq/core/ActiveMQSessionExecutor.cpp:129
#22 0x007bd993 in activemq::core::ActiveMQSessionExecutor::iterate
(this=0x8ebae78) at activemq/core/ActiveMQSessionExecutor.cpp:166
#23 0x008007f3 in activemq::threads::DedicatedTaskRunner::run
(this=0x8ebb550) at activemq/threads/DedicatedTaskRunner.cpp:111
#24 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
(properties=0x8ebaad8) at decaf/lang/Thread.cpp:135
#25 0x00a92847 in threadWorker (arg=0x8ebaad8) at decaf/lang/Thread.cpp:188
#26 0x00ddb96e in start_thread (arg=0xb6005b70) at pthread_create.c:300
#27 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130



I tried to resolve this issue by changing the configuration and the code, but I
always get either an error or a frozen client.
Is it a bug? Any suggestions?

Thanks.
Henrique

How to reproduce:
1. Start ActiveMQ
2. Send a message './activemqTest -p1 -n1'
3. Run consumer './activemqTest -c1 -d10000'
4. Stop activemq when 'Starting delay...' message appears.
5. After activemq is down and the delay has finished, the consumer tries to
commit (message 'Try commit') and the unexpected exception occurs.

The test code follows; it is based on the ActiveMQ-CPP sample
'activemq-cpp-library-3.2.4/src/examples/main.cpp':


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// START SNIPPET: demo

#include <activemq/library/ActiveMQCPP.h>
#include <activemq/cmsutil/SessionPool.h>
#include <activemq/cmsutil/PooledSession.h>
#include <activemq/cmsutil/ResourceLifecycleManager.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/concurrent/Mutex.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

using namespace activemq::core;
using namespace activemq::cmsutil;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;


/**
 * Owns the CMS connection to the broker together with one SessionPool per
 * acknowledge mode, and hands pooled sessions out to the worker threads.
 *
 * The object is its own monitor (it derives from decaf's Mutex): pool
 * start-up and tear-down run inside synchronized(this).  It also registers
 * itself as the connection's ExceptionListener.
 */
class ConnPoolManager : protected decaf::util::concurrent::Mutex,
                        public ExceptionListener {
private:
    // Key is the numeric value of cms::Session::AcknowledgeMode.
    typedef std::map< int, activemq::cmsutil::SessionPool* > tSessionPoolMap;

    std::string _brokerURI;
    ConnectionFactory* _connectionFactory;
    Connection* _connection;
    ResourceLifecycleManager* _lifecycleManager;
    tSessionPoolMap _pools;
    bool _started;

public:
    ConnPoolManager() :
            _brokerURI("tcp://localhost:61616"),
            _connectionFactory(NULL),
            _connection(NULL),
            _lifecycleManager(NULL),
            _started(false)
    {
    }

    virtual ~ConnPoolManager() {
        cleanup();
    }

    /** Sets the broker URI used by the next (re)connection attempt. */
    void setBrokerURI(const std::string& uri) {
        _brokerURI = uri;
    }

    /**
     * Creates the connection and the session pools if that has not been
     * done yet.
     *
     * @return true when the pools are available (already started or started
     *         now); false when start-up failed, in which case every
     *         partially created resource has been released again.
     */
    bool startPools() {
        synchronized(this) {
            if(_started) return true;
            try {
                createConnection();
                createPools();
                _started=true;
                return true;
            } catch(CMSException& e) {
                std::cout<<"(ConnPoolManager::startPools) CMSException: "
                         << e.getMessage() << std::endl;
            } catch(...) {
                std::cout<<"(ConnPoolManager::startPools) Unknown exception!!!"
                         << std::endl;
            }
        }
        // Only reached on failure: drop whatever was created before the throw.
        cleanup();
        return false;
    }

    /**
     * Takes a session from the pool that matches the acknowledge mode,
     * starting the pools first when necessary.
     *
     * @param ack acknowledge mode of the requested session.
     * @return the pooled session, or NULL when the pools could not be
     *         started or no pool exists for the given mode.
     */
    PooledSession* getSession(cms::Session::AcknowledgeMode ack) {
        if(!_started && !startPools()) {
            return NULL;
        }
        // Look the pool up with find(): operator[] would silently insert a
        // NULL entry for an unknown mode and then dereference it.
        tSessionPoolMap::iterator pool = _pools.find(ack);
        if(pool == _pools.end() || pool->second == NULL) {
            return NULL;
        }
        return pool->second->takeSession();
    }

private:

    /** Creates the factory and the connection, starts it and registers this object as listener. */
    void createConnection(){
        // Create a ConnectionFactory
        _connectionFactory =
            ConnectionFactory::createCMSConnectionFactory( _brokerURI );

        // Create a Connection
        _connection = _connectionFactory->createConnection();
        _connection->start();
        _connection->setExceptionListener(this);
    }

    /** Creates the lifecycle manager and one SessionPool per acknowledge mode. */
    void createPools(){
        // Create lifecycle manager
        _lifecycleManager = new activemq::cmsutil::ResourceLifecycleManager();

        // Create pools, one for each acknowledge type
        for(    int ack = cms::Session::AUTO_ACKNOWLEDGE;
                ack <= cms::Session::INDIVIDUAL_ACKNOWLEDGE;
                ack++)
        {
            _pools[ack] =  new activemq::cmsutil::SessionPool(
                    _connection,
                    static_cast<cms::Session::AcknowledgeMode>(ack),
                    _lifecycleManager);
        }
    }

    /**
     * Releases pools, lifecycle manager, connection and factory (in that
     * order) and marks the manager as not started.  Exceptions thrown while
     * deleting are swallowed on purpose: this is best-effort tear-down and
     * is also called from the destructor.
     */
    void cleanup() {
        synchronized(this) {
            releasePools();

            if(_lifecycleManager!=NULL) {
                try { delete _lifecycleManager; } catch(...) {}
            }
            _lifecycleManager = NULL;

            if(_connection!=NULL) {
                try { delete _connection; } catch(...) {}
            }
            _connection = NULL;

            if(_connectionFactory!=NULL) {
                try { delete _connectionFactory; } catch(...) {}
            }
            _connectionFactory = NULL;

            _started=false;
        }
    }

    /** Deletes every session pool and empties the map. */
    void releasePools() {
        for(tSessionPoolMap::iterator it=_pools.begin(); it!=_pools.end(); ++it) {
            try {
                delete it->second;
            } catch(...) { }
        }
        _pools.clear();
    }

    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    // The process is terminated after the stack trace has been printed.
    virtual void onException( const CMSException& ex ) {
        std::cout<<"(ConnPoolManager::onException) CMS Exception occurred.  Shutting down client."<<std::endl;
        ex.printStackTrace();
        exit(1);
    }

};

class JMSEndPointThread  : public Thread {
private:
    CountDownLatch _stopNow;
    ConnPoolManager* _pools;
    std::string _destinationString;
    PooledSession* _session;
    Destination* _destination;
    bool _sessionTransacted;

protected:

    JMSEndPointThread(  ConnPoolManager* pools,
                        const std::string& dest,
                        bool sessionTransacted) :
            _stopNow(1),
            _pools(pools),
            _destinationString(dest),
            _session(NULL),
            _destination(NULL),
            _sessionTransacted(sessionTransacted)
    {
    }

    virtual bool prepareSession() {
        cleanup();
        // Create a Session
        if( _sessionTransacted ) {
            _session = _pools->getSession(Session::SESSION_TRANSACTED);
        } else {
            _session = _pools->getSession(Session::AUTO_ACKNOWLEDGE);
        }
        if(_session==NULL) return false;

        // Create the Queue destination
        _destination = _session->createQueue( _destinationString );
        return true;
    }

    PooledSession* getSession() { return _session; }
    Destination* getDestination() { return _destination; }
    bool getTransacted() { return _sessionTransacted; }

    bool stopCheck(int timeOut=0) {
        if(timeOut>=0) return _stopNow.await(timeOut);
        else _stopNow.await();
        return true;
    }

    virtual void cleanup() {
        // Destroy resources.
        try{
            if( _destination != NULL ) delete _destination;
        } catch(...) { }
        _destination = NULL;

        // Back session to pool
        try{
            if( _session != NULL ) _session->close();
        } catch(...) { }
        _session = NULL;
    }

public:

    typedef std::vector<JMSEndPointThread*> List;

    virtual ~JMSEndPointThread() {
        cleanup();
    }

    void stopNow() {
        _stopNow.countDown();
    }

};

/**
 * Worker thread that sends a fixed number of persistent text messages to
 * the destination, pausing between sends and committing after each send
 * when the session is transacted.
 */
class Producer : public JMSEndPointThread {
private:

    MessageProducer* _producer;   // owned, deleted in cleanup()
    int _delay;                   // pause after each send, handed to stopCheck()
    int _numMessages;             // messages still to be sent

public:

    Producer(   ConnPoolManager* pools,
                const std::string& dest,
                int delay,
                int numMessages,
                bool sessionTransacted = false ) :
        JMSEndPointThread(pools,dest,sessionTransacted),
        _producer(NULL),
        _delay(delay),
        _numMessages(numMessages)
    {
    }

    virtual ~Producer() {
        cleanup();
    }

    /**
     * Thread body: (re)creates the producer and sends messages until all
     * have been sent or a stop was requested.  After a failure the
     * resources are dropped and a new attempt is made following the
     * reconnect delay.
     */
    virtual void run() {
        std::cout<<"Producer started!!!! Thread: "<<Thread::getId()<<std::endl;
        while(!stopCheck() && _numMessages) {
            try {
                if(prepareProducer()) {
                    sendMessages();
                    continue;
                }
            } catch(CMSException& e) {
                std::cout<<"(Producer::run) CMSException: "
                         << e.getMessage() << std::endl;
            } catch(...) {
                std::cout<<"(Producer::run) Unknown exception!!!" << std::endl;
            }
            cleanup();
            stopCheck(5000); // reconnect delay
        }
        std::cout<<"Producer end!!!! Thread: "<<Thread::getId()<<std::endl;
        cleanup();
    }

private:

    /**
     * Obtains a session and creates a persistent MessageProducer on it.
     *
     * @return false when no session could be obtained.
     */
    bool prepareProducer() {
        // Get Session
        if(!prepareSession()) return false;
        // Create a MessageProducer from the Session to the Queue
        _producer = getSession()->createProducer( getDestination() );
        _producer->setDeliveryMode( DeliveryMode::PERSISTENT );
        return true;
    }

    /**
     * Sends the remaining messages one by one.  Exceptions from the CMS
     * calls propagate to run(); the message object is released first so
     * that a failed send does not leak it.
     */
    void sendMessages() {
        // Text shared by all messages
        std::stringstream text;
        text<<"Hello world! from thread " << Thread::getId();

        while(!stopCheck() && _numMessages) {

            TextMessage* message =
                getSession()->createTextMessage( text.str() );
            try {
                message->setIntProperty( "Integer", _numMessages );

                // Tell the producer to send the message
                std::cout<<"Sent message #"<<_numMessages<<" from thread "
                         <<Thread::getId()<<std::endl;
                _producer->send( message );
            } catch(...) {
                delete message;
                throw;
            }
            delete message;
            --_numMessages;

            stopCheck(_delay);
            if(getTransacted()) getSession()->commit();
        }
    }

    /** Deletes the producer (ignoring failures), then releases the base-class resources. */
    virtual void cleanup() {
        try{
            if( _producer != NULL ) delete _producer;
        } catch(...) { }
        _producer = NULL;

        JMSEndPointThread::cleanup();
    }

};


class Consumer :    public MessageListener,
                    public JMSEndPointThread {

private:

    MessageConsumer* _consumer;
    int _delay;
    bool _useListener;

public:

    Consumer(   ConnPoolManager* pools,
                const std::string& dest,
                int delay,
                bool useListener = true,
                bool sessionTransacted = true ) :
        JMSEndPointThread(pools,dest,sessionTransacted)
    {
        _consumer = NULL;
        _delay = delay;
        _useListener = useListener;
    }
    virtual ~Consumer(){
        cleanup();
    }

    virtual void run() {
        std::cout<<"Consumer started!!!! Thread: "<<Thread::getId()<<std::endl;
        while(!stopCheck()) {
            try {
                if(prepareConsumer()) {
                    // Wait while asynchronous messages come in.
                    if(_useListener) {
                        stopCheck(-1);
                    } else {
                        consumeLoop();
                    }
                    continue;
                }
            } catch (CMSException& e) {
                std::cout<<"(Consumer::run) CMSException: " <<
e.getMessage() << std::endl;
            } catch(...) {
                std::cout<<"(Consumer::run) Unknown exception!!!" << std::endl;
            }
            cleanup();
            stopCheck(5000); // reconnect delay
        }
        std::cout<<"Consumer end!!!! Thread: "<<Thread::getId()<<std::endl;
        cleanup();
    }


private:

    bool prepareConsumer() {
        // Get Session
        if(!prepareSession()) return false;
        // Create a MessageConsumer from the Session to the Queue
        _consumer = getSession()->createConsumer( getDestination() );
        if(_useListener)
            _consumer->setMessageListener( this );

        return true;
    }

    void consumeLoop() {
        while(!stopCheck()) {
            Message *msg = _consumer->receive(500);
            if(msg!=NULL) {
                onMessage(msg);
                delete msg;
            }
        }
    }

    // Called from the consumer since this class is a registered
MessageListener.
    virtual void onMessage( const Message* message ) {

        static int count = 0;

        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            std::string text = "";

            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            std::cout<<"Message #"<<count<<" Received: "<<text<<std::endl;
            if(_delay) {
                std::cout<<"Starting delay..."<<std::endl;
                stopCheck(_delay);
            }

        } catch (CMSException& e) {
            std::cout<<"(Consumer::onMessage) CMSException: " <<
e.getMessage() << std::endl;
        } catch(...) {
            std::cout<<"(Consumer::onMessage) Unknown exception!!!" <<
std::endl;
        }

        try
        {   // Commit all messages.
            if( getTransacted() ) {
                std::cout<<"Try commit....."<<std::endl;
                getSession()->commit();
                std::cout<<"Commit OK!!!"<<std::endl;
            }
        } catch (CMSException& e) {
            std::cout<<"(Consumer::onMessage) CMSException: " <<
e.getMessage() << std::endl;
        } catch(...) {
            std::cout<<"(Consumer::onMessage) Unknown exception!!!" <<
std::endl;
        }

    }

    virtual void cleanup() {
        try{
            if( _consumer != NULL ) delete _consumer;
        } catch (...) { }
        _consumer = NULL;
        JMSEndPointThread::cleanup();
    }
};


class ScopedActiveMQLibrary {
public:
    ScopedActiveMQLibrary() {
        activemq::library::ActiveMQCPP::initializeLibrary();
    }
    virtual ~ScopedActiveMQLibrary() {
        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
};

class AppTest : private ScopedActiveMQLibrary {
private:
    int _consumer;
    int _producer;
    int _numMessages;
    bool _useListener;
    int _delay;
    ConnPoolManager _poolManager;
    JMSEndPointThread::List _endPointList;

    static AppTest* _app;
    static CountDownLatch _terminationRequest;

public:
    AppTest(int argc, char** argv) :
            _consumer(0),
            _producer(0),
            _numMessages(1),
            _useListener(false),
            _delay(0)
    {
        _app=this;
        int c;
        while ((c = getopt (argc, argv, "lc:p:n:d:")) != -1) {
            switch(c) {
            case 'l': _useListener=true; break;
            case 'c': _consumer=atoi(optarg); break;
            case 'p': _producer=atoi(optarg); break;
            case 'n': _numMessages=atoi(optarg); break;
            case 'd': _delay=atoi(optarg); break;
            }
        }
        if(_consumer<0) _consumer=0;
        if(_producer<0) _producer=0;
        _poolManager.setBrokerURI(
                "failover://("
                "tcp://localhost:61616"
                "?transport.useInactivityMonitor=false"
                ")?timeout=1000"
                "&cms.RedeliveryPolicy.maximumRedeliveries=-1"
                );
        _poolManager.startPools();
    }

    virtual ~AppTest() {
    }

    int run() {

        // Install signal handler
        if(!installSigAction())
            return 1;

        // Create consumer/producer objects
        loadEndPoins();

        // Start the producer/consumer thread.
        startEndPoins();

        // start another tasks
        // ...

        // Wait for termination request
        /*if(_consumer) {
            _terminationRequest.await();
            stopEndPoins();
        }*/

        // join threads
        joinEndPoins();

        return 0;
    }

private:

    void loadEndPoins() {
        for(int i=0; i<_consumer;i++) {
            _endPointList.push_back( new
Consumer(&_poolManager,"TEST.FOO",_delay,_useListener) );
        }
        for(int i=0; i<_producer;i++) {
            _endPointList.push_back( new
Producer(&_poolManager,"TEST.FOO",_delay,_numMessages) );
        }
    }

    void startEndPoins() {
        for(JMSEndPointThread::List::iterator it=_endPointList.begin();
            it!=_endPointList.end(); it++ )
            (**it).start();
    }

    void stopEndPoins() {
        for(JMSEndPointThread::List::reverse_iterator it=_endPointList.rbegin();
            it!=_endPointList.rend(); it++ )
            (**it).stopNow();
    }

    void joinEndPoins() {
        for(JMSEndPointThread::List::reverse_iterator it=_endPointList.rbegin();
            it!=_endPointList.rend(); it++ ) {
            (**it).join();
            delete *it;
            *it = NULL;
        }
        _endPointList.clear();
    }

    static void signalHandler(int sig) {
        _app->_terminationRequest.countDown();
        _app->stopEndPoins();
    }

    bool installSigAction() {
        struct sigaction action;
        memset(&action, 0, sizeof(action));
        action.sa_handler = signalHandler;
        if( sigaction(SIGTERM, &action, NULL)<0 ||
            sigaction(SIGQUIT, &action, NULL)<0 ||
            sigaction(SIGINT,  &action, NULL)<0 )
        { return false; }
        return true;
    }

    int termWait() {
        sigset_t sset;
        sigemptyset(&sset);
        sigaddset(&sset, SIGINT);
        sigaddset(&sset, SIGQUIT);
        sigaddset(&sset, SIGTERM);
        sigprocmask(SIG_BLOCK, &sset, NULL);
        int sig;
        sigwait(&sset, &sig);
        return sig;
    }

};
AppTest* AppTest::_app = NULL;
CountDownLatch AppTest::_terminationRequest(1);


void my_unexpected() {
    throw activemq::exceptions::ActiveMQException();
}

/**
 * Program entry point: builds the test driver from the command line and
 * returns its exit status.
 *
 * Both parameters are used, so they are no longer tagged AMQCPP_UNUSED.
 */
int main(int argc, char* argv[]) {
    //std::set_unexpected(my_unexpected);
    AppTest test(argc, argv);
    return test.run();
}

// END SNIPPET: demo

Reply via email to