Hi Rob,

I thought that would be your answer - fair enough.

I've attached a unit test which demonstrates the problem. I tried to make the code as small as possible.

With this test, the program hangs about 90% of the time when QUEUE_PREFETCH_SIZE == 1 and NUM_WORKERS == 2. Changing NUM_WORKERS to 1 seems to make it work.

Increasing the QUEUE_PREFETCH_SIZE also seems to make things work.
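
For reference, the message flow the test expects can be sketched without JMS at all. This is only a minimal in-memory sketch, and the names WorkItemFlowSketch and runBatch are made up for illustration (they are not in the attached test). Every work item produces one ack plus one follow-up item, except when the shared counter reaches a multiple of 1000, so a single message from the master should result in exactly 1000 acks.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** In-memory sketch of the work-item flow; no JMS or broker involved. */
final class WorkItemFlowSketch
{
    /**
     * Drains a queue seeded with one work item. Each processed item produces
     * one ack and, unless the counter reaches a multiple of batchSize, one
     * follow-up item. Returns the number of acks produced.
     */
    static long runBatch(long startCounter, int batchSize)
    {
        Queue<Object> workItems = new ArrayDeque<>();
        workItems.add(new Object());            // the master's single seed message
        long counter = startCounter;
        long acks = 0;
        while (!workItems.isEmpty())
        {
            workItems.remove();                 // a worker picks up the item
            counter++;
            if (counter % batchSize != 0)
            {
                workItems.add(new Object());    // worker creates a follow-up item
            }
            acks++;                             // worker sends an ack to the master
        }
        return acks;
    }

    public static void main(String[] args)
    {
        long first = runBatch(0, 1000);
        long second = runBatch(first, 1000);
        System.out.println("First batch acks: " + first);
        System.out.println("Second batch acks: " + second);
    }
}
```

If the real test behaved like this sketch, both batches would complete with 1000 acks each. The hang shows up when the second seed message is never delivered to the existing workers.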

Thanks for looking into this. I am really interested to find out what the culprit is. I am a relative novice with JMS, so I wouldn't be surprised if it is something I have done wrong.

Cheers,
David

Rob Davies wrote:
Could you build a JUnit test case to demonstrate this? That would be most helpful.


cheers,

Rob

'Go Get Integrated -  ride the Camel! - http://activemq.apache.org/camel/'
http://rajdavies.blogspot.com/



On Aug 23, 2007, at 9:22 AM, David Sitsky wrote:

I have an application which is using the latest snapshot of ActiveMQ 5.0.

I have a master JVM process which sends an item to a work-item queue. I have worker JVM processes which pick up these items, process them, and possibly create new items into the work-item queue as a part of processing these messages.

The master sends a new work-item once the workers have completed all their work.

I have a situation where if there is only one worker JVM process (one consumer), everything works fine. If there are two workers, they complete all of their work, the master sends the next work item to the queue, but they never get notified of this new message.

I know the message is sent, via the JMX console, and that the worker subscriptions are still active.

What is strange is that the workers' subscriptions have large PendingQueueSize values, which seems odd, since they aren't processing any more messages. The queue size is 1 after the master sends its new message.

If I create a new worker process, it immediately gets the message from the master!

This happens with or without transactions, and with all sorts of pre-fetch sizes, using the same or separate Connections.

I've started to run the broker in my IDEA debugger to try and understand what is going on. Could the large PendingQueueSize values explain why the old workers don't get new messages? Does this sound like a bug?

Any suggestions on what I should look for? I'm happy to dive into the code, but would appreciate some guidance.

Cheers,
David




--
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902
package test.activemq;

import java.io.Serializable;

import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;

import junit.framework.TestCase;

/**
 * Test case demonstrating situation where messages are not delivered to
 * consumers.
 */
public class TestActiveMQ extends TestCase implements MessageListener
{
    /** The connection URL. */
    private static final String CONNECTION_URL = "tcp://localhost:61616";

    /**
     * The queue prefetch size to use. A value greater than 1 seems to make
     * things work.
     */
    private static final int QUEUE_PREFETCH_SIZE = 1;

    /**
     * The number of workers to use.  A single worker with a prefetch of 1
     * works.
     */
    private static final int NUM_WORKERS = 2;

    /** Embedded JMS broker. */
    private BrokerService broker;

    /** The master's producer object for creating work items. */
    private MessageProducer workItemProducer;

    /** The master's consumer object for consuming ack messages from workers. */
    private MessageConsumer masterItemConsumer;

    /** The number of acks received by the master. */
    private long acksReceived;

    /** The expected number of acks the master should receive. */
    private long expectedCount;

    /** Messages sent to the work-item queue. */
    private static class WorkMessage implements Serializable
    {
    }

    /**
     * The worker process.  Consume messages from the work-item queue, possibly
     * creating more messages to submit to the work-item queue.  For each work
     * item, send an ack to the master.
     */
    private static class Worker implements MessageListener
    {
        /** Lock guarding the counter shared between workers. */
        private static final Object COUNTER_LOCK = new Object();

        /**
         * Counter shared between workers to decide when new work-item
         * messages are created.
         */
        private static int counter = 0;

        /** Session to use. */
        private Session session;

        /** Producer for sending ack messages to the master. */
        private MessageProducer masterItemProducer;

        /** Producer for sending new work items to the work-items queue. */
        private MessageProducer workItemProducer;

        public Worker(Session session)
            throws JMSException
        {
            this.session = session;
            masterItemProducer =
                    session.createProducer(session.createQueue("master-item"));
            Queue workItemQueue = session.createQueue("work-item");
            workItemProducer = session.createProducer(workItemQueue);
            MessageConsumer workItemConsumer =
                    session.createConsumer(workItemQueue);
            workItemConsumer.setMessageListener(this);
        }

        public void onMessage(javax.jms.Message message)
        {
            try
            {
                boolean sendMessage = false;

                // Create a new work item for every message except each 1000th.
                // The lock is a dedicated final object: synchronizing on a boxed
                // Integer would lock a different object after every increment.
                synchronized (COUNTER_LOCK)
                {
                    counter++;
                    if (counter % 1000 != 0)
                    {
                        sendMessage = true;
                    }
                }

                if (sendMessage)
                {
                    // Send new work item to work-item queue.
                    workItemProducer.send(session.createObjectMessage(
                            new WorkMessage()));
                }

                // Send ack to master.
                masterItemProducer.send(session.createObjectMessage(
                        new WorkMessage()));
            }
            catch (JMSException e)
            {
                throw new IllegalStateException("Something has gone wrong", e);
            }
        }

        /** Close the JMS resources used by the worker. */
        public void close() throws JMSException
        {
            masterItemProducer.close();
            workItemProducer.close();
            session.close();
        }
    }

    /** Master message handler.  Process ack messages. */
    public synchronized void onMessage(javax.jms.Message message)
    {
        acksReceived++;
        if (acksReceived == expectedCount)
        {
            // If expected number of acks are received, wake up the main
            // process.
            notify();
        }
        if (acksReceived % 100 == 0)
        {
            System.out.println("Master now has ack count of: " + acksReceived);
        }
    }

    protected void setUp() throws Exception
    {
        // Create the message broker.
        super.setUp();
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(true);
        broker.addConnector(CONNECTION_URL);
        broker.start();
    }

    protected void tearDown() throws Exception
    {
        // Shut down the message broker.
        broker.deleteAllMessages();
        broker.stop();
        super.tearDown();
    }

    public synchronized void testActiveMQ()
        throws Exception
    {
        // Create the connection to the broker.
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(CONNECTION_URL);
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(QUEUE_PREFETCH_SIZE);
        connectionFactory.setPrefetchPolicy(prefetchPolicy);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session masterSession =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        workItemProducer =
                masterSession.createProducer(masterSession.createQueue("work-item"));
        masterItemConsumer =
                masterSession.createConsumer(masterSession.createQueue("master-item"));
        masterItemConsumer.setMessageListener(this);

        // Create the workers.
        Worker[] workers = new Worker[NUM_WORKERS];
        for (int i = 0; i < NUM_WORKERS; i++)
        {
            workers[i] = new Worker(
                    connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
        }

        // Send a message to the work queue, and wait for the 1000 acks from
        // the workers.
        expectedCount = 1000;
        acksReceived = 0;
        workItemProducer.send(
                masterSession.createObjectMessage(new WorkMessage()));
        while (acksReceived != expectedCount)
        {
            wait();
        }
        System.out.println("First batch received");

        // Send another message to the work queue, and wait for the next 1000
        // acks.  It is at this point where the workers never get notified of
        // this message, as they have a large pending queue.  Creating a new
        // worker at this point however will receive this new message.
        expectedCount = 2000;
        workItemProducer.send(
                masterSession.createObjectMessage(new WorkMessage()));
        while (acksReceived != expectedCount)
        {
            wait();
        }
        System.out.println("Second batch received");

        // Cleanup all JMS resources.
        for (int i = 0; i < NUM_WORKERS; i++)
        {
            workers[i].close();
        }
        masterSession.close();
        connection.close();
    }
}
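
One possible variation, not part of the attached test: as written, the test blocks forever in wait() when the problem occurs. Assuming a hang should instead show up as a test failure, the ack counting could be done with a java.util.concurrent.CountDownLatch and a timeout. The class AckWaiterSketch and its method names are hypothetical; this is just a sketch of the idea, with a plain thread standing in for the workers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: counts acks and lets the caller wait with a timeout. */
final class AckWaiterSketch
{
    private final CountDownLatch remaining;

    AckWaiterSketch(int expectedAcks)
    {
        remaining = new CountDownLatch(expectedAcks);
    }

    /** Would be called from the master's onMessage() for each ack received. */
    void ackReceived()
    {
        remaining.countDown();
    }

    /** Returns true if all expected acks arrived in time, false on timeout. */
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException
    {
        return remaining.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException
    {
        // A plain thread stands in for the workers sending acks.
        AckWaiterSketch waiter = new AckWaiterSketch(3);
        Thread worker = new Thread(() -> {
            for (int i = 0; i < 3; i++)
            {
                waiter.ackReceived();
            }
        });
        worker.start();
        System.out.println("All acks received: "
                + waiter.awaitAll(5, TimeUnit.SECONDS));

        // No acks ever arrive here, so the wait times out instead of hanging.
        AckWaiterSketch stalled = new AckWaiterSketch(1);
        System.out.println("Stalled batch completed: "
                + stalled.awaitAll(100, TimeUnit.MILLISECONDS));
    }
}
```

In a JUnit test, the boolean returned by awaitAll could be passed to assertTrue, so that the undelivered second message fails the test after the timeout rather than blocking the run.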
