Hi,

Is the transaction mechanism in ActiveMQ is for the whole broker or concerns
only the queues involved in the transaction ?

Regards
Hervé






2011/8/16 Hervé BARRAULT <herve.barra...@gmail.com>

> Hi,
> I have the following configuration :
> 2 producers send message in a queue using camel producer template.
> I have one consumer which finally send 2 messages in 2 different queues.
> (using camel route)
> and for those queues i have for each 1 consumer.
>
> Globally, It works with 3 queues.
>
> I am ActiveMQ 5.4.0 using vm transport and jdbc persistence. (flow control
> is disabled in this case).
> When trying some performance tests :
>
> When the 2 producers are working it seems ok (bad performances but no
> contention).
>
> When the producers stop (i have a lot of enqueued messages), I see thread
> contention for VMTransport threads.
>
> I see 5 live threads (these threads are changing).
>
>
>
> Stacks at 04:36:09 PM (uptime 1:33:37)
>
> VMTransport [RUNNABLE, IN_NATIVE] CPU time: 0:01
> java.net.SocketInputStream.socketRead0(FileDescriptor, byte[], int, int,
> int)
> java.net.SocketInputStream.read(byte[], int, int)
> org.postgresql.core.VisibleBufferedInputStream.readMore(int)
> org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int)
> org.postgresql.core.VisibleBufferedInputStream.read()
> org.postgresql.core.PGStream.ReceiveChar()
> org.postgresql.core.v3.QueryExecutorImpl.processResults(ResultHandler, int)
> org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList,
> ResultHandler, int, int, int)
>
> org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query)
> org.postgresql.jdbc2.AbstractJdbc2Connection.commit()
> org.apache.commons.dbcp.DelegatingConnection.commit()
>
> org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapper.commit()
> org.apache.activemq.store.jdbc.TransactionContext.commit()
>
> org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.commitTransaction(ConnectionContext)
> org.apache.activemq.store.memory.MemoryTransactionStore$Tx.commit()
> org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId,
> boolean, Runnable, Runnable)
> org.apache.activemq.transaction.LocalTransaction.commit(boolean)
> org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext,
> TransactionId, boolean)
> org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext,
> TransactionId, boolean)
>
> org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
> org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
> org.apache.activemq.broker.TransportConnection.service(Command)
> org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
> VMTransport [BLOCKED] CPU time: 0:18
> org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId,
> boolean, Runnable, Runnable)
> org.apache.activemq.transaction.LocalTransaction.commit(boolean)
> org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext,
> TransactionId, boolean)
> org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext,
> TransactionId, boolean)
>
> org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
> org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
> org.apache.activemq.broker.TransportConnection.service(Command)
> org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
> VMTransport [BLOCKED] CPU time: 0:01
> org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId,
> boolean, Runnable, Runnable)
> org.apache.activemq.transaction.LocalTransaction.commit(boolean)
> org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext,
> TransactionId, boolean)
> org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext,
> TransactionId, boolean)
>
> org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
> org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
> org.apache.activemq.broker.TransportConnection.service(Command)
> org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
>
> VMTransport [WAITING] CPU time: 0:09
> sun.misc.Unsafe.park(boolean, long)
> java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode,
> boolean, long)
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object,
> boolean, long)
> java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
> java.util.concurrent.ThreadPoolExecutor.getTask()
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
> VMTransport [WAITING] CPU time: 0:01
> sun.misc.Unsafe.park(boolean, long)
> java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode,
> boolean, long)
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object,
> boolean, long)
> java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
> java.util.concurrent.ThreadPoolExecutor.getTask()
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
>
>
> I have some difficulties to understand why i have 2 BLOCKED threads and 1
> Running in this case.
>
> Thanks for help.
>
>
>
>
> 2011/8/10 Hervé BARRAULT <herve.barra...@gmail.com>
>
>> Hi, i have done another experiment :
>> I have tried something to see the influence of messages on ActiveMQ
>> behavior.
>>
>> I try to send 20 messages per seconds during 20 seconds into the broker
>> and use a processor to dequeue the messages.
>> If there is no other messages, it is ok
>> If i create a false queue with 16k messages it is always working
>> If i create a false queue with 24k messages it takes about 25 seconds to
>> do all the job.
>> If i create a false queue with 32k messages it takes about 55 seconds to
>> do all the job.
>>
>> Is there something to do to avoid that a queue has an influence on other
>> queues?
>> Regards
>>
>>
>>
>> 2011/8/1 Hervé BARRAULT <herve.barra...@gmail.com>
>>
>>> Hi,
>>> I have looked to the purge mechanism to try to understand why it takes so
>>> much time to clean a queue.
>>>
>>> I have noticed something :
>>>
>>> ####
>>> CLASS org.apache.activemq.store.jdbc.Statements
>>> public String getFindMessageSequenceIdStatement() {
>>>     if (findMessageSequenceIdStatement == null) {
>>>         findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " +
>>> getFullMessageTableName()
>>>             + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
>>>     }
>>>     return findMessageSequenceIdStatement;
>>> }
>>>
>>> public String getRemoveMessageStatement() {
>>>         if (removeMessageStatement == null) {
>>>             removeMessageStatement = "DELETE FROM " +
>>> getFullMessageTableName() + " WHERE ID=?";
>>>         }
>>>         return removeMessageStatement;
>>>     }
>>> ###
>>>
>>> ###
>>> CLASS org.apache.activemq.store.jdbc.JDBCMessageStore
>>>
>>> private long getStoreSequenceIdForMessageId(MessageId messageId) throws
>>> IOException {
>>>         long result = -1;
>>>         TransactionContext c =
>>> persistenceAdapter.getTransactionContext();
>>>         try {
>>>             result = adapter.getStoreSequenceId(c, destination,
>>> messageId)[0];
>>>         } catch (SQLException e) {
>>>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>>>             throw IOExceptionSupport.create("Failed to get store
>>> sequenceId for messageId: " + messageId +", on: " + destination + ". Reason:
>>> " + e, e);
>>>         } finally {
>>>             c.close();
>>>         }
>>>         return result;
>>>     }
>>>
>>> public void removeMessage(ConnectionContext context, MessageAck ack)
>>> throws IOException {
>>>
>>>         long seq =
>>> getStoreSequenceIdForMessageId(ack.getLastMessageId());
>>>
>>>         // Get a connection and remove the message from the DB
>>>         TransactionContext c =
>>> persistenceAdapter.getTransactionContext(context);
>>>         try {
>>>             adapter.doRemoveMessage(c, seq);
>>>         } catch (SQLException e) {
>>>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>>>             throw IOExceptionSupport.create("Failed to broker message: "
>>> + ack.getLastMessageId() + " in container: " + e, e);
>>>         } finally {
>>>             c.close();
>>>         }
>>>     }
>>> ###
>>>
>>> Could it be better to use only one request to remove a row, which is :
>>> "DELETE FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND
>>> MSGID_SEQ=? AND CONTAINER=?" ?
>>> or is there a case where it is not working ?
>>>
>>> Could it be better to create an index based on MSGID_PROD, MSGID_SEQ and
>>> CONTAINER (this index could be UNIQUE ?) ?
>>>
>>> I have checked that it takes 1min30 to dequeue my 16000 messages using
>>> the purge method with these two modifications. I noticed that the page size
>>> is 200 messages and it waits every 200 row to read from the database to be
>>> able to purge it, i never noticed this limit before.
>>>
>>> So it increases the rate from about 40 msgs/sec to about 170 msgs/sec to
>>> purge a queue (does it can also apply to other components reading the queue
>>> ?).
>>>
>>> I don't have enough JMS broker and database mechanism knowledges to say
>>> if it is a good or a bad idea.
>>>
>>> Anyone can help me for this ?
>>>
>>> Thanks for answers.
>>> Hervé
>>>
>>>
>>> 2011/7/29 Hervé BARRAULT <herve.barra...@gmail.com>
>>>
>>>> Hi,
>>>> I am using ActiveMQ 5.4.0 with persistence (using an oracle 11g R2
>>>> server) and i am doing some performance tests.
>>>>
>>>> I'm sending 16000 messages through web services (using one port) and it
>>>> takes about 1 min to manage all messages and fill the JMS queue.
>>>>
>>>> When i use a consumer to dequeue this queue (without adding new
>>>> messages) and fill another one, it takes about 6 min and 30 seconds. I was
>>>> expecting that i have some code which slow down the consumption.
>>>>
>>>> But i have tried to use the purge method and it takes the same time.
>>>>
>>>> Is there a way to increase the consumption rate ?
>>>>
>>>> Thanks for answers.
>>>>
>>>
>>>
>>
>

Reply via email to