Hi Rob,
I thought that would be your answer - fair enough.
I've attached a unit test which demonstrates the problem. I tried to
make the code as small as possible.
In this test, the problem (the program hangs) shows up about 90% of the
time when QUEUE_PREFETCH_SIZE = 1 and NUM_WORKERS = 2.
Changing NUM_WORKERS to 1 seems to make it work.
Increasing the QUEUE_PREFETCH_SIZE also seems to make things work.
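As an aside, since the failure mode is a hang, it may help to bound the
wait so the test fails with a count of missing acks instead of blocking
forever. The following is only a minimal sketch using java.util.concurrent;
the AckTracker class and its method names are hypothetical and are not part
of the attached test, which uses wait()/notify() instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: counts acks and lets the caller wait with a timeout. */
class AckTracker {
    private final CountDownLatch remaining;

    AckTracker(int expectedAcks) {
        remaining = new CountDownLatch(expectedAcks);
    }

    /** Call once per ack, for example from a MessageListener callback. */
    void onAck() {
        remaining.countDown();
    }

    /** Returns true if all expected acks arrived before the timeout elapsed. */
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }

    /** Number of acks still outstanding. */
    long missing() {
        return remaining.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        AckTracker tracker = new AckTracker(3);
        tracker.onAck();
        tracker.onAck();
        // Only two of the three acks arrive, so the wait times out.
        boolean done = tracker.awaitAll(200, TimeUnit.MILLISECONDS);
        System.out.println("all acks received: " + done
            + ", missing: " + tracker.missing());
    }
}
```

In a test, a false result from awaitAll could be turned into a failure
message that reports how many acks were still missing at the timeout.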
Thanks for looking into this. I am really interested to find out what
the culprit is. I am a relative novice with JMS, so I wouldn't be
surprised if it is something I have done wrong.
Cheers,
David
Rob Davies wrote:
Could you build a JUnit test case to demonstrate this? That would be
most helpful.
cheers,
Rob
'Go Get Integrated - ride the Camel! - http://activemq.apache.org/camel/'
http://rajdavies.blogspot.com/
On Aug 23, 2007, at 9:22 AM, David Sitsky wrote:
I have an application which is using the latest snapshot of ActiveMQ 5.0.
I have a master JVM process which sends an item to a work-item queue.
I have worker JVM processes which pick up these items, process them,
and possibly create new items into the work-item queue as a part of
processing these messages.
The master sends a new work-item once the workers have completed all
their work.
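To make the intended flow concrete, here is a rough single-threaded
sketch with no JMS involved. An in-memory queue stands in for the
work-item queue, and the rule that every 1000th item does not spawn a
follow-up item is taken from the attached test. The class and method names
(WorkFlowSketch, drain) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical in-memory model of the master/worker flow (no broker). */
class WorkFlowSketch {
    /** Every BATCH-th processed item does not create a follow-up item. */
    static final int BATCH = 1000;

    /**
     * Processes items until the queue is empty and returns the number of
     * acks that would have been sent to the master. counter[0] plays the
     * role of the counter shared between workers.
     */
    static long drain(Queue<Object> workItems, long[] counter) {
        long acks = 0;
        while (!workItems.isEmpty()) {
            workItems.remove();
            counter[0]++;
            if (counter[0] % BATCH != 0) {
                // Processing this item creates one new work item.
                workItems.add(new Object());
            }
            acks++; // one ack per processed item
        }
        return acks;
    }

    public static void main(String[] args) {
        Queue<Object> workItems = new ArrayDeque<Object>();
        long[] counter = {0};

        workItems.add(new Object());          // master sends the first item
        long first = drain(workItems, counter);

        workItems.add(new Object());          // master sends the next item
        long second = drain(workItems, counter);

        System.out.println(first + " " + second); // prints "1000 1000"
    }
}
```

Each item sent by the master should therefore produce exactly 1000 acks
before the queue runs dry, which is the point at which the master sends
its next item.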
I have a situation where if there is only one worker JVM process (one
consumer), everything works fine. If there are two workers, they
complete all of their work, the master sends the next work item to the
queue, but they never get notified of this new message.
I know the message is sent, via the JMX console, and that the worker
subscriptions are still active.
What is strange is the worker's subscriptions have large
PendingQueueSize values, which seems odd, since they aren't processing
any more messages. The queue size is 1 after the master sends its new
message.
If I create a new worker process, it immediately gets the message from
the master!
This happens with or without transactions, and with all sorts of
pre-fetch sizes, using the same or separate Connections.
I've started to run the broker in my IDEA debugger to try to
understand what is going on. Could the large PendingQueueSize values
explain why the old workers don't get new messages? Does this sound
like a bug?
Any suggestions on what I should look for? I'm happy to dive into the
code, but would appreciate some guidance.
Cheers,
David
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
package test.activemq;
import java.io.Serializable;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import junit.framework.TestCase;
/**
* Test case demonstrating a situation where messages are not delivered to
* consumers.
*/
public class TestActiveMQ extends TestCase implements MessageListener
{
/** The connection URL. */
private static final String CONNECTION_URL = "tcp://localhost:61616";
/** The queue prefetch size to use. A value greater than 1 seems to make things work. */
private static final int QUEUE_PREFETCH_SIZE = 1;
/** The number of workers to use. A single worker with a prefetch of 1 works. */
private static final int NUM_WORKERS = 2;
/** Embedded JMS broker. */
private BrokerService broker;
/** The master's producer object for creating work items. */
private MessageProducer workItemProducer;
/** The master's consumer object for consuming ack messages from workers. */
private MessageConsumer masterItemConsumer;
/** The number of acks received by the master. */
private long acksReceived;
/** The expected number of acks the master should receive. */
private long expectedCount;
/** Messages sent to the work-item queue. */
private static class WorkMessage implements Serializable
{
}
/**
* The worker process. Consumes messages from the work-item queue, possibly
* creating more messages to submit to the work-item queue. For each work
* item, sends an ack to the master.
*/
private static class Worker implements MessageListener
{
/** Counter shared between workers to decide when new work-item messages are created. */
private static Integer counter = new Integer(0);
/** Session to use. */
private Session session;
/** Producer for sending ack messages to the master. */
private MessageProducer masterItemProducer;
/** Producer for sending new work items to the work-items queue. */
private MessageProducer workItemProducer;
public Worker(Session session)
throws JMSException
{
this.session = session;
masterItemProducer =
session.createProducer(session.createQueue("master-item"));
Queue workItemQueue = session.createQueue("work-item");
workItemProducer = session.createProducer(workItemQueue);
MessageConsumer workItemConsumer =
session.createConsumer(workItemQueue);
workItemConsumer.setMessageListener(this);
}
public void onMessage(javax.jms.Message message)
{
try
{
boolean sendMessage = false;
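// Create a new work item for every message except every 1000th one.
// Lock on the class rather than on the counter: counter++ rebinds the
// boxed Integer, so synchronizing on it would not give mutual exclusion.
synchronized (Worker.class)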
{
counter++;
if (counter % 1000 != 0)
{
sendMessage = true;
}
}
if (sendMessage)
{
// Send new work item to work-item queue.
workItemProducer.send(session.createObjectMessage(
new WorkMessage()));
}
// Send ack to master.
masterItemProducer.send(session.createObjectMessage(
new WorkMessage()));
}
catch (JMSException e)
{
throw new IllegalStateException("Something has gone wrong", e);
}
}
/** Close the JMS resources used by this worker. */
public void close() throws JMSException
{
masterItemProducer.close();
workItemProducer.close();
session.close();
}
}
/** Master message handler. Process ack messages. */
public synchronized void onMessage(javax.jms.Message message)
{
acksReceived++;
if (acksReceived == expectedCount)
{
// If the expected number of acks has been received, wake up the main process.
notify();
}
if (acksReceived % 100 == 0)
{
System.out.println("Master now has ack count of: " + acksReceived);
}
}
protected void setUp() throws Exception
{
// Create the message broker.
super.setUp();
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(true);
broker.addConnector(CONNECTION_URL);
broker.start();
}
protected void tearDown() throws Exception
{
// Shut down the message broker.
broker.deleteAllMessages();
broker.stop();
super.tearDown();
}
public synchronized void testActiveMQ()
throws Exception
{
// Create the connection to the broker.
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(CONNECTION_URL);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(QUEUE_PREFETCH_SIZE);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
Connection connection = connectionFactory.createConnection();
connection.start();
Session masterSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
workItemProducer =
masterSession.createProducer(masterSession.createQueue("work-item"));
masterItemConsumer =
masterSession.createConsumer(masterSession.createQueue("master-item"));
masterItemConsumer.setMessageListener(this);
// Create the workers.
Worker[] workers = new Worker[NUM_WORKERS];
for (int i = 0; i < NUM_WORKERS; i++)
{
workers[i] = new Worker(connection.createSession(false,
Session.AUTO_ACKNOWLEDGE));
}
// Send a message to the work queue, and wait for the 1000 acks from the workers.
expectedCount = 1000;
acksReceived = 0;
workItemProducer.send(masterSession.createObjectMessage(new
WorkMessage()));
while (acksReceived != expectedCount)
{
wait();
}
System.out.println("First batch received");
// Send another message to the work queue, and wait for the next 1000 acks.
// This is the point where the workers never get notified of the message, as
// they have a large pending queue. A new worker created at this point,
// however, does receive the message.
expectedCount = 2000;
workItemProducer.send(masterSession.createObjectMessage(new
WorkMessage()));
while (acksReceived != expectedCount)
{
wait();
}
System.out.println("Second batch received");
// Cleanup all JMS resources.
for (int i = 0; i < NUM_WORKERS; i++)
{
workers[i].close();
}
masterSession.close();
connection.close();
}
}