Hi All, I am facing a very strange problem with ActiveMQ.
I have a Java class that attaches itself to a queue. This class holds a thread pool executor, and once it receives a message, it uses the executor to run a task with the message that just arrived. It keeps track of how many threads are currently performing tasks; once that count reaches a maximum, say 4, it stops the connection so that it does not get more messages from the queue. As soon as any one of the tasks completes, it starts the connection again. This setup works for a few messages, and then suddenly my class waits forever in the connection's stop method. It never returns!

Here is my onMessage method:

    public void onMessage(Message objectMessage) {
        if (objectMessage instanceof ObjectMessage) {
            try {
                Serializable serializableObject =
                        ((ObjectMessage) objectMessage).getObject();
                // we look for a MyRequest object; we ignore
                // everything else that was sent by mistake!
                // TODO: check if it's possible to reject a message delivery,
                // and reject the ones we are not looking for.
                if (serializableObject instanceof MyRequest) {
                    Log.getLogger().log(Level.INFO,
                            Thread.currentThread().getName() + " received an object");
                    MyRequest myRequest = (MyRequest) serializableObject;
                    MyWorker myWorker = new MyWorker(myRequest);
                    // make this object an observer of this categorisor
                    myWorker.addObserver(this);
                    threadPoolExecutor.execute(myWorker);
                    // increment running thread count; it's inside synchronized as
                    // some other thread may call update and try to decrement the
                    // count at the same time when this thread is incrementing it.
                    Log.getLogger().log(Level.INFO,
                            Thread.currentThread().getName()
                                    + " Entering synchronised block...");
                    synchronized (this) {
                        runningThreads++;
                        Log.getLogger().log(Level.INFO,
                                Thread.currentThread().getName() + " Running: "
                                        + runningThreads + " Max:" + MAX_THREADS);
                        // now check if we are running the max allowed threads
                        // for this machine.
                        if (runningThreads == MAX_THREADS) {
                            Log.getLogger().log(Level.INFO,
                                    Thread.currentThread().getName()
                                            + " Reached max threads... stopping feedback message consumer!!");
                            // stop receiving more messages.
                            stopConsumption();
                        }
                    }
                    Log.getLogger().log(Level.INFO,
                            Thread.currentThread().getName()
                                    + " out of synchronised block....");
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

Please note that this class adds itself as a listener to the MyWorker object:

    // make this object an observer of this categorisor
    myWorker.addObserver(this);

Once MyWorker completes, it updates this class as below:

    public void update(Observable observableSource, MyStatus status) {
        // TODO: work with status object later.
        if (observableSource instanceof MyWorker) {
            MyWorker myWorker = (MyWorker) observableSource;
            // notify observers about categorization being over.
            notifyObservers((MyResult) myWorker.getResult());
            Log.getLogger().log(Level.INFO,
                    Thread.currentThread().getName() + ": Notified observers...");
            synchronized (this) {
                Log.getLogger().log(Level.INFO,
                        Thread.currentThread().getName()
                                + ": One of the threads completed... starting message consumer if max was reached!!");
                runningThreads--;
                Log.getLogger().log(Level.INFO,
                        Thread.currentThread().getName() + " Running: "
                                + runningThreads + " Max:" + MAX_THREADS);
                // start consuming only when we were at max. need not call
                // startConsumption on each thread completion.
                if (runningThreads == MAX_THREADS - 1) {
                    startConsumption();
                }
            }
        }
    }

Since there are multiple threads running and they can all call update at the same time, I have used a synchronized block to decrement runningThreads and call the startConsumption() method. It works for a random number of messages and then hangs at the call to stopConsumption().

Here are my start and stop consumption methods:

    /**
     * stops listening to the queue
     */
    private void stopConsumption() {
        try {
            conn.stop();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * start listening to the queue again.
     */
    private void startConsumption() {
        try {
            conn.start();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

Looking at the threads in jconsole, I can see that it waits in the call to conn.stop() and keeps on waiting forever.

Here is how I connect to the queue in my run() method:

    @Override
    public void run() {
        // create an ActiveMQ queue consumer and register this
        // object as a JMS message listener.
        // try connecting to the ActiveMQ broker
        try {
            javax.naming.Context ctx = ActiveMQInitialContext.getInstance();
            // lookup the connection factory
            javax.jms.QueueConnectionFactory factory =
                    (javax.jms.QueueConnectionFactory) ctx.lookup(
                            PropertyManager.getProperty("JMS_CONNECTION_FACTORY_NAME"));
            conn = factory.createQueueConnection();
            conn.start();
            // lookup an existing queue
            javax.jms.Queue myqueue = (Queue) ctx.lookup(
                    PropertyManager.getProperty("JMS_REQUEST_QUEUE_NAME"));
            // create a new queueSession for the client
            QueueSession session =
                    conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            // create a new subscriber to receive messages
            MessageConsumer subscriber = session.createConsumer(myqueue);
            subscriber.setMessageListener(this);
            Log.getLogger().log(Level.INFO,
                    Thread.currentThread().getName() + ": Attached to: "
                            + PropertyManager.getProperty("JMS_REQUEST_QUEUE_NAME"));
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

As you can see, I am using auto-acknowledge. I have tested this on ActiveMQ 5.4.2 and 5.5.0 and it behaves the same on both versions. I am using Java 6.

--
View this message in context: http://activemq.2283324.n4.nabble.com/QueueConnection-Stop-Waits-forever-Stops-Message-Delivery-to-consumer-tp3569019p3569019.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
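
For reference, the counting logic described in this post can be isolated from JMS in a small sketch. ConsumptionGate and its two Runnable hooks are hypothetical names made up for illustration; they are not part of the posted code or of ActiveMQ. The sketch assumes the same rule as the post: pause when the running count reaches the maximum, and resume only when it drops from the maximum to one below it.

```java
/**
 * Hypothetical helper (not from the original code): counts running tasks
 * and fires a pause/resume hook when the count crosses the limit.
 */
class ConsumptionGate {
    private final int maxRunning;
    private final Runnable pause;   // assumption: would wrap stopConsumption()
    private final Runnable resume;  // assumption: would wrap startConsumption()
    private int running = 0;

    ConsumptionGate(int maxRunning, Runnable pause, Runnable resume) {
        this.maxRunning = maxRunning;
        this.pause = pause;
        this.resume = resume;
    }

    /** Call after handing a task to the executor. */
    synchronized void taskStarted() {
        running++;
        // Pause only at the moment the limit is reached.
        if (running == maxRunning) {
            pause.run();
        }
    }

    /** Call when a task reports completion. */
    synchronized void taskFinished() {
        running--;
        // Resume only when dropping from the limit to one below it.
        if (running == maxRunning - 1) {
            resume.run();
        }
    }

    /** Current number of running tasks. */
    synchronized int running() {
        return running;
    }
}
```

As in the posted code, both hooks run while the lock is held, so a hook that blocks keeps every other caller of taskStarted() and taskFinished() waiting as well.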