Hi,

I managed to reproduce the problem in the simple sample below. What I do is to 
create a connection that is shared between a consuming thread and a producing 
thread. The consumer is slower than the producer. When I run this sample, 
either the consumer stops consuming although lots of messages are available, or 
the producer stops producing.

Anybody any ideas what's wrong or what I am doing wrong?



package test.jms;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ConsumingProducer {

        public final static String INITIAL_CONTEXT_FACTORY = 
"org.apache.activemq.jndi.ActiveMQInitialContextFactory";
        public final static String PROVIDER_URL = "tcp://localhost:61616";

        public final static String CONNECTION_FACTORY_NAME = 
"ConnectionFactory";
        public final static String DESTINATION_NAME = "TEST.QUEUE";

        public final static long WRITE_DELAY = 10;
        public final static long READ_DELAY = 1000;

        public static QueueConnectionFactory connectionFactory;
        public static QueueConnection connection;

        public static void initialize() throws NamingException, JMSException {
                Hashtable<String, String> env = new Hashtable<String, String>();
                env
                                .put(InitialContext.INITIAL_CONTEXT_FACTORY,
                                                INITIAL_CONTEXT_FACTORY);
                env.put(InitialContext.PROVIDER_URL, PROVIDER_URL);
                InitialContext initialContext = new InitialContext(env);

                connectionFactory = (QueueConnectionFactory) initialContext
                                .lookup(CONNECTION_FACTORY_NAME);

                connection = connectionFactory.createQueueConnection();
                connection.start();
        }

        public static void startConsumer() {

                Thread t = new Thread() {
                        public void run() {
                                try {
                                        QueueSession session = 
connection.createQueueSession(false,
                                                        
Session.AUTO_ACKNOWLEDGE);
                                        Queue queue = 
session.createQueue(DESTINATION_NAME);
                                        while (true) {
                                                MessageConsumer consumer = 
session
                                                                
.createConsumer(queue);
                                                Message msg = 
consumer.receive();
                                                if (msg instanceof TextMessage) 
{
                                                        
System.out.println(((TextMessage) msg).getText());
                                                } else if (msg != null) {
                                                        
System.out.println("Message received");
                                                }
                                                consumer.close();
                                                sleep(READ_DELAY);
                                        }
                                } catch (Exception ex) {
                                        ex.printStackTrace();
                                }
                        }
                };
                t.setName("Consumer");
                t.start();
        }

        public static void startProducer() {

                Thread t = new Thread() {
                        public void run() {
                                try {
                                        QueueSession session = 
connection.createQueueSession(false,
                                                        
Session.AUTO_ACKNOWLEDGE);
                                        Queue queue = 
session.createQueue(DESTINATION_NAME);
                                        int msgctr = 1;
                                        while (true) {
                                                MessageProducer producer = 
session
                                                                
.createProducer(queue);
                                                Message msg = 
session.createTextMessage("Message "
                                                                + msgctr++);
                                                producer.send(msg);
                                                producer.close();

                                                sleep(WRITE_DELAY);
                                        }
                                } catch (Exception ex) {
                                        ex.printStackTrace();
                                }
                        }
                };
                t.setName("Producer");
                t.start();
        }

        public static void main(String[] args) {
                try {
                        initialize();
                        startConsumer();
                        startProducer();
                } catch (Exception ex) {
                        ex.printStackTrace();
                }
        }
}




-----Original Message-----
From: Joe Fernandez [mailto:[EMAIL PROTECTED]
Sent: maandag 13 oktober 2008 13:06
To: users@activemq.apache.org
Subject: Re: Concurrent use of connections


Sessions are single-threaded. So could the issue be related more to your use
of sessions?

Joe
Get a free ActiveMQ user guide @ http://www.ttmsolutions.com


Steven Van Loon-2 wrote:
>
> Hi all,
>
> Is anybody aware of possible problems when (re)using a same Connection to
> activeMQ by different threads? According to the JMS specification,
> implementations should support concurrent use of Connections, so I created
> one connection to be used by all consumer / producers.
>
> However, I experienced the problem that from a certain moment, no more
> messages were read from the queue although a lot of messages were still
> waiting. When another programs connects to the queue and reads a message,
> it can still fetch messages. We were able to track down the problem to
> activeMQ since using another queuing system solved the problem, it kept
> reading all the messages, no matter how many were waiting on the queue.
> Eventually, making a connection for each thread solved the problem also
> for activeMQ. Hence my question.
>
> My apologies I can provide you only with a vague description of the
> problem and no test case to reproduce the problem but I'm wondering
> whether anybody else experienced similar problems and can provide me with
> more insight in the problem.
>
> Thanks!
> Steven.
>
>
>

--
View this message in context: 
http://www.nabble.com/Concurrent-use-of-connections-tp19952062p19952819.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to