If you post a thread dump taken while you are blocked in the stop call
(from something like kill -3 or jstack), we may be able to spot the problem.
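
If attaching kill -3 or jstack to the process is awkward, a comparable dump can be taken from inside the JVM with the standard java.lang.management API. The following is only a sketch: ThreadDumper is a hypothetical helper name, not something from ActiveMQ or from the code quoted below, and the output format is a simplification of what jstack prints.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper for illustration; not part of ActiveMQ or the quoted code.
final class ThreadDumper {

    /** Returns a text dump of every live thread with its full stack trace. */
    static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Request lock details only where this JVM supports them.
        ThreadInfo[] infos = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(),
                mx.isSynchronizerUsageSupported());

        StringBuilder out = new StringBuilder();
        for (ThreadInfo info : infos) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState());
            // Lock the thread is blocked or waiting on, if any.
            if (info.getLockName() != null) {
                out.append(" on ").append(info.getLockName());
            }
            // Thread that currently owns that lock, if known.
            if (info.getLockOwnerName() != null) {
                out.append(" owned by \"").append(info.getLockOwnerName()).append('"');
            }
            out.append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }
}
```

One way to use it would be to log ThreadDumper.dump() from a separate watchdog thread when the stop call has not returned after a few seconds, and post that output here.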

On 2 June 2011 19:18, ravimbhatt <r...@qubitdigital.com> wrote:
> Hi All,
>
> I am facing a very strange problem with ActiveMQ.
>
> I have a Java class that attaches itself to a queue. This class holds a
> thread pool executor and, once it gets a message, it uses the executor to
> run a task with the just-arrived message.
>
> It keeps track of how many threads are currently performing tasks. Once it
> reaches a MAX, say 4, it stops the connection so that it does not get more
> messages from the queue. As soon as any one of the tasks completes, it
> starts the connection again.
>
> This setup works for a few messages and then suddenly my class waits forever
> in the connection's stop method. It never returns!!
>
> Here is my onMessage method:
>
> public void onMessage(Message objectMessage) {
>    if (objectMessage instanceof ObjectMessage) {
>      try {
>        Serializable serializableObject = ((ObjectMessage) objectMessage)
>            .getObject();
>
>        // we look for MyRequest object, we ignore
>        // everything else that was sent by mistake!
>        // TODO: check if it's possible to reject a message delivery and
>        // reject the ones we are not looking for.
>        if (serializableObject instanceof MyRequest) {
>          Log.getLogger().log(Level.INFO,
>              Thread.currentThread().getName() + " received an object");
>          MyRequest myRequest = (MyRequest) serializableObject;
>          MyWorker myWorker = new MyWorker(
>              myRequest);
>          // make this object an observer of this categorisor
>          myWorker.addObserver(this);
>          threadPoolExecutor.execute(myWorker);
>          // increment running thread count, its inside synchronized as some
>          // other thread may call update and try to decrement the count at
>          // the same time when this thread is incrementing it.
>          Log.getLogger().log(
>              Level.INFO,
>              Thread.currentThread().getName()
>                  + " Entering synchronised block...");
>          synchronized (this) {
>            runningThreads++;
>
>            Log.getLogger().log(
>                Level.INFO,
>                Thread.currentThread().getName() + " Running: "
>                    + runningThreads + " Max:" + MAX_THREADS);
>
>            // now check if we are running max allowed thread for this
>            // machine.
>            if (runningThreads == MAX_THREADS) {
>              Log.getLogger()
>                  .log(
>                      Level.INFO,
>                      Thread.currentThread().getName()
>                          + " Reached max threads... stopping feedback"
>                          + " message consumer!!");
>              // stop receiving more messages.
>              stopConsumption();
>
>            }
>          }
>
>          Log.getLogger().log(
>              Level.INFO,
>              Thread.currentThread().getName()
>                  + " out of synchronised block....");
>
>        }
>      } catch (JMSException e) {
>        e.printStackTrace();
>      }
>    }
>  }
>
>
> Please note that this class adds itself as a listener to the MyWorker object:
>  // make this object an observer of this categorisor
>   myWorker.addObserver(this);
>
> Once complete, MyWorker updates this class as below:
> public void update(Observable observableSource,
>      MyStatus status) {
>    // TODO: work with status object later.
>    if (observableSource instanceof MyWorker) {
>      MyWorker myWorker = (MyWorker) observableSource;
>      // notify observers about categorization being over.
>      notifyObservers((MyResult) myWorker.getResult());
>
>      Log.getLogger().log(Level.INFO,
>          Thread.currentThread().getName() + ": Notified observers...");
>
>      synchronized (this) {
>        Log.getLogger()
>            .log(
>                Level.INFO,
>                Thread.currentThread().getName()
>                    + ": One of the threads completed... starting message"
>                    + " consumer if max was reached!!");
>        runningThreads--;
>        Log.getLogger().log(
>            Level.INFO,
>            Thread.currentThread().getName() + " Running: " + runningThreads
>                + " Max:" + MAX_THREADS);
>        // start consuming only when we were at max. need not call
>        // startConsumption on each thread completion.
>        if (runningThreads == MAX_THREADS - 1) {
>          startConsumption();
>        }
>      }
>    }
>  }
>
> Since there are multiple threads running and they can all call update
> at the same time, I have used a synchronized block to decrement
> runningThreads and call the startConsumption() method.
>
> It works for a random number of messages and then it hangs at the call to
> stopConsumption();
>
> Here are my start and stop consumption methods.
>
>  /**
>   * stops listening to the queue
>   */
>  private void stopConsumption() {
>    try {
>      conn.stop();
>    } catch (JMSException e) {
>      e.printStackTrace();
>    }
>  }
>
>  /**
>   * start listening to the queue again.
>   */
>  private void startConsumption() {
>    try {
>      conn.start();
>    } catch (JMSException e) {
>      e.printStackTrace();
>    }
>  }
>
> Looking at the threads in jconsole, I can see that it waits in the call to
> conn.stop() and keeps on waiting forever.
>
> Here is how I connect to the queue in my run() method:
>
>  @Override
>  public void run() {
>    // create an ActiveMQ queue consumer and register this
>    // object as a JMS message listener.
>    // try connecting to ActiveMQ broker
>    try {
>      javax.naming.Context ctx = ActiveMQInitialContext.getInstance();
>      // lookup the connection factory
>      javax.jms.QueueConnectionFactory factory =
>          (javax.jms.QueueConnectionFactory) ctx.lookup(
>              PropertyManager.getProperty("JMS_CONNECTION_FACTORY_NAME"));
>
>      conn = factory.createQueueConnection();
>      conn.start();
>      // lookup an existing queue
>      javax.jms.Queue myqueue = (Queue) ctx.lookup(PropertyManager
>          .getProperty("JMS_REQUEST_QUEUE_NAME"));
>      // create a new queueSession for the client
>      QueueSession session = conn.createQueueSession(false,
>          QueueSession.AUTO_ACKNOWLEDGE);
>
>      // create a new subscriber to receive messages
>      MessageConsumer subscriber = session.createConsumer(myqueue);
>      subscriber.setMessageListener(this);
>
>      Log.getLogger().log(
>          Level.INFO,
>          Thread.currentThread().getName() + ": Attached to: "
>              + PropertyManager.getProperty("JMS_REQUEST_QUEUE_NAME"));
>
>    } catch (NamingException e) {
>      e.printStackTrace();
>    } catch (JMSException e) {
>      e.printStackTrace();
>    }
>  }
>
> As you can see, I am using auto-acknowledge. I have tested this on ActiveMQ
> 5.4.2 and 5.5.0 and it behaves the same on both versions. I am using Java 6.
>
>
> --
> View this message in context: 
> http://activemq.2283324.n4.nabble.com/QueueConnection-Stop-Waits-forever-Stops-Message-Delivery-to-consumer-tp3569019p3569019.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
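
The throttling described in the quoted message (count running tasks, pause delivery at MAX, resume when one finishes) can also be sketched without stopping the connection at all. The javax.jms.Connection.stop() documentation states that the call blocks until message listeners in progress have completed, so calling it from inside onMessage is a plausible suspect, though only a thread dump would confirm that. The sketch below is an assumption-laden alternative, not the poster's code and not an ActiveMQ API: BoundedDispatcher is a hypothetical class, and the idea is that onMessage would call dispatch(...) and simply block there while MAX tasks are in flight.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of ActiveMQ or the quoted code.
final class BoundedDispatcher {
    private final Semaphore permits;
    private final ExecutorService executor;

    BoundedDispatcher(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
        this.executor = Executors.newFixedThreadPool(maxInFlight);
    }

    /**
     * Blocks the calling thread until fewer than maxInFlight tasks are
     * running, then hands the task to the pool. The permit is returned
     * when the task finishes, whether it succeeds or throws.
     */
    void dispatch(final Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        task.run();
                    } finally {
                        permits.release();
                    }
                }
            });
        } catch (RuntimeException e) {
            // The pool rejected the task, so it will never release the permit itself.
            permits.release();
            throw e;
        }
    }

    /** Stops accepting tasks and waits up to 30 seconds for running ones. */
    void shutdown() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
}
```

With this shape the listener thread itself is what pauses, so no synchronized counter or start/stop calls are needed. Whether blocking inside onMessage is acceptable depends on the acknowledge mode and prefetch settings in use, which this sketch does not address.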



-- 
http://fusesource.com
http://blog.garytully.com
