If you post a thread dump taken while you are blocked on the stop call (from something like kill -3 or jstack), we may be able to spot the problem.
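If running kill -3 or jstack against the process is awkward, roughly the same information can be captured from inside the JVM. Below is a minimal sketch using the standard java.lang.management API; the class name ThreadDumpSketch and the method dumpAllThreads() are made up for illustration and are not part of ActiveMQ.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of ActiveMQ: builds a jstack-like dump as a String.
public class ThreadDumpSketch {

    public static String dumpAllThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // Only ask for lock details the running JVM can actually report.
        boolean monitors = bean.isObjectMonitorUsageSupported();
        boolean synchronizers = bean.isSynchronizerUsageSupported();

        StringBuilder out = new StringBuilder();
        for (ThreadInfo info : bean.dumpAllThreads(monitors, synchronizers)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            // Show what the thread is blocked or waiting on, and who owns it, if known.
            if (info.getLockName() != null) {
                out.append("    waiting on ").append(info.getLockName());
                if (info.getLockOwnerName() != null) {
                    out.append(" owned by \"").append(info.getLockOwnerName()).append('"');
                }
                out.append('\n');
            }
            // Print every frame; ThreadInfo.toString() would truncate the stack.
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}
```

Calling dumpAllThreads() from a separate watchdog thread (not from the thread stuck in stop) and posting the output should show where the blocked thread is waiting and which thread holds the lock it wants.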
On 2 June 2011 19:18, ravimbhatt <r...@qubitdigital.com> wrote:
> Hi All,
>
> I am facing a very strange problem with ActiveMQ.
>
> I have a java class that attaches itself to a queue. This class holds a
> threadExecutor and once it gets a message, it uses the threadexecutor to run
> a task with just arrived message.
>
> It keeps track of how many threads are currently performing tasks, once it
> reaches a MAX, say 4, it stops the connection so that it does not get more
> messages from the queue. As and when any one of the tasks complete it starts
> connection again.
>
> This setup works for few messages and then suddenly my class waits forever
> in connection's stop method. it never returns!!
>
> Here is my onMessage method:
>
> public void onMessage(Message objectMessage) {
>   if (objectMessage instanceof ObjectMessage) {
>     try {
>       Serializable serializableObject = ((ObjectMessage) objectMessage)
>           .getObject();
>
>       // we look for MyRequest object, we ignore
>       // everything else that was sent by mistake!
>       // TODO: check if its possible to reject a message delivery. reject the
>       // ones we are not looking for.
>       if (serializableObject instanceof MyRequest) {
>         Log.getLogger().log(Level.INFO,
>             Thread.currentThread().getName() + " received an object");
>         MyRequest myRequest = (MyRequest) serializableObject;
>         MyWorker myWorker = new MyWorker(
>             myRequest);
>         // make this object an observer of this categorisor
>         MyWorker.addObserver(this);
>         threadPoolExecutor.execute(myWorker);
>         // increment running thread count, its inside synchronized as some
>         // other thread may call update and try to decrement the count at the
>         // same time when this thread is incrementing it.
>
>         Log.getLogger().log(
>             Level.INFO,
>             Thread.currentThread().getName()
>                 + " Entering synchronised block...");
>         synchronized (this) {
>           runningThreads++;
>
>           Log.getLogger().log(
>               Level.INFO,
>               Thread.currentThread().getName() + " Running: "
>                   + runningThreads + " Max:" + MAX_THREADS);
>
>           // now check if we are running max allowed thread for this machine.
>           if (runningThreads == MAX_THREADS) {
>             Log.getLogger()
>                 .log(
>                     Level.INFO,
>                     Thread.currentThread().getName()
>                         + " Reached max threads... stoping feedback message consumer!!");
>             // stop receiving more messages.
>             stopConsumption();
>
>           }
>         }
>
>         Log.getLogger().log(
>             Level.INFO,
>             Thread.currentThread().getName()
>                 + " out of synchronised block....");
>
>       }
>     } catch (JMSException e) {
>       e.printStackTrace();
>     }
>   }
> }
>
>
> Please note that this class adds itself as a listner to MyWorker object.
> // make this object an observer of this categorisor
> MyWorker.addObserver(this);
>
> MyWorker once complete, updates this class as below:
> public void update(Observable observableSource,
>     MyStatus status) {
>   // TODO: work with status object later.
>   if (observableSource instanceof MyWorker) {
>     MyWorker myWorker = (MyWorker) observableSource;
>     // notify observers about categorization being over.
>     notifyObservers((MyResult) myWorker.getResult());
>
>     Log.getLogger().log(Level.INFO,
>         Thread.currentThread().getName() + ": Notified observers...");
>
>     synchronized (this) {
>       Log.getLogger()
>           .log(
>               Level.INFO,
>               Thread.currentThread().getName()
>                   + ": One of the threads comepleted... starting message consumer if max was reached!!");
>       runningThreads--;
>       Log.getLogger().log(
>           Level.INFO,
>           Thread.currentThread().getName() + " Running: " + runningThreads
>               + " Max:" + MAX_THREADS);
>       // start consuming only when we were at max. need not call
>       // startConsumption on each thread completion.
>       if (runningThreads == MAX_THREADS - 1) {
>         startConsumption();
>       }
>     }
>   }
> }
>
> Since there are multiple threads running and they all can call update
> together, i have used synchronized block to decrement runningThreads and
> call startConsumption() method.
>
> It works for random N messages and then it stops at the call to
> stopConsumption();
>
> Here is my start and stop consumption methods.
>
> /**
>  * stops listing to the queue
>  */
> private void stopConsumption() {
>   try {
>     conn.stop();
>   } catch (JMSException e) {
>     e.printStackTrace();
>   }
> }
>
> /**
>  * start listing to the queue again.
>  */
> private void startConsumption() {
>   try {
>     conn.start();
>   } catch (JMSException e) {
>     e.printStackTrace();
>   }
> }
>
> Looking at jconsole threads, i can see that it waits in the call to
> conn.start() and keeps on waiting forever.
>
> Here is how i connect to the queue in one my run() method:
>
> @Override
> public void run() {
>   // create a activeMQ Queue consumer and register this
>   // object as a JMS message listner.
>
>   // try connecting to ActiveMQ broker
>   try {
>     javax.naming.Context ctx = ActiveMQInitialContext.getInstance();
>     // lookup the connection factory
>     javax.jms.QueueConnectionFactory factory =
>         (javax.jms.QueueConnectionFactory) ctx
>             .lookup(PropertyManager.getProperty("JMS_CONNECTION_FACTORY_NAME"));
>
>     conn = factory.createQueueConnection();
>     conn.start();
>     // lookup an existing queue
>     javax.jms.Queue myqueue = (Queue) ctx.lookup(PropertyManager
>         .getProperty("JMS_REQUEST_QUEUE_NAME"));
>     // create a new queueSession for the client
>     QueueSession session = conn.createQueueSession(false,
>         QueueSession.AUTO_ACKNOWLEDGE);
>
>     // create a new subscriber to receive messages
>     MessageConsumer subscriber = session.createConsumer(myqueue);
>     subscriber.setMessageListener(this);
>
>     Log.getLogger().log(
>         Level.INFO,
>         Thread.currentThread().getName() + ": Attached to: "
>             + PropertyManager.getProperty("JMS_REQUEST_QUEUE_NAME"));
>
>   } catch (NamingException e) {
>     e.printStackTrace();
>   } catch (JMSException e) {
>     e.printStackTrace();
>   }
> }
>
> As you can see i am using Auto ACK. I have tested this on activemq 5.4.2 and
> 5.5.0 and it behaves the same on both versions. I am using java 6.
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/QueueConnection-Stop-Waits-forever-Stops-Message-Delivery-to-consumer-tp3569019p3569019.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

--
http://fusesource.com
http://blog.garytully.com