Perhaps my search-fu is weak, but I have been unable to find any recent examples of people sending high data rates over ActiveMQ CMS. http://www.mostly-useless.com/blog/2007/12/27/playing-with-activemq/ has some good data (although from 2007).
I have found some references to CMS 2.2 through the Nabble search, but I am coming up short at the moment for more recent applications.

I have a pretty simple data-driven signal processing system; the processing is essentially linear:

A-->B-->C-->D

Data rates are highish (~110 Mbit/sec) but nothing astronomical. Messages vary in size but are generally pretty big (average ~200 kB, but can be up to 1 MB).

I moved from simple sockets to ActiveMQ (for portability reasons). I am using ActiveMQ 5.4.2 and CMS 3.4.2 on a pretty peppy RHEL 5.3 64-bit machine (2.6 GHz 8-core Xeon with 12 GB RAM). The versions and OS that I use are defined for me, so updating versions is a biggish deal (I know 5.5 is out).

I can slow down the processing, and running with anything close to 40 Mb/sec results in the processes consuming a full CPU each (whereas running without ActiveMQ each process consumes ~20% of a processor), so I am concerned about ActiveMQ's overhead (I suppose it is more likely the overhead of the CMS bindings).

I don't care about persistence: the data is time sensitive (video-like data), so if the processing goes down the data is of no value to me. I have disabled producerFlowControl (unsure if that was a good idea), but with a data-driven processing chain, if I am not able to keep up with the data rate then I am dead in the water. I am going to try nio for the transport, as I have seen it mentioned in a few places for high-throughput usage.
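To put those numbers in perspective, here is a quick back-of-the-envelope sketch of the message rate they imply. It assumes decimal units (1 Mbit = 1,000,000 bits, 1 kB = 1,000 bytes), and messagesPerSecond is just a name made up for illustration, not anything from CMS.

```cpp
#include <cassert>

// Messages per second implied by a bit rate and a message size.
// Assumes decimal units: 1 Mbit = 1,000,000 bits, 1 kB = 1,000 bytes.
double messagesPerSecond(double mbitPerSec, double messageKiloBytes) {
    assert(mbitPerSec >= 0.0);
    assert(messageKiloBytes > 0.0);
    const double bytesPerSec = mbitPerSec * 1e6 / 8.0;   // bits -> bytes
    return bytesPerSec / (messageKiloBytes * 1e3);       // bytes -> messages
}
```

For 110 Mbit/sec this works out to 68.75 messages per second at the 200 kB average, and 13.75 per second if every message were 1 MB.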
The producer seems to get into a deadlock state where the process is waiting on a mutex:

__kernel_vsyscall() at 0xffffe410
pthread_cond_timedwait@@GLIBC_2.3.2() at 0xf7b5bd12
decaf::internal::util::concurrent::ConditionImpl::wait() at 0xf77ce9f3
decaf::util::concurrent::Mutex::wait() at 0xf784d767
decaf::util::concurrent::Mutex::wait() at 0xf784d5f2
activemq::transport::failover::FailoverTransport::oneway() at 0xf7540ed7
activemq::transport::correlator::ResponseCorrelator::oneway() at 0xf752a2b8
activemq::core::ActiveMQConnection::oneway() at 0xf746af12
activemq::core::ActiveMQSession::send() at 0xf74bb9e2
activemq::core::ActiveMQProducer::send() at 0xf74aa764
activemq::core::ActiveMQProducer::send() at 0xf74a7dd1
activemq::core::ActiveMQProducer::send() at 0xf74a9585
activeMqProducer::send() at ioActiveMqProducer.cpp:116 0x805a235
io::send() at io.cpp:162 0x8057614
process_data_msg() at main.cpp:814 0x804cf7c
process_message() at main.cpp:891 0x804d272
main() at main.cpp:1,097 0x804e481

I essentially used the cpp samples when I created my io library (just a simple wrapper).
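The trace above ends in FailoverTransport::oneway(), which, as far as I understand it, is where a send waits while the failover transport has no live connection to the broker. One thing I may try is giving the failover transport a send timeout, so that send() fails with an exception instead of waiting indefinitely. A minimal sketch of building such a URI follows. makeFailoverUri is a hypothetical helper, and the "timeout" option name is my assumption from the ActiveMQ-CPP configuration docs, which I still need to check against 3.4.2.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical helper (not part of CMS): builds a failover broker URI that
// carries a send timeout in milliseconds. The "timeout" option name is an
// assumption; verify it against the CMS version in use.
std::string makeFailoverUri(const std::string& hostAndPort, long sendTimeoutMs) {
    assert(!hostAndPort.empty());
    assert(sendTimeoutMs > 0);
    std::ostringstream uri;
    uri << "failover:(tcp://" << hostAndPort << ")?timeout=" << sendTimeoutMs;
    return uri.str();
}
```

The resulting string would be passed as brokerURI to the connection factory, and the call to producer->send() would then need its own catch block for cms::CMSException.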
Consumer code (sanitised, so may not be syntactically correct):

void activeMqConsumer::runConsumer()
{
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory* connectionFactory =
            new ActiveMQConnectionFactory( brokerURI );

        // Create a Connection
        connection = connectionFactory->createConnection();
        delete connectionFactory;

        ActiveMQConnection* amqConnection =
            dynamic_cast<ActiveMQConnection*>( connection );
        if( amqConnection != NULL ) {
            amqConnection->addTransportListener( this );
        }
        connection->setExceptionListener(this);

        // Create a Session
        if( clientAck ) {
            session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
        } else {
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
        }

        // Create the destination (Topic or Queue)
        if( useTopic ) {
            destination = session->createTopic( destURI );
        } else {
            destination = session->createQueue( destURI );
        }

        // Create a MessageConsumer from the Session to the Topic or Queue
        consumer = session->createConsumer( destination );
        consumer->setMessageListener( this );
    } catch (CMSException& e) {
        e.printStackTrace();
    }
}

// Called from the consumer since this class is a registered MessageListener.
void activeMqConsumer::onMessage( const Message* message )
{
    INT32 msgSize;
    INT32 bytesRead = 0;
    FLOAT64 latency = 0;
    TIME currentTime;
    INT32 rval;

    rval = SDL_SemWait(this->pElementAvailable);
    if (rval == -1) {
        printf("(onMessage) error: SDL_SemWait failed\n");
    }

    try {
        // cast the activeMq "message" into an array of bytes
        const cms::BytesMessage *bytesMessage =
            dynamic_cast<const cms::BytesMessage *>(message);
        if(bytesMessage != NULL) {
            msgSize = bytesMessage->getBodyLength();
            bytesRead = bytesMessage->readBytes((unsigned char*)this->pInputMsg, msgSize);
            totalBytesRecvd += msgSize;
            totalMsgsRecvd++;

            // the assumption here is that you receive messages after they have been sent
            // if the times between processing modules are not synchronized all bets are off.
            getSystemTime(&(currentTime));
            latency = (FLOAT64)currentTime - (FLOAT64)this->pInputMsg->hdr.time;
            this->accruedLatency += latency;

            if(clientAck == TRUE) {
                message->acknowledge();
            }
        }
    } catch(cms::CMSException &exception) {
        exception.printStackTrace();
    }

    if(startTime == 0) {
        getSystemTime(&(startTime));
    }

    // indicate that there is data available
    rval = SDL_SemPost(this->pDataAvailable);
    if (rval == -1) {
        printf("(enqueue) error: SDL_SemPost failed\n");
    }
}

Producer code:

activeMqProducer::activeMqProducer( const std::string& brokerURI,
                                    const std::string& destURI,
                                    BOOLEAN useTopic)
{
    // Create a ConnectionFactory
    auto_ptr<ConnectionFactory> connectionFactory(
        ConnectionFactory::createCMSConnectionFactory( brokerURI ) );

    // Create a Connection
    connection = connectionFactory->createConnection();
    sessionTransacted = false;

    // Create a Session
    if( sessionTransacted ) {
        session = connection->createSession( Session::SESSION_TRANSACTED );
    } else {
        session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
    }

    // Create the destination (Topic or Queue)
    if( useTopic ) {
        destination = session->createTopic( destURI );
    } else {
        destination = session->createQueue( destURI );
    }

    // Create a MessageProducer from the Session to the Topic or Queue
    producer = session->createProducer( destination );
    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
    message = session->createBytesMessage();
}

void activeMqProducer::send(IO_MSG *pOutputMsg)
{
    // Set the send time in the IO message
    getSystemTime(&(pOutputMsg->hdr.time));

    // Convert the message into an activeMQ bytesMessage
    message->setBodyBytes( (const unsigned char*)pOutputMsg,
                           pOutputMsg->hdr.msgSize + sizeof(MSG_HDR));

    // Send to broker
    producer->send( message );
}

Any suggestions on how to speed up the data rates and stop the producer from locking up would be greatly appreciated.
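For context on the consumer side: onMessage() waits on pElementAvailable before copying a message into the single input buffer and posts pDataAvailable afterwards. Below is a rough stand-alone sketch of that single-buffer handoff using the standard library instead of SDL. It assumes the processing thread posts pElementAvailable once it is finished with the buffer (that part is not shown in the sanitised code); Semaphore and Handoff are illustrative names, not my actual classes.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Counting semaphore on top of the standard library (stand-in for SDL_sem).
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) { assert(initial >= 0); }

    // Block until the count is positive, then decrement it.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Decrement without blocking; returns false if the count was zero.
    bool tryWait() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) return false;
        --count_;
        return true;
    }

    // Increment the count and wake one waiter, if any.
    void post() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cond_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

// One buffer shared by one writer (the message listener) and one reader
// (the processing thread).
struct Handoff {
    Semaphore elementAvailable;  // 1 while the input buffer is free
    Semaphore dataAvailable;     // 1 while the input buffer holds a message
    Handoff() : elementAvailable(1), dataAvailable(0) {}
};
```

With only one slot, a second message arriving before the processing thread has released the buffer would make the listener callback wait in elementAvailable.wait() until the buffer is free again.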
V/R
~Joe