Hi,
I have the following configuration: 2 producers send messages to a queue using a Camel producer template. One consumer reads that queue and, through a Camel route, sends 2 messages to 2 different queues. Each of those two queues has its own consumer.
In total there are 3 queues. I am using ActiveMQ 5.4.0 with the vm transport and JDBC persistence (flow control is disabled in this case).

During some performance tests:
While the 2 producers are running, things seem OK (poor performance, but no contention).
When the producers stop (with a lot of messages still enqueued), I see thread contention among the VMTransport threads. I see 5 live threads (the threads themselves change over time).

Stacks at 04:36:09 PM (uptime 1:33:37)

VMTransport [RUNNABLE, IN_NATIVE] CPU time: 0:01
java.net.SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int)
java.net.SocketInputStream.read(byte[], int, int)
org.postgresql.core.VisibleBufferedInputStream.readMore(int)
org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int)
org.postgresql.core.VisibleBufferedInputStream.read()
org.postgresql.core.PGStream.ReceiveChar()
org.postgresql.core.v3.QueryExecutorImpl.processResults(ResultHandler, int)
org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int)
org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query)
org.postgresql.jdbc2.AbstractJdbc2Connection.commit()
org.apache.commons.dbcp.DelegatingConnection.commit()
org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapper.commit()
org.apache.activemq.store.jdbc.TransactionContext.commit()
org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.commitTransaction(ConnectionContext)
org.apache.activemq.store.memory.MemoryTransactionStore$Tx.commit()
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [BLOCKED] CPU time: 0:18
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [BLOCKED] CPU time: 0:01
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [WAITING] CPU time: 0:09
sun.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode, boolean, long)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long)
java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
java.util.concurrent.ThreadPoolExecutor.getTask()
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [WAITING] CPU time: 0:01
sun.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode, boolean, long)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long)
java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
java.util.concurrent.ThreadPoolExecutor.getTask()
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

I have some
difficulty understanding why I have 2 BLOCKED threads and 1 RUNNABLE thread in this case.

Thanks for your help.

2011/8/10 Hervé BARRAULT <herve.barra...@gmail.com>

> Hi, I have done another experiment:
> I tried to see how the number of stored messages influences ActiveMQ's
> behavior.
>
> I send 20 messages per second for 20 seconds to the broker and
> use a processor to dequeue the messages.
> If there are no other messages, it is OK.
> If I create a dummy queue with 16k messages, it still works.
> If I create a dummy queue with 24k messages, it takes about 25 seconds to do
> the whole job.
> If I create a dummy queue with 32k messages, it takes about 55 seconds to do
> the whole job.
>
> Is there anything I can do to prevent one queue from influencing other
> queues?
> Regards
>
> 2011/8/1 Hervé BARRAULT <herve.barra...@gmail.com>
>
>> Hi,
>> I have looked at the purge mechanism to try to understand why it takes so
>> much time to clean a queue.
>>
>> I have noticed something:
>>
>> ####
>> CLASS org.apache.activemq.store.jdbc.Statements
>>     public String getFindMessageSequenceIdStatement() {
>>         if (findMessageSequenceIdStatement == null) {
>>             findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " + getFullMessageTableName()
>>                 + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
>>         }
>>         return findMessageSequenceIdStatement;
>>     }
>>
>>     public String getRemoveMessageStatement() {
>>         if (removeMessageStatement == null) {
>>             removeMessageStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE ID=?";
>>         }
>>         return removeMessageStatement;
>>     }
>> ###
>>
>> ###
>> CLASS org.apache.activemq.store.jdbc.JDBCMessageStore
>>
>>     private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
>>         long result = -1;
>>         TransactionContext c = persistenceAdapter.getTransactionContext();
>>         try {
>>             result = adapter.getStoreSequenceId(c, destination, messageId)[0];
>>         } catch (SQLException e) {
>>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>>             throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: "
>>                 + messageId +", on: " + destination + ". Reason: " + e, e);
>>         } finally {
>>             c.close();
>>         }
>>         return result;
>>     }
>>
>>     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
>>
>>         long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
>>
>>         // Get a connection and remove the message from the DB
>>         TransactionContext c = persistenceAdapter.getTransactionContext(context);
>>         try {
>>             adapter.doRemoveMessage(c, seq);
>>         } catch (SQLException e) {
>>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>>             throw IOExceptionSupport.create("Failed to broker message: "
>>                 + ack.getLastMessageId() + " in container: " + e, e);
>>         } finally {
>>             c.close();
>>         }
>>     }
>> ###
>>
>> Would it be better to use a single statement to remove a row, namely:
>> "DELETE FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?" ?
>> Or is there a case where this would not work?
>>
>> Would it be better to create an index on MSGID_PROD, MSGID_SEQ and
>> CONTAINER (could this index be UNIQUE)?
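A minimal sketch of the two changes proposed above, written as plain string builders so that it runs without a broker or a database. The table name ACTIVEMQ_MSGS is assumed to be what getFullMessageTableName() returns in a default setup, and the class, method, and index names are hypothetical and not part of ActiveMQ. Whether a UNIQUE index is safe for existing data is not verified here.

```java
public class PurgeSqlSketch {
    // Assumed default message table name; a real broker may add a prefix.
    static final String MESSAGE_TABLE = "ACTIVEMQ_MSGS";

    // Hypothetical single-statement delete: removes the row directly by the
    // message-id columns instead of first selecting its ID.
    static String removeByMessageIdStatement(String table) {
        return "DELETE FROM " + table
            + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
    }

    // Hypothetical composite index covering the columns of that WHERE clause.
    // The index name suffix is made up for this sketch.
    static String createMessageIdIndexStatement(String table, boolean unique) {
        return "CREATE " + (unique ? "UNIQUE " : "") + "INDEX " + table
            + "_MSGID_CIDX ON " + table + " (MSGID_PROD, MSGID_SEQ, CONTAINER)";
    }

    public static void main(String[] args) {
        System.out.println(removeByMessageIdStatement(MESSAGE_TABLE));
        System.out.println(createMessageIdIndexStatement(MESSAGE_TABLE, false));
    }
}
```

The single DELETE saves one round trip per message compared with the SELECT followed by DELETE ... WHERE ID=?, but it only pays off if the database can resolve the three-column predicate through an index.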
>>
>> I have checked that it takes 1 min 30 to dequeue my 16000 messages using the
>> purge method with these two modifications. I noticed that the page size is
>> 200 messages, and every 200 rows it waits for a read from the database
>> before it can continue purging; I had never noticed this limit before.
>>
>> So it increases the purge rate from about 40 msgs/sec to about 170 msgs/sec
>> (could this also apply to other components reading the queue?).
>>
>> I don't know enough about JMS brokers and database mechanisms to say whether
>> it is a good or a bad idea.
>>
>> Can anyone help me with this?
>>
>> Thanks for your answers.
>> Hervé
>>
>>
>> 2011/7/29 Hervé BARRAULT <herve.barra...@gmail.com>
>>
>>> Hi,
>>> I am using ActiveMQ 5.4.0 with persistence (using an Oracle 11g R2
>>> server) and I am doing some performance tests.
>>>
>>> I'm sending 16000 messages through web services (using one port) and it
>>> takes about 1 min to handle all the messages and fill the JMS queue.
>>>
>>> When I use a consumer to dequeue this queue (without adding new messages)
>>> and fill another one, it takes about 6 min and 30 seconds. I assumed
>>> that some of my own code was slowing down the consumption.
>>>
>>> But I tried the purge method and it takes the same time.
>>>
>>> Is there a way to increase the consumption rate?
>>>
>>> Thanks for your answers.
>>>
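As a quick check of the figures quoted in this thread, the sketch below recomputes the purge rates from the reported durations (16000 messages in about 6 min 30 s, then in about 1 min 30 s) and counts the 200-message pages involved. The class and method names are illustrative only, and the inputs are the approximate numbers reported above, so the results are approximate too.

```java
public class PurgeRateCheck {
    // Messages per second for a given message count and duration in seconds.
    static double rate(int messages, int seconds) {
        return (double) messages / seconds;
    }

    // Number of page reads needed when rows are paged in pageSize at a time
    // (rounded up so that a partial last page still counts).
    static int pages(int messages, int pageSize) {
        return (messages + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        int messages = 16000;
        System.out.printf("before: %.1f msgs/sec%n", rate(messages, 6 * 60 + 30));
        System.out.printf("after:  %.1f msgs/sec%n", rate(messages, 90));
        System.out.println("pages of 200: " + pages(messages, 200));
    }
}
```

The rates come out at roughly 41 and 178 msgs/sec, in line with the "about 40" and "about 170" quoted above, and purging 16000 messages means 80 page reads from the database.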