On Tue, 2011-03-01 at 20:22 -0300, Henrique Magarotto wrote:
> Hi everybody,
> 
> First, sorry for the long mail, and thanks for the great job on ActiveMQ and
> ActiveMQ-CPP.
> 
> I'm testing some failure cases with ActiveMQ and ActiveMQ-CPP.
> 
> In my test, an unexpected exception is thrown if the broker is stopped
> while a consumer is in a transaction.
> 
> Environment:
> ActiveMQ 5.4.2
> ActiveMQ-CPP 3.2.4
> Ubuntu 10.04
> JRE 1.6.0_20-b02
> 

> I get the following message:
> 
> terminate called after throwing an instance of 'cms::CMSException'
>   what():  Failover timeout of 1000 ms reached.
> Aborted (core dumped)
> 

Sounds like a bug. Can you open a new issue in our Jira bug tracker and
attach your test case along with instructions for reproducing the error?

https://issues.apache.org/jira/browse/AMQCPP

Regards
Tim.

> core backtrace:
> #0  0x001ef422 in __kernel_vsyscall ()
> #1  0x00c5a651 in *__GI_raise (sig=6) at
> ../nptl/sysdeps/unix/sysv/linux/raise.c:64
> #2  0x00c5da82 in *__GI_abort () at abort.c:92
> #3  0x00bf952f in __gnu_cxx::__verbose_terminate_handler() () from
> /usr/lib/libstdc++.so.6
> #4  0x00bf7465 in ?? () from /usr/lib/libstdc++.so.6
> #5  0x00bf74a2 in std::terminate() () from /usr/lib/libstdc++.so.6
> #6  0x00bf74c5 in ?? () from /usr/lib/libstdc++.so.6
> #7  0x00bf6915 in __cxa_call_unexpected () from /usr/lib/libstdc++.so.6
> #8  0x0052f8ae in
> activemq::core::TransactionSynhcronization::beforeEnd (this=0x93ac548)
> at activemq/core/ActiveMQConsumer.cpp:84
> #9  0x00550588 in
> activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x93b3e48)
> at activemq/core/ActiveMQTransactionContext.cpp:192
> #10 0x00550d63 in activemq::core::ActiveMQTransactionContext::commit
> (this=0x93b3e48) at activemq/core/ActiveMQTransactionContext.cpp:127
> #11 0x0053e460 in activemq::core::ActiveMQSession::commit
> (this=0x93b3c10) at activemq/core/ActiveMQSession.cpp:189
> #12 0x0042c1a0 in activemq::cmsutil::PooledSession::commit
> (this=0x93b3fd8) at activemq/cmsutil/PooledSession.h:87
> #13 0x0804c0fb in Consumer::onMessage (this=0x93a8c30,
> message=0x93b49b0) at main.cpp:455
> #14 0x0804bdc8 in Consumer::consumeLoop (this=0x93a8c30) at main.cpp:415
> #15 0x0804bb56 in Consumer::run (this=0x93a8c30) at main.cpp:381
> #16 0x00823cf1 in decaf::lang::ThreadProperties::runCallback
> (properties=0x93abff0) at decaf/lang/Thread.cpp:135
> #17 0x00822847 in threadWorker (arg=0x93abff0) at decaf/lang/Thread.cpp:188
> #18 0x0015a96e in start_thread (arg=0xb6f8bb70) at pthread_create.c:300
> #19 0x00cfda4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> 
> 
> Exception specification for method
> 'activemq::core::TransactionSynhcronization::beforeEnd' says: 'throw(
> exceptions::ActiveMQException )' but 'ActiveMQConsumer::acknowledge'
> throws 'cms::CMSException', so 'std::unexpected' is called and 'abort'
> occurs. 
> (http://www.linuxprogrammingblog.com/cpp-exception-specifications-are-evil)
> 
> 
> When 'MessageListener' is used, the commit freezes, probably in thread 5:
> 
> Thread 1:
> #0  0x00cfd422 in __kernel_vsyscall ()
> #1  0x00ddcb5d in pthread_join (threadid=3061869424,
> thread_return=0xbf964bdc) at pthread_join.c:89
> #2  0x00a923c0 in decaf::lang::Thread::join (this=0x8eba574) at
> decaf/lang/Thread.cpp:421
> #3  0x0804c9f1 in AppTest::joinEndPoins (this=0xbf964c64) at main.cpp:586
> #4  0x0804c678 in AppTest::run (this=0xbf964c64) at main.cpp:555
> #5  0x0804a33c in main (argc=4, argv=0xbf964d84) at main.cpp:633
> 
> Thread 2:
> #0  0x00cfd422 in __kernel_vsyscall ()
> #1  0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at
> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122
> #2  0x0034f9dd in __pthread_cond_wait (cond=0x8eb0e38,
> mutex=0x8eb0e08) at forward.c:139
> #3  0x00a606a9 in
> decaf::internal::util::concurrent::ConditionImpl::wait
> (condition=0x8eb0e38) at
> decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94
> #4  0x00ad1353 in decaf::util::concurrent::Mutex::wait
> (this=0x8eb0d6c) at decaf/util/concurrent/Mutex.cpp:95
> #5  0x007fc9c5 in activemq::threads::CompositeTaskRunner::run
> (this=0x8eb0d48) at activemq/threads/CompositeTaskRunner.cpp:118
> #6  0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
> (properties=0x8eb0e90) at decaf/lang/Thread.cpp:135
> #7  0x00a92847 in threadWorker (arg=0x8eb0e90) at decaf/lang/Thread.cpp:188
> #8  0x00ddb96e in start_thread (arg=0xb7808b70) at pthread_create.c:300
> #9  0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> 
> Thread 3:
> #0  0x00cfd422 in __kernel_vsyscall ()
> #1  0x00de2af9 in __lll_lock_wait () at
> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142
> #2  0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0
> #3  0x00dddf61 in __pthread_mutex_lock (mutex=0x8ebb3a8) at
> pthread_mutex_lock.c:61
> #4  0x0034fba6 in pthread_mutex_lock (mutex=0x8ebb3a8) at forward.c:182
> #5  0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock
> (handle=0x8ebb3a8) at
> decaf/internal/util/concurrent/unix/MutexImpl.cpp:71
> #6  0x00ad1412 in decaf::util::concurrent::Mutex::lock
> (this=0x8ebb2bc) at decaf/util/concurrent/Mutex.cpp:75
> #7  0x007c5770 in
> decaf::util::StlQueue<decaf::lang::Pointer<activemq::commands::MessageDispatch,
> decaf::util::concurrent::atomic::AtomicRefCounter> >::lock
> (this=0x8ebb2a8)
>     at ./decaf/util/StlQueue.h:253
> #8  activemq::core::MessageDispatchChannel::lock (this=0x8ebb2a8) at
> activemq/core/MessageDispatchChannel.h:153
> #9  0x00ad0ab5 in decaf::util::concurrent::Lock::lock
> (this=0xb7006fe4) at decaf/util/concurrent/Lock.cpp:54
> #10 0x00ad0c08 in Lock (this=0xfffffe00, object=0x8ebb3a8,
> intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32
> #11 0x0078e102 in
> activemq::core::ActiveMQConsumer::clearMessagesInProgress
> (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:1112
> #12 0x007af15c in
> activemq::core::ActiveMQSession::clearMessagesInProgress
> (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:239
> #13 0x00778543 in
> activemq::core::ActiveMQConnection::transportInterrupted
> (this=0x8eb1210) at activemq/core/ActiveMQConnection.cpp:704
> #14 0x00803b34 in
> activemq::transport::TransportFilter::transportInterrupted
> (this=0x8eb11c0) at activemq/transport/TransportFilter.cpp:67
> #15 0x008187ae in
> activemq::transport::failover::FailoverTransport::handleTransportFailure
> (this=0x8eb0a38, error=...) at
> activemq/transport/failover/FailoverTransport.cpp:476
> #16 0x0082406c in
> activemq::transport::failover::FailoverTransportListener::onException
> (this=0x8eb0878, ex=...) at
> activemq/transport/failover/FailoverTransportListener.cpp:97
> #17 0x00803c0b in activemq::transport::TransportFilter::fire
> (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:49
> #18 0x00803c64 in activemq::transport::TransportFilter::onException
> (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:41
> #19 0x00803c0b in activemq::transport::TransportFilter::fire
> (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:49
> #20 0x00803c64 in activemq::transport::TransportFilter::onException
> (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:41
> #21 0x00801b13 in activemq::transport::IOTransport::fire
> (this=0x8eb27a0, ex=...) at activemq/transport/IOTransport.cpp:73
> #22 0x008023bf in activemq::transport::IOTransport::run
> (this=0x8eb27a0) at activemq/transport/IOTransport.cpp:246
> #23 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
> (properties=0x8eb2ea0) at decaf/lang/Thread.cpp:135
> #24 0x00a92847 in threadWorker (arg=0x8eb2ea0) at decaf/lang/Thread.cpp:188
> #25 0x00ddb96e in start_thread (arg=0xb7007b70) at pthread_create.c:300
> #26 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> 
> Thread 4:
> #0  0x00cfd422 in __kernel_vsyscall ()
> #1  0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at
> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122
> #2  0x0034f9dd in __pthread_cond_wait (cond=0x8eba6c8,
> mutex=0x8eba698) at forward.c:139
> #3  0x00a606a9 in
> decaf::internal::util::concurrent::ConditionImpl::wait
> (condition=0x8eba6c8) at
> decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94
> #4  0x00ad1353 in decaf::util::concurrent::Mutex::wait
> (this=0x8eba584) at decaf/util/concurrent/Mutex.cpp:95
> #5  0x00ad029a in decaf::util::concurrent::CountDownLatch::await
> (this=0x8eba57c) at decaf/util/concurrent/CountDownLatch.cpp:56
> #6  0x0804af84 in JMSEndPointThread::stopCheck (this=0x8eba574,
> timeOut=-1) at main.cpp:226
> #7  0x0804bb46 in Consumer::run (this=0x8eba570) at main.cpp:379
> #8  0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
> (properties=0x8eba5b0) at decaf/lang/Thread.cpp:135
> #9  0x00a92847 in threadWorker (arg=0x8eba5b0) at decaf/lang/Thread.cpp:188
> #10 0x00ddb96e in start_thread (arg=0xb6806b70) at pthread_create.c:300
> #11 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> 
> Thread 5:
> #0  0x00cfd422 in __kernel_vsyscall ()
> #1  0x00de2af9 in __lll_lock_wait () at
> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142
> #2  0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0
> #3  0x00dddf61 in __pthread_mutex_lock (mutex=0x8eb05e0) at
> pthread_mutex_lock.c:61
> #4  0x0034fba6 in pthread_mutex_lock (mutex=0x8eb05e0) at forward.c:182
> #5  0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock
> (handle=0x8eb05e0) at
> decaf/internal/util/concurrent/unix/MutexImpl.cpp:71
> #6  0x00ad1412 in decaf::util::concurrent::Mutex::lock
> (this=0x8eb0a88) at decaf/util/concurrent/Mutex.cpp:75
> #7  0x00ad0ab5 in decaf::util::concurrent::Lock::lock
> (this=0xb6004ce0) at decaf/util/concurrent/Lock.cpp:54
> #8  0x00ad0c08 in Lock (this=0xfffffe00, object=0x8eb05e0,
> intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32
> #9  0x0081b8fd in
> activemq::transport::failover::FailoverTransport::oneway
> (this=0x8eb0a38, command=...) at
> activemq/transport/failover/FailoverTransport.cpp:186
> #10 0x00807f9f in
> activemq::transport::correlator::ResponseCorrelator::oneway
> (this=0x8eb11c0, command=...) at
> activemq/transport/correlator/ResponseCorrelator.cpp:82
> #11 0x007713be in activemq::core::ActiveMQConnection::oneway
> (this=0x8eb1210, command=...) at
> activemq/core/ActiveMQConnection.cpp:741
> #12 0x007b0a4f in activemq::core::ActiveMQSession::oneway
> (this=0x8ebab90, command=...) at activemq/core/ActiveMQSession.cpp:903
> #13 0x00795a1c in activemq::core::ActiveMQConsumer::acknowledge
> (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:860
> #14 0x0079f885 in
> activemq::core::TransactionSynhcronization::beforeEnd (this=0x8ebb9e8)
> at activemq/core/ActiveMQConsumer.cpp:85
> #15 0x007c0588 in
> activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x8ebadc8)
> at activemq/core/ActiveMQTransactionContext.cpp:192
> #16 0x007c0d63 in activemq::core::ActiveMQTransactionContext::commit
> (this=0x8ebadc8) at activemq/core/ActiveMQTransactionContext.cpp:127
> #17 0x007ae460 in activemq::core::ActiveMQSession::commit
> (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:189
> #18 0x0069c1a0 in activemq::cmsutil::PooledSession::commit
> (this=0x8ebaf38) at activemq/cmsutil/PooledSession.h:87
> #19 0x0804c0fb in Consumer::onMessage (this=0x8eba570,
> message=0x8ebbec0) at main.cpp:455
> #20 0x0079972f in activemq::core::ActiveMQConsumer::dispatch
> (this=0x8ebb270, dispatch=...) at
> activemq/core/ActiveMQConsumer.cpp:1018
> #21 0x007bd5c3 in activemq::core::ActiveMQSessionExecutor::dispatch
> (this=0x8ebae78, dispatch=...) at
> activemq/core/ActiveMQSessionExecutor.cpp:129
> #22 0x007bd993 in activemq::core::ActiveMQSessionExecutor::iterate
> (this=0x8ebae78) at activemq/core/ActiveMQSessionExecutor.cpp:166
> #23 0x008007f3 in activemq::threads::DedicatedTaskRunner::run
> (this=0x8ebb550) at activemq/threads/DedicatedTaskRunner.cpp:111
> #24 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
> (properties=0x8ebaad8) at decaf/lang/Thread.cpp:135
> #25 0x00a92847 in threadWorker (arg=0x8ebaad8) at decaf/lang/Thread.cpp:188
> #26 0x00ddb96e in start_thread (arg=0xb6005b70) at pthread_create.c:300
> #27 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> 
> 
> 
> I tried to resolve this issue by changing configurations and code, but I
> always get an error or the client freezes.
> Is it a bug? Any suggestions?
> 
> Thanks.
> Henrique
> 
> How to reproduce:
> 1. Start ActiveMQ
> 2. Send a message './activemqTest -p1 -n1'
> 3. Run consumer './activemqTest -c1 -d10000'
> 4. Stop activemq when 'Starting delay...' message appears.
> 5. After ActiveMQ is down and the delay has finished, the consumer tries to
> commit (message 'Try commit') and the unexpected exception occurs.
> 
> The test code follows, based on the ActiveMQ-CPP sample
> 'activemq-cpp-library-3.2.4/src/examples/main.cpp':
> 
> 
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> 
> // START SNIPPET: demo
> 
> #include <activemq/library/ActiveMQCPP.h>
> #include <activemq/cmsutil/SessionPool.h>
> #include <activemq/cmsutil/PooledSession.h>
> #include <activemq/cmsutil/ResourceLifecycleManager.h>
> #include <activemq/exceptions/ActiveMQException.h>
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <decaf/util/concurrent/Mutex.h>
> #include <decaf/lang/Integer.h>
> #include <decaf/lang/Long.h>
> #include <decaf/lang/System.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Config.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <iostream>
> #include <memory>
> #include <map>
> #include <vector>
> #include <unistd.h>
> 
> using namespace activemq::core;
> using namespace activemq::cmsutil;
> using namespace decaf::util::concurrent;
> using namespace decaf::util;
> using namespace decaf::lang;
> using namespace cms;
> 
> 
> class ConnPoolManager : protected decaf::util::concurrent::Mutex,
>                         public ExceptionListener {
> private:
>     typedef std::map< int, activemq::cmsutil::SessionPool* > tSessionPoolMap;
> 
>     std::string _brokerURI;
>     ConnectionFactory* _connectionFactory;
>     Connection* _connection;
>     ResourceLifecycleManager* _lifecycleManager;
>     tSessionPoolMap _pools;
>     bool _started;
> 
> public:
>     ConnPoolManager() :
>             _brokerURI("tcp://localhost:61616"),
>             _connectionFactory(NULL),
>             _connection(NULL),
>             _lifecycleManager(NULL),
>             _started(false)
>     {
>     }
> 
>     virtual ~ConnPoolManager() {
>         cleanup();
>     }
> 
>     void setBrokerURI(const std::string& uri) {
>         _brokerURI = uri;
>     }
> 
>     bool startPools() {
>         synchronized(this) {
>             if(_started) return true;
>             try {
>                 createConnection();
>                 createPools();
>                 _started=true;
>                 return true;
>             } catch(CMSException& e) {
>                 std::cout<<"(ConnPoolManager::startPools)
> CMSException: " << e.getMessage() << std::endl;
>             } catch(...) {
>                 std::cout<<"(ConnPoolManager::startPools) Unknown
> exception!!!" << std::endl;
>             }
>         }
>         cleanup();
>         return false;
>     }
> 
>     PooledSession* getSession(cms::Session::AcknowledgeMode ack) {
>         if(_started) {
>             return _pools[ack]->takeSession();
>         } else if(startPools()) {
>             return _pools[ack]->takeSession();
>         } else {
>             return NULL;
>         }
>     }
> 
> private:
> 
>     void createConnection(){
>         // Create a ConnectionFactory
>         _connectionFactory =
> ConnectionFactory::createCMSConnectionFactory( _brokerURI );
> 
>         // Create a Connection
>         _connection = _connectionFactory->createConnection();
>         _connection->start();
>         _connection->setExceptionListener(this);
>     }
> 
>     void createPools(){
>         // Create lifecycle manager
>         _lifecycleManager = new activemq::cmsutil::ResourceLifecycleManager();
> 
>         // Create pools, one for each acknowledge type
>         for(    int ack = cms::Session::AUTO_ACKNOWLEDGE;
>                 ack <= cms::Session::INDIVIDUAL_ACKNOWLEDGE;
>                 ack++)
>         {
>             _pools[ack] =  new activemq::cmsutil::SessionPool(
>                     _connection,
>                     static_cast<cms::Session::AcknowledgeMode>(ack),
>                     _lifecycleManager);
>         }
>     }
> 
>     void cleanup() {
>         synchronized(this) {
>             releasePools();
>             if(_lifecycleManager!=NULL) {
>                 try { delete _lifecycleManager;
>                 } catch(...) {}
>             } _lifecycleManager = NULL;
> 
>             if(_connection!=NULL) {
>                 try { delete _connection;
>                 } catch(...) {}
>             } _connection = NULL;
>             if(_connectionFactory!=NULL) {
>                 try { delete _connectionFactory;
>                 } catch(...) {}
>             } _connectionFactory = NULL;
>             _started=false;
>         }
>     }
> 
>     void releasePools() {
>         for(tSessionPoolMap::iterator it=_pools.begin();
> it!=_pools.end(); it++) {
>             try {
>                 delete it->second;
>             } catch(...) { }
>         }
>         _pools.clear();
>     }
> 
>     // If something bad happens you see it here as this class is also been
>     // registered as an ExceptionListener with the connection.
>     virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
>         std::cout<<"(ConnPoolManager::onException) CMS Exception
> occurred.  Shutting down client."<<std::endl;
>         ex.printStackTrace();
>         exit(1);
>     }
> 
> 
> 
> };
> 
> class JMSEndPointThread  : public Thread {
> private:
>     CountDownLatch _stopNow;
>     ConnPoolManager* _pools;
>     std::string _destinationString;
>     PooledSession* _session;
>     Destination* _destination;
>     bool _sessionTransacted;
> 
> protected:
> 
>     JMSEndPointThread(  ConnPoolManager* pools,
>                         const std::string& dest,
>                         bool sessionTransacted) :
>             _stopNow(1),
>             _pools(pools),
>             _destinationString(dest),
>             _session(NULL),
>             _destination(NULL),
>             _sessionTransacted(sessionTransacted)
>     {
>     }
> 
>     virtual bool prepareSession() {
>         cleanup();
>         // Create a Session
>         if( _sessionTransacted ) {
>             _session = _pools->getSession(Session::SESSION_TRANSACTED);
>         } else {
>             _session = _pools->getSession(Session::AUTO_ACKNOWLEDGE);
>         }
>         if(_session==NULL) return false;
> 
>         // Create the Queue destination
>         _destination = _session->createQueue( _destinationString );
>         return true;
>     }
> 
>     PooledSession* getSession() { return _session; }
>     Destination* getDestination() { return _destination; }
>     bool getTransacted() { return _sessionTransacted; }
> 
>     bool stopCheck(int timeOut=0) {
>         if(timeOut>=0) return _stopNow.await(timeOut);
>         else _stopNow.await();
>         return true;
>     }
> 
>     virtual void cleanup() {
>         // Destroy resources.
>         try{
>             if( _destination != NULL ) delete _destination;
>         } catch(...) { }
>         _destination = NULL;
> 
>         // Back session to pool
>         try{
>             if( _session != NULL ) _session->close();
>         } catch(...) { }
>         _session = NULL;
>     }
> 
> public:
> 
>     typedef std::vector<JMSEndPointThread*> List;
> 
>     virtual ~JMSEndPointThread() {
>         cleanup();
>     }
> 
>     void stopNow() {
>         _stopNow.countDown();
>     }
> 
> };
> 
> class Producer : public JMSEndPointThread {
> private:
> 
>     MessageProducer* _producer;
>     int _delay;
>     int _numMessages;
> 
> public:
> 
>     Producer(   ConnPoolManager* pools,
>                 const std::string& dest,
>                 int delay,
>                 int numMessages,
>                 bool sessionTransacted = false ) :
>         JMSEndPointThread(pools,dest,sessionTransacted)
>     {
>         _producer = NULL;
>         _delay = delay;
>         _numMessages = numMessages;
>     }
> 
>     virtual ~Producer() {
>         cleanup();
>     }
> 
>     virtual void run() {
>         std::cout<<"Producer started!!!! Thread: 
> "<<Thread::getId()<<std::endl;
>         while(!stopCheck() && _numMessages) {
>             try {
>                 if(prepareProducer()) {
>                     sendMessages();
>                     continue;
>                 }
>             } catch(CMSException& e) {
>                 std::cout<<"(Producer::run) CMSException: " <<
> e.getMessage() << std::endl;
>             } catch(...) {
>                 std::cout<<"(Producer::run) Unknown exception!!!" << 
> std::endl;
>             }
>             cleanup();
>             stopCheck(5000); // reconnect delay
>         }
>         std::cout<<"Producer end!!!! Thread: "<<Thread::getId()<<std::endl;
>         cleanup();
>     }
> 
> private:
> 
>     bool prepareProducer() {
>         // Get Session
>         if(!prepareSession()) return false;
>         // Create a MessageProducer from the Session to the Queue
>         _producer = getSession()->createProducer( getDestination() );
>         _producer->setDeliveryMode( DeliveryMode::PERSISTENT );
>         return true;
>     }
>     void sendMessages() {
>         // Create a messages
>         std::stringstream text;
>         text<<"Hello world! from thread " << Thread::getId();
> 
>         while(!stopCheck() && _numMessages) {
> 
>             TextMessage* message = getSession()->createTextMessage(
> text.str() );
>             message->setIntProperty( "Integer", _numMessages );
> 
>             // Tell the producer to send the message
>             std::cout<<"Sent message #"<<_numMessages<<" from thread
> "<<Thread::getId()<<std::endl;
>             _producer->send( message );
>             delete message;
>             --_numMessages;
> 
>             stopCheck(_delay);
>             if(getTransacted()) getSession()->commit();
>         }
>     }
> 
>     virtual void cleanup() {
>         try{
>             if( _producer != NULL ) delete _producer;
>         } catch(...) { }
>         _producer = NULL;
> 
>         JMSEndPointThread::cleanup();
>     }
> 
> };
> 
> 
> class Consumer :    public MessageListener,
>                     public JMSEndPointThread {
> 
> private:
> 
>     MessageConsumer* _consumer;
>     int _delay;
>     bool _useListener;
> 
> public:
> 
>     Consumer(   ConnPoolManager* pools,
>                 const std::string& dest,
>                 int delay,
>                 bool useListener = true,
>                 bool sessionTransacted = true ) :
>         JMSEndPointThread(pools,dest,sessionTransacted)
>     {
>         _consumer = NULL;
>         _delay = delay;
>         _useListener = useListener;
>     }
>     virtual ~Consumer(){
>         cleanup();
>     }
> 
>     virtual void run() {
>         std::cout<<"Consumer started!!!! Thread: 
> "<<Thread::getId()<<std::endl;
>         while(!stopCheck()) {
>             try {
>                 if(prepareConsumer()) {
>                     // Wait while asynchronous messages come in.
>                     if(_useListener) {
>                         stopCheck(-1);
>                     } else {
>                         consumeLoop();
>                     }
>                     continue;
>                 }
>             } catch (CMSException& e) {
>                 std::cout<<"(Consumer::run) CMSException: " <<
> e.getMessage() << std::endl;
>             } catch(...) {
>                 std::cout<<"(Consumer::run) Unknown exception!!!" << 
> std::endl;
>             }
>             cleanup();
>             stopCheck(5000); // reconnect delay
>         }
>         std::cout<<"Consumer end!!!! Thread: "<<Thread::getId()<<std::endl;
>         cleanup();
>     }
> 
> 
> private:
> 
>     bool prepareConsumer() {
>         // Get Session
>         if(!prepareSession()) return false;
>         // Create a MessageConsumer from the Session to the Queue
>         _consumer = getSession()->createConsumer( getDestination() );
>         if(_useListener)
>             _consumer->setMessageListener( this );
> 
>         return true;
>     }
> 
>     void consumeLoop() {
>         while(!stopCheck()) {
>             Message *msg = _consumer->receive(500);
>             if(msg!=NULL) {
>                 onMessage(msg);
>                 delete msg;
>             }
>         }
>     }
> 
>     // Called from the consumer since this class is a registered
> MessageListener.
>     virtual void onMessage( const Message* message ) {
> 
>         static int count = 0;
> 
>         try
>         {
>             count++;
>             const TextMessage* textMessage =
>                 dynamic_cast< const TextMessage* >( message );
>             std::string text = "";
> 
>             if( textMessage != NULL ) {
>                 text = textMessage->getText();
>             } else {
>                 text = "NOT A TEXTMESSAGE!";
>             }
> 
>             std::cout<<"Message #"<<count<<" Received: "<<text<<std::endl;
>             if(_delay) {
>                 std::cout<<"Starting delay..."<<std::endl;
>                 stopCheck(_delay);
>             }
> 
>         } catch (CMSException& e) {
>             std::cout<<"(Consumer::onMessage) CMSException: " <<
> e.getMessage() << std::endl;
>         } catch(...) {
>             std::cout<<"(Consumer::onMessage) Unknown exception!!!" <<
> std::endl;
>         }
> 
>         try
>         {   // Commit all messages.
>             if( getTransacted() ) {
>                 std::cout<<"Try commit....."<<std::endl;
>                 getSession()->commit();
>                 std::cout<<"Commit OK!!!"<<std::endl;
>             }
>         } catch (CMSException& e) {
>             std::cout<<"(Consumer::onMessage) CMSException: " <<
> e.getMessage() << std::endl;
>         } catch(...) {
>             std::cout<<"(Consumer::onMessage) Unknown exception!!!" <<
> std::endl;
>         }
> 
>     }
> 
>     virtual void cleanup() {
>         try{
>             if( _consumer != NULL ) delete _consumer;
>         } catch (...) { }
>         _consumer = NULL;
>         JMSEndPointThread::cleanup();
>     }
> };
> 
> 
> class ScopedActiveMQLibrary {
> public:
>     ScopedActiveMQLibrary() {
>         activemq::library::ActiveMQCPP::initializeLibrary();
>     }
>     virtual ~ScopedActiveMQLibrary() {
>         activemq::library::ActiveMQCPP::shutdownLibrary();
>     }
> };
> 
> class AppTest : private ScopedActiveMQLibrary {
> private:
>     int _consumer;
>     int _producer;
>     int _numMessages;
>     bool _useListener;
>     int _delay;
>     ConnPoolManager _poolManager;
>     JMSEndPointThread::List _endPointList;
> 
>     static AppTest* _app;
>     static CountDownLatch _terminationRequest;
> 
> public:
>     AppTest(int argc, char** argv) :
>             _consumer(0),
>             _producer(0),
>             _numMessages(1),
>             _useListener(false),
>             _delay(0)
>     {
>         _app=this;
>         int c;
>         while ((c = getopt (argc, argv, "lc:p:n:d:")) != -1) {
>             switch(c) {
>             case 'l': _useListener=true; break;
>             case 'c': _consumer=atoi(optarg); break;
>             case 'p': _producer=atoi(optarg); break;
>             case 'n': _numMessages=atoi(optarg); break;
>             case 'd': _delay=atoi(optarg); break;
>             }
>         }
>         if(_consumer<0) _consumer=0;
>         if(_producer<0) _producer=0;
>         _poolManager.setBrokerURI(
>                 "failover://("
>                 "tcp://localhost:61616"
>                 "?transport.useInactivityMonitor=false"
>                 ")?timeout=1000"
>                 "&cms.RedeliveryPolicy.maximumRedeliveries=-1"
>                 );
>         _poolManager.startPools();
>     }
> 
>     virtual ~AppTest() {
>     }
> 
>     int run() {
> 
>         // Install signal handler
>         if(!installSigAction())
>             return 1;
> 
>         // Create consumer/producer objects
>         loadEndPoins();
> 
>         // Start the producer/consumer thread.
>         startEndPoins();
> 
>         // start another tasks
>         // ...
> 
>         // Wait for termination request
>         /*if(_consumer) {
>             _terminationRequest.await();
>             stopEndPoins();
>         }*/
> 
>         // join threads
>         joinEndPoins();
> 
>         return 0;
>     }
> 
> private:
> 
>     void loadEndPoins() {
>         for(int i=0; i<_consumer;i++) {
>             _endPointList.push_back( new
> Consumer(&_poolManager,"TEST.FOO",_delay,_useListener) );
>         }
>         for(int i=0; i<_producer;i++) {
>             _endPointList.push_back( new
> Producer(&_poolManager,"TEST.FOO",_delay,_numMessages) );
>         }
>     }
> 
>     void startEndPoins() {
>         for(JMSEndPointThread::List::iterator it=_endPointList.begin();
>             it!=_endPointList.end(); it++ )
>             (**it).start();
>     }
> 
>     void stopEndPoins() {
>         for(JMSEndPointThread::List::reverse_iterator 
> it=_endPointList.rbegin();
>             it!=_endPointList.rend(); it++ )
>             (**it).stopNow();
>     }
> 
>     void joinEndPoins() {
>         for(JMSEndPointThread::List::reverse_iterator 
> it=_endPointList.rbegin();
>             it!=_endPointList.rend(); it++ ) {
>             (**it).join();
>             delete *it;
>             *it = NULL;
>         }
>         _endPointList.clear();
>     }
> 
>     static void signalHandler(int sig) {
>         _app->_terminationRequest.countDown();
>         _app->stopEndPoins();
>     }
> 
>     bool installSigAction() {
>         struct sigaction action;
>         memset(&action, 0, sizeof(action));
>         action.sa_handler = signalHandler;
>         if( sigaction(SIGTERM, &action, NULL)<0 ||
>             sigaction(SIGQUIT, &action, NULL)<0 ||
>             sigaction(SIGINT,  &action, NULL)<0 )
>         { return false; }
>         return true;
>     }
> 
>     int termWait() {
>         sigset_t sset;
>         sigemptyset(&sset);
>         sigaddset(&sset, SIGINT);
>         sigaddset(&sset, SIGQUIT);
>         sigaddset(&sset, SIGTERM);
>         sigprocmask(SIG_BLOCK, &sset, NULL);
>         int sig;
>         sigwait(&sset, &sig);
>         return sig;
>     }
> 
> };
> AppTest* AppTest::_app = NULL;
> CountDownLatch AppTest::_terminationRequest(1);
> 
> 
> void my_unexpected() {
>     throw activemq::exceptions::ActiveMQException();
> }
> 
> int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
>     //std::set_unexpected(my_unexpected);
>     AppTest test(argc, argv);
>     return test.run();
> }
> 
> // END SNIPPET: demo

-- 
Tim Bish
------------
FuseSource
Email: tim.b...@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/


Reply via email to