Perhaps my search-fu is weak, but I have been unable to find any recent
examples of people sending high data rates over ActiveMQ CMS.
http://www.mostly-useless.com/blog/2007/12/27/playing-with-activemq/ -> has
some good data (although from 2007).

I have found some references to CMS 2.2 through the Nabble search, but I am
coming up short at the moment for more recent applications.

I have a pretty simple data-driven signal processing system; the processing
is essentially linear:

A-->B-->C-->D

Data rates are highish (~110 Mbit/sec) but nothing astronomical. Messages
vary in size but are generally pretty big (average ~200 kB, but can be up
to 1 MB).

I moved from simple sockets to ActiveMQ (for portability reasons). I am
using ActiveMQ 5.4.2 and CMS 3.4.2 on a pretty peppy RHEL 5.3 64-bit machine
(2.6 GHz 8-core Xeon with 12 GB RAM). The versions and OS that I use are
defined for me, so updating versions is a biggish deal (I know 5.5 is out).

I can slow down the processing, and running with anything close to 40Mb/sec
results in the processes consuming a full CPU each (whereas running without
ActiveMQ each process consumes ~20% of a processor), so I am concerned about
ActiveMQ's overhead (I suppose it is more likely the overhead of the CMS
bindings).

I don't care about persistence; the data is time sensitive, so if the
processing goes down it is of no value to me (video-like data). I have
disabled producerFlowControl (unsure if that was a good idea), but as a
data-driven processing chain, if I am not able to keep up with the data rate
then I am dead in the water. I am going to try nio for the transport, as I
have seen that mentioned in a few places for high-throughput usage.
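
On the client side, most tuning options are passed as query parameters on
the broker URI, so here is a minimal sketch of assembling one. The helper
buildBrokerUri and the localhost:61616 address are hypothetical.
connection.useAsyncSend and connection.alwaysSyncSend are option names from
the ActiveMQ-CPP URI configuration documentation as I understand it; I have
not confirmed their behaviour on CMS 3.4.2, and whether they help here is
untested. As far as I know, nio is configured on the broker's transport
connector, and the CMS client still connects with a tcp:// URI.

```cpp
#include <string>
#include <utility>
#include <vector>

typedef std::vector<std::pair<std::string, std::string> > UriOptions;

// Hypothetical helper: appends key=value options to a base URI as a
// query string ("?a=b&c=d"). No escaping is done; values are assumed safe.
std::string buildBrokerUri(const std::string& base, const UriOptions& options)
{
    std::string uri = base;
    for (std::size_t i = 0; i < options.size(); ++i) {
        uri += (i == 0) ? "?" : "&";
        uri += options[i].first + "=" + options[i].second;
    }
    return uri;
}

// Example: option names are assumptions to check against the CMS docs.
std::string exampleBrokerUri()
{
    UriOptions options;
    options.push_back(std::make_pair("connection.useAsyncSend", "true"));
    options.push_back(std::make_pair("connection.alwaysSyncSend", "false"));
    return buildBrokerUri("tcp://localhost:61616", options);
}
```

The resulting string would be handed to the connection factory in place of
brokerURI.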

I seem to get into a deadlock state where the process is waiting on a mutex:

__kernel_vsyscall() at 0xffffe410       
pthread_cond_timedwait@@GLIBC_2.3.2() at 0xf7b5bd12     
decaf::internal::util::concurrent::ConditionImpl::wait() at 0xf77ce9f3  
decaf::util::concurrent::Mutex::wait() at 0xf784d767    
decaf::util::concurrent::Mutex::wait() at 0xf784d5f2    
activemq::transport::failover::FailoverTransport::oneway() at 0xf7540ed7        
activemq::transport::correlator::ResponseCorrelator::oneway() at 0xf752a2b8     
activemq::core::ActiveMQConnection::oneway() at 0xf746af12      
activemq::core::ActiveMQSession::send() at 0xf74bb9e2   
activemq::core::ActiveMQProducer::send() at 0xf74aa764  
activemq::core::ActiveMQProducer::send() at 0xf74a7dd1  
activemq::core::ActiveMQProducer::send() at 0xf74a9585  
activeMqProducer::send() at ioActiveMqProducer.cpp:116 0x805a235        
io::send() at io.cpp:162 0x8057614              
process_data_msg() at main.cpp:814 0x804cf7c    
process_message() at main.cpp:891 0x804d272     
main() at main.cpp:1,097 0x804e481      

I essentially used the C++ samples when I created my io library (just a
simple wrapper).

Consumer code (Sanitised - so may not be syntactically correct) :

void activeMqConsumer::runConsumer()
{

        try {

                // Create a ConnectionFactory
                ActiveMQConnectionFactory* connectionFactory =
                        new ActiveMQConnectionFactory( brokerURI );

                // Create a Connection
                connection = connectionFactory->createConnection();
                delete connectionFactory;

                ActiveMQConnection* amqConnection =
                        dynamic_cast<ActiveMQConnection*>( connection );
                if( amqConnection != NULL ) {
                        amqConnection->addTransportListener( this );
                }

                connection->setExceptionListener(this);

                // Create a Session
                if( clientAck ) {
                        session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
                } else {
                        session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
                }

                // Create the destination (Topic or Queue)
                if( useTopic ) {
                        destination = session->createTopic( destURI );
                } else {
                        destination = session->createQueue( destURI );
                }

                // Create a MessageConsumer from the Session to the Topic or Queue
                consumer = session->createConsumer( destination );
                consumer->setMessageListener( this );

        } catch (CMSException& e) {
                e.printStackTrace();
        }
}


// Called from the consumer since this class is a registered MessageListener.
void activeMqConsumer::onMessage( const Message* message )
{
        INT32 msgSize;
        INT32 bytesRead = 0;
        FLOAT64 latency = 0;
        TIME currentTime;
        INT32 rval;

        rval = SDL_SemWait(this->pElementAvailable);
        if (rval == -1)
        {
                printf("(onMessage) error: SDL_SemWait failed\n");
        }

        try
        {
                // cast the activeMq "message" into an array of bytes
                const cms::BytesMessage *bytesMessage =
                        dynamic_cast<const cms::BytesMessage *>(message);
                if(bytesMessage != NULL)
                {
                        msgSize = bytesMessage->getBodyLength();
                        bytesRead = bytesMessage->readBytes(
                                (unsigned char*)this->pInputMsg, msgSize);
                        totalBytesRecvd += msgSize;
                        totalMsgsRecvd++;
                        // the assumption here is that you receive messages
                        // after they have been sent
                        // if the times between processing modules are not
                        // synchronized all bets are off.
                        getSystemTime(&(currentTime));
                        latency = (FLOAT64)currentTime -
                                (FLOAT64)this->pInputMsg->hdr.time;
                        this->accruedLatency += latency;

                        if(clientAck == TRUE)
                        {
                                message->acknowledge();
                        }
                }
        }
        catch(cms::CMSException &exception)
        {
                exception.printStackTrace();
        }

        if(startTime == 0)
        {
                getSystemTime(&(startTime));
        }

        // indicate that there is data available
        rval = SDL_SemPost(this->pDataAvailable);
        if (rval == -1)
        {
                printf("(enqueue) error: SDL_SemPost failed\n");
        }

}
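
One thing the sanitised onMessage above does not show is a check that
msgSize fits in the buffer behind pInputMsg before readBytes copies into it.
A minimal stdlib-only sketch of such a guard follows. The names
copyBodyChecked and dstCapacity are hypothetical and there is no CMS
dependency; with CMS the same size comparison would go just before the
readBytes call.

```cpp
#include <cstddef>
#include <cstring>

// Copies a message body into a fixed-size buffer only if it fits.
// Returns the number of bytes copied, or -1 if a pointer is null or the
// body is larger than the destination buffer (nothing is written then).
long copyBodyChecked(const unsigned char* body, std::size_t bodyLen,
                     void* dst, std::size_t dstCapacity)
{
    if (body == NULL || dst == NULL || bodyLen > dstCapacity) {
        return -1;
    }
    std::memcpy(dst, body, bodyLen);
    return static_cast<long>(bodyLen);
}
```

With messages of up to 1 MB, an oversized body would be dropped (or logged)
by the caller instead of overrunning the input buffer.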

Producer code:

activeMqProducer::activeMqProducer( const std::string& brokerURI,
                         const std::string& destURI,
                         BOOLEAN useTopic)
{
            // Create a ConnectionFactory
            auto_ptr<ConnectionFactory> connectionFactory(
                ConnectionFactory::createCMSConnectionFactory( brokerURI ) );

            // Create a Connection
            connection = connectionFactory->createConnection();

            sessionTransacted = false;

            // Create a Session
            if( sessionTransacted )
            {
                session = connection->createSession( Session::SESSION_TRANSACTED );
            }
            else
            {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destination (Topic or Queue)
            if( useTopic )
            {
                destination = session->createTopic( destURI );
            }
            else
            {
                destination = session->createQueue( destURI );
            }

            // Create a MessageProducer from the Session to the Topic or Queue
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            message = session->createBytesMessage();

}


void activeMqProducer::send(IO_MSG *pOutputMsg)
{
        // Set the send time in the IO message
        getSystemTime(&(pOutputMsg->hdr.time));
        // Convert the message into an activeMQ bytesMessage
        message->setBodyBytes( (const unsigned char*)pOutputMsg,
                               pOutputMsg->hdr.msgSize + sizeof(MSG_HDR) );
        // Send to broker
        producer->send( message );
}

Any suggestions on how to speed up the data rates and stop the producer from
locking up would be greatly appreciated. 

V/R
~Joe

--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Activemq-CMS-high-data-rate-lockup-tp3670718p3670718.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
