Hello all
After hacking up most of our application, I came up with the following
simple (I hope?) test case:
public final class SlowConsumerTest {
public static void main(final String[] args) throws JMSException,
InterruptedException, IOException {
class Listener implements MessageListener {
public void onMessage(final Message message) {
System.out.println("GOT A MESSAGE BEING SLOW");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
final Topic topic = new ActiveMQTopic("topic");
final String brokerURL =
"vm://localhost?broker.useJmx=false&broker.persistent=false";
Connection connection = new
ActiveMQConnectionFactory(brokerURL).createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
session.createConsumer(topic).setMessageListener(new Listener());
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();
while (true) {
BytesMessage message = new ActiveMQBytesMessage();
try {
message.writeBytes(new byte[2048]);
producer.send(message);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
}
Whether it hangs or not depends seems to depend on the message size and
the VM's heap size arguments.
With 2048 byte messages with -Xms384m -Xmx512m it hangs reliably within
about 7 messages received by the slow consumer on Windows XP SP2 with
JDK 1.5.0_10 and 1.6.0 and on Linux with JDK 1.6.0.
In cases where it doesn't hang, one typically sees a "out of heap space"
error after about 20 or 30 messages, but the messages keep going for
some reason.
When it hangs, there are 2 or 3 threads running.
The main thread's stack:
Name: main
State: WAITING on [EMAIL PROTECTED]
Total blocked: 2 Total waited: 2
Stack trace:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:485)
org.apache.activemq.memory.UsageManager.waitForSpace(UsageManager.java:91)
org.apache.activemq.memory.UsageManager.waitForSpace(UsageManager.java:88)
org.apache.activemq.broker.region.Topic.send(Topic.java:248)
org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:305)
org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:381)
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:197)
org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:98)
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:136)
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:449)
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:604)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:258)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:164)
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:95)
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:65)
org.apache.activemq.transport.vm.VMTransport.syncOneWay(VMTransport.java:99)
org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:86)
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
- locked [EMAIL PROTECTED]
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1165)
org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1545)
- locked [EMAIL PROTECTED]
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:473)
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:358)
SlowConsumerTest.main(SlowConsumerTest.java:41)
The ActiveMQ Session task's stack:
Name: ActiveMQ Session Task
State: BLOCKED on [EMAIL PROTECTED] owned by: main
Total blocked: 3 Total waited: 8
Stack trace:
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:43)
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1165)
org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1647)
org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:700)
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:871)
- locked [EMAIL PROTECTED]
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:99)
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:166)
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
org.apache.activemq.thread.PooledTaskRunner.access$1(PooledTaskRunner.java:95)
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
java.lang.Thread.run(Thread.java:619)
The third thread is some kind of timer thread that disappears after a
while.
We teested with the latest ActiveMQ 4.2 from trunk and 4.2 checked out
a few weeks ago -- same problem in both cases.
We'd simply like our producer to block waiting for space if the
consumers can't keep up -- this is what I expected this code to do (or
am I mistaken?). Naturally, if the consumers have a few messages to
send, we don't want them to block too, so some kind of per-destination
buffer would help here.
Should we be doing something differently to achieve this? We don't want
to spool to disk, since our data is already being read from a disk, so
if the producer dies, it can just restart and resend the data.
Thanks for all the help so far.
Cheers,
Albert
On Tue, 23 Jan 2007, Rob Davies wrote:
> reproduced - will fix shortly
> On 23 Jan 2007, at 07:26, James Strachan wrote:
>
> >I thought that the new spool-to-disk feature should kick in
> >automatically - obviously not :). Anyone know if that has to be
> >explicitly enabled?
> >
> >BTW are you explicitly disabling persistence on ActiveMQ? I think you
> >might need to keep it enabled for spooling to disk to work.