Hi,
I have the following configuration :
2 producers send message in a queue using camel producer template.
I have one consumer which finally send 2 messages in 2 different queues.
(using camel route)
and for those queues i have for each 1 consumer.

Globally, It works with 3 queues.

I am ActiveMQ 5.4.0 using vm transport and jdbc persistence. (flow control
is disabled in this case).
When trying some performance tests :

When the 2 producers are working it seems ok (bad performances but no
contention).

When the producers stop (i have a lot of enqueued messages), I see thread
contention for VMTransport threads.

I see 5 live threads (these threads are changing).



Stacks at 04:36:09 PM (uptime 1:33:37)

VMTransport [RUNNABLE, IN_NATIVE] CPU time: 0:01
java.net.SocketInputStream.socketRead0(FileDescriptor, byte[], int, int,
int)
java.net.SocketInputStream.read(byte[], int, int)
org.postgresql.core.VisibleBufferedInputStream.readMore(int)
org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int)
org.postgresql.core.VisibleBufferedInputStream.read()
org.postgresql.core.PGStream.ReceiveChar()
org.postgresql.core.v3.QueryExecutorImpl.processResults(ResultHandler, int)
org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList,
ResultHandler, int, int, int)
org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query)
org.postgresql.jdbc2.AbstractJdbc2Connection.commit()
org.apache.commons.dbcp.DelegatingConnection.commit()
org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapper.commit()
org.apache.activemq.store.jdbc.TransactionContext.commit()
org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.commitTransaction(ConnectionContext)
org.apache.activemq.store.memory.MemoryTransactionStore$Tx.commit()
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId,
boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext,
TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext,
TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [BLOCKED] CPU time: 0:18
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId,
boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext,
TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext,
TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [BLOCKED] CPU time: 0:01
org.apache.activemq.store.memory.MemoryTransactionStore.commit(TransactionId,
boolean, Runnable, Runnable)
org.apache.activemq.transaction.LocalTransaction.commit(boolean)
org.apache.activemq.broker.TransactionBroker.commitTransaction(ConnectionContext,
TransactionId, boolean)
org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(ConnectionContext,
TransactionId, boolean)
org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransactionInfo)
org.apache.activemq.command.TransactionInfo.visit(CommandVisitor)
org.apache.activemq.broker.TransportConnection.service(Command)
org.apache.activemq.broker.TransportConnection$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()


VMTransport [WAITING] CPU time: 0:09
sun.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode,
boolean, long)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object,
boolean, long)
java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
java.util.concurrent.ThreadPoolExecutor.getTask()
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

VMTransport [WAITING] CPU time: 0:01
sun.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.parkNanos(Object, long)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue$TransferStack$SNode,
boolean, long)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(Object,
boolean, long)
java.util.concurrent.SynchronousQueue.poll(long, TimeUnit)
java.util.concurrent.ThreadPoolExecutor.getTask()
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()


I have some difficulties to understand why i have 2 BLOCKED threads and 1
Running in this case.

Thanks for help.



2011/8/10 Hervé BARRAULT <herve.barra...@gmail.com>

> Hi, i have done another experiment :
> I have tried something to see the influence of messages on ActiveMQ
> behavior.
>
> I try to send 20 messages per seconds during 20 seconds into the broker and
> use a processor to dequeue the messages.
> If there is no other messages, it is ok
> If i create a false queue with 16k messages it is always working
> If i create a false queue with 24k messages it takes about 25 seconds to do
> all the job.
> If i create a false queue with 32k messages it takes about 55 seconds to do
> all the job.
>
> Is there something to do to avoid that a queue has an influence on other
> queues?
> Regards
>
>
>
> 2011/8/1 Hervé BARRAULT <herve.barra...@gmail.com>
>
>> Hi,
>> I have looked to the purge mechanism to try to understand why it takes so
>> much time to clean a queue.
>>
>> I have noticed something :
>>
>> ####
>> CLASS org.apache.activemq.store.jdbc.Statements
>> public String getFindMessageSequenceIdStatement() {
>>     if (findMessageSequenceIdStatement == null) {
>>         findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " +
>> getFullMessageTableName()
>>             + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
>>     }
>>     return findMessageSequenceIdStatement;
>> }
>>
>> public String getRemoveMessageStatement() {
>>         if (removeMessageStatement == null) {
>>             removeMessageStatement = "DELETE FROM " +
>> getFullMessageTableName() + " WHERE ID=?";
>>         }
>>         return removeMessageStatement;
>>     }
>> ###
>>
>> ###
>> CLASS org.apache.activemq.store.jdbc.JDBCMessageStore
>>
>> private long getStoreSequenceIdForMessageId(MessageId messageId) throws
>> IOException {
>>         long result = -1;
>>         TransactionContext c = persistenceAdapter.getTransactionContext();
>>         try {
>>             result = adapter.getStoreSequenceId(c, destination,
>> messageId)[0];
>>         } catch (SQLException e) {
>>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>>             throw IOExceptionSupport.create("Failed to get store
>> sequenceId for messageId: " + messageId +", on: " + destination + ". Reason:
>> " + e, e);
>>         } finally {
>>             c.close();
>>         }
>>         return result;
>>     }
>>
>> public void removeMessage(ConnectionContext context, MessageAck ack)
>> throws IOException {
>>
>>         long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
>>
>>         // Get a connection and remove the message from the DB
>>         TransactionContext c =
>> persistenceAdapter.getTransactionContext(context);
>>         try {
>>             adapter.doRemoveMessage(c, seq);
>>         } catch (SQLException e) {
>>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>>             throw IOExceptionSupport.create("Failed to broker message: " +
>> ack.getLastMessageId() + " in container: " + e, e);
>>         } finally {
>>             c.close();
>>         }
>>     }
>> ###
>>
>> Could it be better to use only one request to remove a row, which is :
>> "DELETE FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND
>> MSGID_SEQ=? AND CONTAINER=?" ?
>> or is there a case where it is not working ?
>>
>> Could it be better to create an index based on MSGID_PROD, MSGID_SEQ and 
>> CONTAINER
>> (this index could be UNIQUE ?) ?
>>
>> I have checked that it takes 1min30 to dequeue my 16000 messages using the
>> purge method with these two modifications. I noticed that the page size is
>> 200 messages and it waits every 200 row to read from the database to be able
>> to purge it, i never noticed this limit before.
>>
>> So it increases the rate from about 40 msgs/sec to about 170 msgs/sec to
>> purge a queue (does it can also apply to other components reading the queue
>> ?).
>>
>> I don't have enough JMS broker and database mechanism knowledges to say if
>> it is a good or a bad idea.
>>
>> Anyone can help me for this ?
>>
>> Thanks for answers.
>> Hervé
>>
>>
>> 2011/7/29 Hervé BARRAULT <herve.barra...@gmail.com>
>>
>>> Hi,
>>> I am using ActiveMQ 5.4.0 with persistence (using an oracle 11g R2
>>> server) and i am doing some performance tests.
>>>
>>> I'm sending 16000 messages through web services (using one port) and it
>>> takes about 1 min to manage all messages and fill the JMS queue.
>>>
>>> When i use a consumer to dequeue this queue (without adding new messages)
>>> and fill another one, it takes about 6 min and 30 seconds. I was expecting
>>> that i have some code which slow down the consumption.
>>>
>>> But i have tried to use the purge method and it takes the same time.
>>>
>>> Is there a way to increase the consumption rate ?
>>>
>>> Thanks for answers.
>>>
>>
>>
>

Reply via email to