Hi,
Does the transaction mechanism in ActiveMQ apply to the whole broker, or does it only concern the queues involved in the transaction?
Regards
Hervé

2011/8/16 Hervé BARRAULT <herve.barra...@gmail.com>

> Hi,
> I have the following configuration:
> 2 producers send messages to a queue using the Camel producer template.
> One consumer reads that queue and finally sends 2 messages to 2 different queues (using a Camel route),
> and each of those queues has 1 consumer.
>
> Overall, it works with 3 queues.
>
> I am using ActiveMQ 5.4.0 with the vm transport and JDBC persistence (flow control is disabled in this case).
> When running some performance tests:
>
> While the 2 producers are working it seems OK (bad performance but no contention).
>
> When the producers stop (I have a lot of enqueued messages), I see thread contention for the VMTransport threads.
>
> I see 5 live threads (these threads are changing).
>
> Stacks at 04:36:09 PM (uptime 1:33:37)
>
> VMTransport [RUNNABLE, IN_NATIVE] CPU time: 0:01
> java.net.SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int)
> java.net.SocketInputStream.read(byte[], int, int)
> org.postgresql.core.VisibleBufferedInputStream.readMore(int)
> org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int)
> org.postgresql.core.VisibleBufferedInputStream.read()
> org.postgresql.core.PGStream.ReceiveChar()
> org.postgresql.core.v3.QueryExecutorImpl.processResults(ResultHandler, int)
> org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int)
> org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query)
> org.postgresql.jdbc2.AbstractJdbc2Connection.commit()
> org.apache.commons.dbcp.DelegatingConnection.commit()
> org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapper.commit()
> org.apache.activemq.store.jdbc.TransactionContext.commit()
> org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.commitTransaction(ConnectionContext)
> org.apache.activemq.store.memory.MemoryTransactionStore$Tx.commit()
> org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
> org.apache.activemq.transaction.LocalTransaction.commit(boolean)
> org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
> org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
> org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
> org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
> org.apache.activemq.broker.TransportConnection.service(Command)
> org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
> VMTransport [BLOCKED] CPU time: 0:18
> org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
> org.apache.activemq.transaction.LocalTransaction.commit(boolean)
> org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
> org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
> org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
> org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
> org.apache.activemq.broker.TransportConnection.service(Command)
> org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
> VMTransport [BLOCKED] CPU time: 0:01
> org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId, boolean, Runnable, Runnable)
> org.apache.activemq.transaction.LocalTransaction.commit(boolean)
> org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext, TransactionId, boolean)
> org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext, TransactionId, boolean)
> org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
> org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
> org.apache.activemq.broker.TransportConnection.service(Command)
> org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
> VMTransport [WAITING] CPU time: 0:09
> sun.misc.Unsafe.park(boolean, long)
> java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode, boolean, long)
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long)
> java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
> java.util.concurrent.ThreadPoolExecutor.getTask()
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
> VMTransport [WAITING] CPU time: 0:01
> sun.misc.Unsafe.park(boolean, long)
> java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode, boolean, long)
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object, boolean, long)
> java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
> java.util.concurrent.ThreadPoolExecutor.getTask()
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
> I have difficulty understanding why I have 2 BLOCKED threads and 1 RUNNABLE thread in this case.
>
> Thanks for your help.
>
> 2011/8/10 Hervé BARRAULT <herve.barra...@gmail.com>
>
>> Hi, I have done another experiment:
>> I tried to see the influence of stored messages on ActiveMQ behavior.
>>
>> I send 20 messages per second for 20 seconds into the broker and use a processor to dequeue the messages.
>> If there are no other messages, it is OK.
>> If I create a dummy queue with 16k messages, it still works.
>> If I create a dummy queue with 24k messages, it takes about 25 seconds to do the whole job.
>> If I create a dummy queue with 32k messages, it takes about 55 seconds to do the whole job.
>>
>> Is there something I can do to prevent one queue from influencing the other queues?
>> Regards
>>
>> 2011/8/1 Hervé BARRAULT <herve.barra...@gmail.com>
>>
>>> Hi,
>>> I have looked at the purge mechanism to try to understand why it takes so much time to clean a queue.
>>>
>>> I have noticed something:
>>>
>>> ####
>>> CLASS org.apache.activemq.store.jdbc.Statements
>>>     public String getFindMessageSequenceIdStatement() {
>>>         if (findMessageSequenceIdStatement == null) {
>>>             findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " + getFullMessageTableName()
>>>                 + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
>>>         }
>>>         return findMessageSequenceIdStatement;
>>>     }
>>>
>>>     public String getRemoveMessageStatement() {
>>>         if (removeMessageStatement == null) {
>>>             removeMessageStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE ID=?";
>>>         }
>>>         return removeMessageStatement;
>>>     }
>>> ###
>>>
>>> ###
>>> CLASS org.apache.activemq.store.jdbc.JDBCMessageStore
>>>
>>>     private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
>>>         long result = -1;
>>>         TransactionContext c = persistenceAdapter.getTransactionContext();
>>>         try {
>>>             result = adapter.getStoreSequenceId(c, destination, messageId)[0];
>>>         } catch (SQLException e) {
>>>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>>>             throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
>>>         } finally {
>>>             c.close();
>>>         }
>>>         return result;
>>>     }
>>>
>>>     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
>>>
>>>         long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
>>>
>>>         // Get a connection and remove the message from the DB
>>>         TransactionContext c = persistenceAdapter.getTransactionContext(context);
>>>         try {
>>>             adapter.doRemoveMessage(c, seq);
>>>         } catch (SQLException e) {
>>>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>>>             throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
>>>         } finally {
>>>             c.close();
>>>         }
>>>     }
>>> ###
>>>
>>> Would it be better to use a single statement to remove a row, namely:
>>> "DELETE FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?" ?
>>> Or is there a case where this would not work?
>>>
>>> Would it be better to create an index on MSGID_PROD, MSGID_SEQ and CONTAINER (could this index be UNIQUE)?
>>>
>>> With these two modifications, I measured that purging my 16000 messages takes 1 min 30 s. I also noticed that the page size is 200 messages, so the purge goes back to the database every 200 rows before it can continue; I had never noticed this limit before.
>>>
>>> So the purge rate increases from about 40 msgs/sec to about 170 msgs/sec (could this also apply to other components reading the queue?).
>>>
>>> I don't know enough about JMS brokers and database internals to say whether this is a good or a bad idea.
>>>
>>> Can anyone help me with this?
>>>
>>> Thanks for answers.
>>> Hervé
>>>
>>> 2011/7/29 Hervé BARRAULT <herve.barra...@gmail.com>
>>>
>>>> Hi,
>>>> I am using ActiveMQ 5.4.0 with persistence (using an Oracle 11g R2 server) and I am doing some performance tests.
>>>>
>>>> I'm sending 16000 messages through web services (using one port) and it takes about 1 min to process all the messages and fill the JMS queue.
>>>>
>>>> When I use a consumer to dequeue this queue (without adding new messages) and fill another one, it takes about 6 min 30 s. I expected that some of my own code was slowing down the consumption.
>>>>
>>>> But I tried the purge method and it takes the same time.
>>>>
>>>> Is there a way to increase the consumption rate?
>>>>
>>>> Thanks for answers.
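
The single-statement delete and the composite index proposed in the 2011/8/1 message could be sketched in plain JDBC roughly as follows. This is only a minimal sketch under stated assumptions, not ActiveMQ's implementation: the class `SingleStatementDelete`, its method names and the `_PSC_IDX` index suffix are hypothetical, the table name is passed in by the caller rather than guessed, and the column names are the ones quoted from `Statements` in that message. Whether the broker can safely skip the sequence-id lookup in every case is exactly the open question raised there.

```java
// Hypothetical helper, NOT part of ActiveMQ: illustrates the idea from the thread only.
final class SingleStatementDelete {

    private SingleStatementDelete() {
    }

    // Builds the one-step DELETE proposed in the thread (no prior SELECT of the row ID).
    static String removeByMessageIdSql(String table) {
        return "DELETE FROM " + table
                + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
    }

    // Builds a composite index over the three lookup columns.
    // The index name suffix is an assumption made for this example.
    static String createLookupIndexSql(String table) {
        return "CREATE INDEX " + table + "_PSC_IDX ON " + table
                + " (MSGID_PROD, MSGID_SEQ, CONTAINER)";
    }

    // Executes the delete in a single round trip and returns the number of rows removed.
    // Parameter types are assumed: producer id and container as strings, sequence as long.
    static int remove(java.sql.Connection connection, String table,
                      String producerId, long producerSequence, String container)
            throws java.sql.SQLException {
        try (java.sql.PreparedStatement ps =
                     connection.prepareStatement(removeByMessageIdSql(table))) {
            ps.setString(1, producerId);
            ps.setLong(2, producerSequence);
            ps.setString(3, container);
            return ps.executeUpdate();
        }
    }
}
```

The point of the sketch is the saved round trip: the quoted `removeMessage` first runs a SELECT to find `ID` and then a DELETE on `ID`, whereas here the WHERE clause of the DELETE does the lookup itself, which is only cheap if an index covers those columns.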