On Tue, 2011-03-01 at 20:22 -0300, Henrique Magarotto wrote: > Hi everybody, > > First, sorry for long mail and thanks for great job in ActiveMQ and > ActiveMQ-CPP. > > I'm testing some failure cases with ActiveMQ and ActiveMQ-CPP. > > In my test, an unexpected exception is thrown if broker is stopped > while consumer is in transaction. > > Environment: > ActiveMQ 5.4.2 > ActiveMQ-CPP 3.2.4 > Ubuntu 10.04 > JRE 1.6.0_20-b02 >
> I get the following message: > > terminate called after throwing an instance of 'cms::CMSException' > what(): Failover timeout of 1000 ms reached. > Aborted (core dumped) > Sounds like a bug, can you open a new issue in are Jira bug tracker and attach your test case along with instructions for reproducing the error? https://issues.apache.org/jira/browse/AMQCPP Regards Tim. > core backtrace: > #0 0x001ef422 in __kernel_vsyscall () > #1 0x00c5a651 in *__GI_raise (sig=6) at > ../nptl/sysdeps/unix/sysv/linux/raise.c:64 > #2 0x00c5da82 in *__GI_abort () at abort.c:92 > #3 0x00bf952f in __gnu_cxx::__verbose_terminate_handler() () from > /usr/lib/libstdc++.so.6 > #4 0x00bf7465 in ?? () from /usr/lib/libstdc++.so.6 > #5 0x00bf74a2 in std::terminate() () from /usr/lib/libstdc++.so.6 > #6 0x00bf74c5 in ?? () from /usr/lib/libstdc++.so.6 > #7 0x00bf6915 in __cxa_call_unexpected () from /usr/lib/libstdc++.so.6 > #8 0x0052f8ae in > activemq::core::TransactionSynhcronization::beforeEnd (this=0x93ac548) > at activemq/core/ActiveMQConsumer.cpp:84 > #9 0x00550588 in > activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x93b3e48) > at activemq/core/ActiveMQTransactionContext.cpp:192 > #10 0x00550d63 in activemq::core::ActiveMQTransactionContext::commit > (this=0x93b3e48) at activemq/core/ActiveMQTransactionContext.cpp:127 > #11 0x0053e460 in activemq::core::ActiveMQSession::commit > (this=0x93b3c10) at activemq/core/ActiveMQSession.cpp:189 > #12 0x0042c1a0 in activemq::cmsutil::PooledSession::commit > (this=0x93b3fd8) at activemq/cmsutil/PooledSession.h:87 > #13 0x0804c0fb in Consumer::onMessage (this=0x93a8c30, > message=0x93b49b0) at main.cpp:455 > #14 0x0804bdc8 in Consumer::consumeLoop (this=0x93a8c30) at main.cpp:415 > #15 0x0804bb56 in Consumer::run (this=0x93a8c30) at main.cpp:381 > #16 0x00823cf1 in decaf::lang::ThreadProperties::runCallback > (properties=0x93abff0) at decaf/lang/Thread.cpp:135 > #17 0x00822847 in threadWorker (arg=0x93abff0) at decaf/lang/Thread.cpp:188 > #18 0x0015a96e in start_thread (arg=0xb6f8bb70) at pthread_create.c:300 > #19 0x00cfda4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 > > > Exception specification for method > 'activemq::core::TransactionSynhcronization::beforeEnd' says: 'throw( > exceptions::ActiveMQException )' but 'ActiveMQConsumer::acknowledge' > throws 'cms::CMSException', so 'std::unexpected' is called and 'abort' > occurs. > (http://www.linuxprogrammingblog.com/cpp-exception-specifications-are-evil) > > > When 'MessageListener' is used, the commit freezes, probability, in thread 5: > > Thread 1: > #0 0x00cfd422 in __kernel_vsyscall () > #1 0x00ddcb5d in pthread_join (threadid=3061869424, > thread_return=0xbf964bdc) at pthread_join.c:89 > #2 0x00a923c0 in decaf::lang::Thread::join (this=0x8eba574) at > decaf/lang/Thread.cpp:421 > #3 0x0804c9f1 in AppTest::joinEndPoins (this=0xbf964c64) at main.cpp:586 > #4 0x0804c678 in AppTest::run (this=0xbf964c64) at main.cpp:555 > #5 0x0804a33c in main (argc=4, argv=0xbf964d84) at main.cpp:633 > > Thread 2: > #0 0x00cfd422 in __kernel_vsyscall () > #1 0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at > ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122 > #2 0x0034f9dd in __pthread_cond_wait (cond=0x8eb0e38, > mutex=0x8eb0e08) at forward.c:139 > #3 0x00a606a9 in > decaf::internal::util::concurrent::ConditionImpl::wait > (condition=0x8eb0e38) at > decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94 > #4 0x00ad1353 in decaf::util::concurrent::Mutex::wait > (this=0x8eb0d6c) at decaf/util/concurrent/Mutex.cpp:95 > #5 0x007fc9c5 in activemq::threads::CompositeTaskRunner::run > (this=0x8eb0d48) at activemq/threads/CompositeTaskRunner.cpp:118 > #6 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback > (properties=0x8eb0e90) at decaf/lang/Thread.cpp:135 > #7 0x00a92847 in threadWorker (arg=0x8eb0e90) at decaf/lang/Thread.cpp:188 > #8 0x00ddb96e in start_thread (arg=0xb7808b70) at pthread_create.c:300 > #9 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 > > Thread 3: > #0 0x00cfd422 in __kernel_vsyscall () > #1 0x00de2af9 in __lll_lock_wait () at > ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142 > #2 0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0 > #3 0x00dddf61 in __pthread_mutex_lock (mutex=0x8ebb3a8) at > pthread_mutex_lock.c:61 > #4 0x0034fba6 in pthread_mutex_lock (mutex=0x8ebb3a8) at forward.c:182 > #5 0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock > (handle=0x8ebb3a8) at > decaf/internal/util/concurrent/unix/MutexImpl.cpp:71 > #6 0x00ad1412 in decaf::util::concurrent::Mutex::lock > (this=0x8ebb2bc) at decaf/util/concurrent/Mutex.cpp:75 > #7 0x007c5770 in > decaf::util::StlQueue<decaf::lang::Pointer<activemq::commands::MessageDispatch, > decaf::util::concurrent::atomic::AtomicRefCounter> >::lock > (this=0x8ebb2a8) > at ./decaf/util/StlQueue.h:253 > #8 activemq::core::MessageDispatchChannel::lock (this=0x8ebb2a8) at > activemq/core/MessageDispatchChannel.h:153 > #9 0x00ad0ab5 in decaf::util::concurrent::Lock::lock > (this=0xb7006fe4) at decaf/util/concurrent/Lock.cpp:54 > #10 0x00ad0c08 in Lock (this=0xfffffe00, object=0x8ebb3a8, > intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32 > #11 0x0078e102 in > activemq::core::ActiveMQConsumer::clearMessagesInProgress > (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:1112 > #12 0x007af15c in > activemq::core::ActiveMQSession::clearMessagesInProgress > (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:239 > #13 0x00778543 in > activemq::core::ActiveMQConnection::transportInterrupted > (this=0x8eb1210) at activemq/core/ActiveMQConnection.cpp:704 > #14 0x00803b34 in > activemq::transport::TransportFilter::transportInterrupted > (this=0x8eb11c0) at activemq/transport/TransportFilter.cpp:67 > #15 0x008187ae in > activemq::transport::failover::FailoverTransport::handleTransportFailure > (this=0x8eb0a38, error=...) at > activemq/transport/failover/FailoverTransport.cpp:476 > #16 0x0082406c in > activemq::transport::failover::FailoverTransportListener::onException > (this=0x8eb0878, ex=...) at > activemq/transport/failover/FailoverTransportListener.cpp:97 > #17 0x00803c0b in activemq::transport::TransportFilter::fire > (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:49 > #18 0x00803c64 in activemq::transport::TransportFilter::onException > (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:41 > #19 0x00803c0b in activemq::transport::TransportFilter::fire > (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:49 > #20 0x00803c64 in activemq::transport::TransportFilter::onException > (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:41 > #21 0x00801b13 in activemq::transport::IOTransport::fire > (this=0x8eb27a0, ex=...) at activemq/transport/IOTransport.cpp:73 > #22 0x008023bf in activemq::transport::IOTransport::run > (this=0x8eb27a0) at activemq/transport/IOTransport.cpp:246 > #23 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback > (properties=0x8eb2ea0) at decaf/lang/Thread.cpp:135 > #24 0x00a92847 in threadWorker (arg=0x8eb2ea0) at decaf/lang/Thread.cpp:188 > #25 0x00ddb96e in start_thread (arg=0xb7007b70) at pthread_create.c:300 > #26 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 > > Thread 4: > #0 0x00cfd422 in __kernel_vsyscall () > #1 0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at > ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122 > #2 0x0034f9dd in __pthread_cond_wait (cond=0x8eba6c8, > mutex=0x8eba698) at forward.c:139 > #3 0x00a606a9 in > decaf::internal::util::concurrent::ConditionImpl::wait > (condition=0x8eba6c8) at > decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94 > #4 0x00ad1353 in decaf::util::concurrent::Mutex::wait > (this=0x8eba584) at decaf/util/concurrent/Mutex.cpp:95 > #5 0x00ad029a in decaf::util::concurrent::CountDownLatch::await > (this=0x8eba57c) at decaf/util/concurrent/CountDownLatch.cpp:56 > #6 0x0804af84 in JMSEndPointThread::stopCheck (this=0x8eba574, > timeOut=-1) at main.cpp:226 > #7 0x0804bb46 in Consumer::run (this=0x8eba570) at main.cpp:379 > #8 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback > (properties=0x8eba5b0) at decaf/lang/Thread.cpp:135 > #9 0x00a92847 in threadWorker (arg=0x8eba5b0) at decaf/lang/Thread.cpp:188 > #10 0x00ddb96e in start_thread (arg=0xb6806b70) at pthread_create.c:300 > #11 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 > > Thread 5: > #0 0x00cfd422 in __kernel_vsyscall () > #1 0x00de2af9 in __lll_lock_wait () at > ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142 > #2 0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0 > #3 0x00dddf61 in __pthread_mutex_lock (mutex=0x8eb05e0) at > pthread_mutex_lock.c:61 > #4 0x0034fba6 in pthread_mutex_lock (mutex=0x8eb05e0) at forward.c:182 > #5 0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock > (handle=0x8eb05e0) at > decaf/internal/util/concurrent/unix/MutexImpl.cpp:71 > #6 0x00ad1412 in decaf::util::concurrent::Mutex::lock > (this=0x8eb0a88) at decaf/util/concurrent/Mutex.cpp:75 > #7 0x00ad0ab5 in decaf::util::concurrent::Lock::lock > (this=0xb6004ce0) at decaf/util/concurrent/Lock.cpp:54 > #8 0x00ad0c08 in Lock (this=0xfffffe00, object=0x8eb05e0, > intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32 > #9 0x0081b8fd in > activemq::transport::failover::FailoverTransport::oneway > (this=0x8eb0a38, command=...) at > activemq/transport/failover/FailoverTransport.cpp:186 > #10 0x00807f9f in > activemq::transport::correlator::ResponseCorrelator::oneway > (this=0x8eb11c0, command=...) at > activemq/transport/correlator/ResponseCorrelator.cpp:82 > #11 0x007713be in activemq::core::ActiveMQConnection::oneway > (this=0x8eb1210, command=...) at > activemq/core/ActiveMQConnection.cpp:741 > #12 0x007b0a4f in activemq::core::ActiveMQSession::oneway > (this=0x8ebab90, command=...) at activemq/core/ActiveMQSession.cpp:903 > #13 0x00795a1c in activemq::core::ActiveMQConsumer::acknowledge > (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:860 > #14 0x0079f885 in > activemq::core::TransactionSynhcronization::beforeEnd (this=0x8ebb9e8) > at activemq/core/ActiveMQConsumer.cpp:85 > #15 0x007c0588 in > activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x8ebadc8) > at activemq/core/ActiveMQTransactionContext.cpp:192 > #16 0x007c0d63 in activemq::core::ActiveMQTransactionContext::commit > (this=0x8ebadc8) at activemq/core/ActiveMQTransactionContext.cpp:127 > #17 0x007ae460 in activemq::core::ActiveMQSession::commit > (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:189 > #18 0x0069c1a0 in activemq::cmsutil::PooledSession::commit > (this=0x8ebaf38) at activemq/cmsutil/PooledSession.h:87 > #19 0x0804c0fb in Consumer::onMessage (this=0x8eba570, > message=0x8ebbec0) at main.cpp:455 > #20 0x0079972f in activemq::core::ActiveMQConsumer::dispatch > (this=0x8ebb270, dispatch=...) at > activemq/core/ActiveMQConsumer.cpp:1018 > #21 0x007bd5c3 in activemq::core::ActiveMQSessionExecutor::dispatch > (this=0x8ebae78, dispatch=...) at > activemq/core/ActiveMQSessionExecutor.cpp:129 > #22 0x007bd993 in activemq::core::ActiveMQSessionExecutor::iterate > (this=0x8ebae78) at activemq/core/ActiveMQSessionExecutor.cpp:166 > #23 0x008007f3 in activemq::threads::DedicatedTaskRunner::run > (this=0x8ebb550) at activemq/threads/DedicatedTaskRunner.cpp:111 > #24 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback > (properties=0x8ebaad8) at decaf/lang/Thread.cpp:135 > #25 0x00a92847 in threadWorker (arg=0x8ebaad8) at decaf/lang/Thread.cpp:188 > #26 0x00ddb96e in start_thread (arg=0xb6005b70) at pthread_create.c:300 > #27 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 > > > > I tried to resolve this issue changing configurations and code, but i > always get an error or client freezes. > Is it a bug? Any suggestions? > > Thanks. > Henrique > > How to reproduce: > 1. Start ActiveMQ > 2. Send a massage './activemqTest -p1 -n1' > 3. Run consumer './activemqTest -c1 -d10000' > 4. Stop activemq when 'Starting delay...' message appears. > 5. After activemq is down and delay is finish, consumer try commit > (message 'Try commit') and unexpected exception occurs. > > Following test code, based on ActiveMQ-CPP sample > 'activemq-cpp-library-3.2.4/src/examples/main.cpp': > > > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > > // START SNIPPET: demo > > #include <activemq/library/ActiveMQCPP.h> > #include <activemq/cmsutil/SessionPool.h> > #include <activemq/cmsutil/PooledSession.h> > #include <activemq/cmsutil/ResourceLifecycleManager.h> > #include <activemq/exceptions/ActiveMQException.h> > #include <decaf/lang/Thread.h> > #include <decaf/lang/Runnable.h> > #include <decaf/util/concurrent/CountDownLatch.h> > #include <decaf/util/concurrent/Mutex.h> > #include <decaf/lang/Integer.h> > #include <decaf/lang/Long.h> > #include <decaf/lang/System.h> > #include <activemq/core/ActiveMQConnectionFactory.h> > #include <activemq/util/Config.h> > #include <cms/Connection.h> > #include <cms/Session.h> > #include <cms/TextMessage.h> > #include <cms/BytesMessage.h> > #include <cms/MapMessage.h> > #include <cms/ExceptionListener.h> > #include <cms/MessageListener.h> > #include <iostream> > #include <memory> > #include <map> > #include <vector> > #include <unistd.h> > > using namespace activemq::core; > using namespace activemq::cmsutil; > using namespace decaf::util::concurrent; > using namespace decaf::util; > using namespace decaf::lang; > using namespace cms; > > > class ConnPoolManager : protected decaf::util::concurrent::Mutex, > public ExceptionListener { > private: > typedef std::map< int, activemq::cmsutil::SessionPool* > tSessionPoolMap; > > std::string _brokerURI; > ConnectionFactory* _connectionFactory; > Connection* _connection; > ResourceLifecycleManager* _lifecycleManager; > tSessionPoolMap _pools; > bool _started; > > public: > ConnPoolManager() : > _brokerURI("tcp://localhost:61616"), > _connectionFactory(NULL), > _connection(NULL), > _lifecycleManager(NULL), > _started(false) > { > } > > virtual ~ConnPoolManager() { > cleanup(); > } > > void setBrokerURI(const std::string& uri) { > _brokerURI = uri; > } > > bool startPools() { > synchronized(this) { > if(_started) return true; > try { > createConnection(); > createPools(); > _started=true; > return true; > } catch(CMSException& e) { > std::cout<<"(ConnPoolManager::startPools) > CMSException: " << e.getMessage() << std::endl; > } catch(...) { > std::cout<<"(ConnPoolManager::startPools) Unknown > exception!!!" << std::endl; > } > } > cleanup(); > return false; > } > > PooledSession* getSession(cms::Session::AcknowledgeMode ack) { > if(_started) { > return _pools[ack]->takeSession(); > } else if(startPools()) { > return _pools[ack]->takeSession(); > } else { > return NULL; > } > } > > private: > > void createConnection(){ > // Create a ConnectionFactory > _connectionFactory = > ConnectionFactory::createCMSConnectionFactory( _brokerURI ); > > // Create a Connection > _connection = _connectionFactory->createConnection(); > _connection->start(); > _connection->setExceptionListener(this); > } > > void createPools(){ > // Create lifecycle manager > _lifecycleManager = new activemq::cmsutil::ResourceLifecycleManager(); > > // Create pools, one for each acknowledge type > for( int ack = cms::Session::AUTO_ACKNOWLEDGE; > ack <= cms::Session::INDIVIDUAL_ACKNOWLEDGE; > ack++) > { > _pools[ack] = new activemq::cmsutil::SessionPool( > _connection, > static_cast<cms::Session::AcknowledgeMode>(ack), > _lifecycleManager); > } > } > > void cleanup() { > synchronized(this) { > releasePools(); > if(_lifecycleManager!=NULL) { > try { delete _lifecycleManager; > } catch(...) {} > } _lifecycleManager = NULL; > > if(_connection!=NULL) { > try { delete _connection; > } catch(...) {} > } _connection = NULL; > if(_connectionFactory!=NULL) { > try { delete _connectionFactory; > } catch(...) {} > } _connectionFactory = NULL; > _started=false; > } > } > > void releasePools() { > for(tSessionPoolMap::iterator it=_pools.begin(); > it!=_pools.end(); it++) { > try { > delete it->second; > } catch(...) { } > } > _pools.clear(); > } > > // If something bad happens you see it here as this class is also been > // registered as an ExceptionListener with the connection. > virtual void onException( const CMSException& ex AMQCPP_UNUSED) { > std::cout<<"(ConnPoolManager::onException) CMS Exception > occurred. Shutting down client."<<std::endl; > ex.printStackTrace(); > exit(1); > } > > > > }; > > class JMSEndPointThread : public Thread { > private: > CountDownLatch _stopNow; > ConnPoolManager* _pools; > std::string _destinationString; > PooledSession* _session; > Destination* _destination; > bool _sessionTransacted; > > protected: > > JMSEndPointThread( ConnPoolManager* pools, > const std::string& dest, > bool sessionTransacted) : > _stopNow(1), > _pools(pools), > _destinationString(dest), > _session(NULL), > _destination(NULL), > _sessionTransacted(sessionTransacted) > { > } > > virtual bool prepareSession() { > cleanup(); > // Create a Session > if( _sessionTransacted ) { > _session = _pools->getSession(Session::SESSION_TRANSACTED); > } else { > _session = _pools->getSession(Session::AUTO_ACKNOWLEDGE); > } > if(_session==NULL) return false; > > // Create the Queue destination > _destination = _session->createQueue( _destinationString ); > return true; > } > > PooledSession* getSession() { return _session; } > Destination* getDestination() { return _destination; } > bool getTransacted() { return _sessionTransacted; } > > bool stopCheck(int timeOut=0) { > if(timeOut>=0) return _stopNow.await(timeOut); > else _stopNow.await(); > return true; > } > > virtual void cleanup() { > // Destroy resources. > try{ > if( _destination != NULL ) delete _destination; > } catch(...) { } > _destination = NULL; > > // Back session to pool > try{ > if( _session != NULL ) _session->close(); > } catch(...) { } > _session = NULL; > } > > public: > > typedef std::vector<JMSEndPointThread*> List; > > virtual ~JMSEndPointThread() { > cleanup(); > } > > void stopNow() { > _stopNow.countDown(); > } > > }; > > class Producer : public JMSEndPointThread { > private: > > MessageProducer* _producer; > int _delay; > int _numMessages; > > public: > > Producer( ConnPoolManager* pools, > const std::string& dest, > int delay, > int numMessages, > bool sessionTransacted = false ) : > JMSEndPointThread(pools,dest,sessionTransacted) > { > _producer = NULL; > _delay = delay; > _numMessages = numMessages; > } > > virtual ~Producer() { > cleanup(); > } > > virtual void run() { > std::cout<<"Producer started!!!! Thread: > "<<Thread::getId()<<std::endl; > while(!stopCheck() && _numMessages) { > try { > if(prepareProducer()) { > sendMessages(); > continue; > } > } catch(CMSException& e) { > std::cout<<"(Producer::run) CMSException: " << > e.getMessage() << std::endl; > } catch(...) { > std::cout<<"(Producer::run) Unknown exception!!!" << > std::endl; > } > cleanup(); > stopCheck(5000); // reconnect delay > } > std::cout<<"Producer end!!!! Thread: "<<Thread::getId()<<std::endl; > cleanup(); > } > > private: > > bool prepareProducer() { > // Get Session > if(!prepareSession()) return false; > // Create a MessageProducer from the Session to the Queue > _producer = getSession()->createProducer( getDestination() ); > _producer->setDeliveryMode( DeliveryMode::PERSISTENT ); > return true; > } > void sendMessages() { > // Create a messages > std::stringstream text; > text<<"Hello world! from thread " << Thread::getId(); > > while(!stopCheck() && _numMessages) { > > TextMessage* message = getSession()->createTextMessage( > text.str() ); > message->setIntProperty( "Integer", _numMessages ); > > // Tell the producer to send the message > std::cout<<"Sent message #"<<_numMessages<<" from thread > "<<Thread::getId()<<std::endl; > _producer->send( message ); > delete message; > --_numMessages; > > stopCheck(_delay); > if(getTransacted()) getSession()->commit(); > } > } > > virtual void cleanup() { > try{ > if( _producer != NULL ) delete _producer; > } catch(...) { } > _producer = NULL; > > JMSEndPointThread::cleanup(); > } > > }; > > > class Consumer : public MessageListener, > public JMSEndPointThread { > > private: > > MessageConsumer* _consumer; > int _delay; > bool _useListener; > > public: > > Consumer( ConnPoolManager* pools, > const std::string& dest, > int delay, > bool useListener = true, > bool sessionTransacted = true ) : > JMSEndPointThread(pools,dest,sessionTransacted) > { > _consumer = NULL; > _delay = delay; > _useListener = useListener; > } > virtual ~Consumer(){ > cleanup(); > } > > virtual void run() { > std::cout<<"Consumer started!!!! Thread: > "<<Thread::getId()<<std::endl; > while(!stopCheck()) { > try { > if(prepareConsumer()) { > // Wait while asynchronous messages come in. > if(_useListener) { > stopCheck(-1); > } else { > consumeLoop(); > } > continue; > } > } catch (CMSException& e) { > std::cout<<"(Consumer::run) CMSException: " << > e.getMessage() << std::endl; > } catch(...) { > std::cout<<"(Consumer::run) Unknown exception!!!" << > std::endl; > } > cleanup(); > stopCheck(5000); // reconnect delay > } > std::cout<<"Consumer end!!!! Thread: "<<Thread::getId()<<std::endl; > cleanup(); > } > > > private: > > bool prepareConsumer() { > // Get Session > if(!prepareSession()) return false; > // Create a MessageConsumer from the Session to the Queue > _consumer = getSession()->createConsumer( getDestination() ); > if(_useListener) > _consumer->setMessageListener( this ); > > return true; > } > > void consumeLoop() { > while(!stopCheck()) { > Message *msg = _consumer->receive(500); > if(msg!=NULL) { > onMessage(msg); > delete msg; > } > } > } > > // Called from the consumer since this class is a registered > MessageListener. > virtual void onMessage( const Message* message ) { > > static int count = 0; > > try > { > count++; > const TextMessage* textMessage = > dynamic_cast< const TextMessage* >( message ); > std::string text = ""; > > if( textMessage != NULL ) { > text = textMessage->getText(); > } else { > text = "NOT A TEXTMESSAGE!"; > } > > std::cout<<"Message #"<<count<<" Received: "<<text<<std::endl; > if(_delay) { > std::cout<<"Starting delay..."<<std::endl; > stopCheck(_delay); > } > > } catch (CMSException& e) { > std::cout<<"(Consumer::onMessage) CMSException: " << > e.getMessage() << std::endl; > } catch(...) { > std::cout<<"(Consumer::onMessage) Unknown exception!!!" << > std::endl; > } > > try > { // Commit all messages. > if( getTransacted() ) { > std::cout<<"Try commit....."<<std::endl; > getSession()->commit(); > std::cout<<"Commit OK!!!"<<std::endl; > } > } catch (CMSException& e) { > std::cout<<"(Consumer::onMessage) CMSException: " << > e.getMessage() << std::endl; > } catch(...) { > std::cout<<"(Consumer::onMessage) Unknown exception!!!" << > std::endl; > } > > } > > virtual void cleanup() { > try{ > if( _consumer != NULL ) delete _consumer; > } catch (...) { } > _consumer = NULL; > JMSEndPointThread::cleanup(); > } > }; > > > class ScopedActiveMQLibrary { > public: > ScopedActiveMQLibrary() { > activemq::library::ActiveMQCPP::initializeLibrary(); > } > virtual ~ScopedActiveMQLibrary() { > activemq::library::ActiveMQCPP::shutdownLibrary(); > } > }; > > class AppTest : private ScopedActiveMQLibrary { > private: > int _consumer; > int _producer; > int _numMessages; > bool _useListener; > int _delay; > ConnPoolManager _poolManager; > JMSEndPointThread::List _endPointList; > > static AppTest* _app; > static CountDownLatch _terminationRequest; > > public: > AppTest(int argc, char** argv) : > _consumer(0), > _producer(0), > _numMessages(1), > _useListener(false), > _delay(0) > { > _app=this; > int c; > while ((c = getopt (argc, argv, "lc:p:n:d:")) != -1) { > switch(c) { > case 'l': _useListener=true; break; > case 'c': _consumer=atoi(optarg); break; > case 'p': _producer=atoi(optarg); break; > case 'n': _numMessages=atoi(optarg); break; > case 'd': _delay=atoi(optarg); break; > } > } > if(_consumer<0) _consumer=0; > if(_producer<0) _producer=0; > _poolManager.setBrokerURI( > "failover://(" > "tcp://localhost:61616" > "?transport.useInactivityMonitor=false" > ")?timeout=1000" > "&cms.RedeliveryPolicy.maximumRedeliveries=-1" > ); > _poolManager.startPools(); > } > > virtual ~AppTest() { > } > > int run() { > > // Install signal handler > if(!installSigAction()) > return 1; > > // Create consumer/producer objects > loadEndPoins(); > > // Start the producer/consumer thread. > startEndPoins(); > > // start another tasks > // ... > > // Wait for termination request > /*if(_consumer) { > _terminationRequest.await(); > stopEndPoins(); > }*/ > > // join threads > joinEndPoins(); > > return 0; > } > > private: > > void loadEndPoins() { > for(int i=0; i<_consumer;i++) { > _endPointList.push_back( new > Consumer(&_poolManager,"TEST.FOO",_delay,_useListener) ); > } > for(int i=0; i<_producer;i++) { > _endPointList.push_back( new > Producer(&_poolManager,"TEST.FOO",_delay,_numMessages) ); > } > } > > void startEndPoins() { > for(JMSEndPointThread::List::iterator it=_endPointList.begin(); > it!=_endPointList.end(); it++ ) > (**it).start(); > } > > void stopEndPoins() { > for(JMSEndPointThread::List::reverse_iterator > it=_endPointList.rbegin(); > it!=_endPointList.rend(); it++ ) > (**it).stopNow(); > } > > void joinEndPoins() { > for(JMSEndPointThread::List::reverse_iterator > it=_endPointList.rbegin(); > it!=_endPointList.rend(); it++ ) { > (**it).join(); > delete *it; > *it = NULL; > } > _endPointList.clear(); > } > > static void signalHandler(int sig) { > _app->_terminationRequest.countDown(); > _app->stopEndPoins(); > } > > bool installSigAction() { > struct sigaction action; > memset(&action, 0, sizeof(action)); > action.sa_handler = signalHandler; > if( sigaction(SIGTERM, &action, NULL)<0 || > sigaction(SIGQUIT, &action, NULL)<0 || > sigaction(SIGINT, &action, NULL)<0 ) > { return false; } > return true; > } > > int termWait() { > sigset_t sset; > sigemptyset(&sset); > sigaddset(&sset, SIGINT); > sigaddset(&sset, SIGQUIT); > sigaddset(&sset, SIGTERM); > sigprocmask(SIG_BLOCK, &sset, NULL); > int sig; > sigwait(&sset, &sig); > return sig; > } > > }; > AppTest* AppTest::_app = NULL; > CountDownLatch AppTest::_terminationRequest(1); > > > void my_unexpected() { > throw activemq::exceptions::ActiveMQException(); > } > > int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { > //std::set_unexpected(my_unexpected); > AppTest test(argc, argv); > return test.run(); > } > > // END SNIPPET: demo -- Tim Bish ------------ FuseSource Email: tim.b...@fusesource.com Web: http://fusesource.com Twitter: tabish121 Blog: http://timbish.blogspot.com/