Hi All,

I am facing a very strange problem with ActiveMQ. 

I have a Java class that attaches itself to a queue as a message listener.
The class holds a ThreadPoolExecutor, and when a message arrives it uses the
executor to run a task with that message.

It keeps track of how many threads are currently performing tasks. Once the
count reaches a maximum, say 4, it stops the connection so that it does not
receive more messages from the queue. As soon as any one of the tasks
completes, it starts the connection again.
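
The counting logic described above can be sketched on its own, without any JMS involved. The class name ThrottleGate and its method names are hypothetical and do not come from the real class; the sketch only mirrors the "pause at MAX, resume at MAX - 1" bookkeeping, and the caller would be the one to stop or start the connection.

```java
/**
 * Hypothetical stand-in for the running-thread bookkeeping.
 * It only counts; pausing and resuming delivery is left to the caller.
 */
public class ThrottleGate {
    private final int maxThreads;
    private int runningThreads = 0;

    public ThrottleGate(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    /** Call after handing a task to the executor; true means "pause delivery now". */
    public synchronized boolean taskStarted() {
        runningThreads++;
        return runningThreads == maxThreads;
    }

    /** Call when a task completes; true means "resume delivery now". */
    public synchronized boolean taskFinished() {
        runningThreads--;
        // Resume only when we were at the maximum just before this decrement.
        return runningThreads == maxThreads - 1;
    }

    public synchronized int running() {
        return runningThreads;
    }

    public static void main(String[] args) {
        ThrottleGate gate = new ThrottleGate(4);
        for (int i = 1; i <= 4; i++) {
            System.out.println("start " + i + " -> pause? " + gate.taskStarted());
        }
        System.out.println("finish -> resume? " + gate.taskFinished());
    }
}
```

Keeping the counter separate from the connection calls makes it possible to check the counting in isolation from whatever the connection does.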

This setup works for a few messages, and then my class suddenly waits forever
in the connection's stop() method. It never returns!

Here is my onMessage method:

public void onMessage(Message objectMessage) {
    if (objectMessage instanceof ObjectMessage) {
      try {
        Serializable serializableObject = ((ObjectMessage) objectMessage)
            .getObject();

        // we look for MyRequest object, we ignore
        // everything else that was sent by mistake!
        // TODO: check if it's possible to reject a message delivery and
        // reject the ones we are not looking for.
        if (serializableObject instanceof MyRequest) {
          Log.getLogger().log(Level.INFO,
              Thread.currentThread().getName() + " received an object");
          MyRequest myRequest = (MyRequest) serializableObject;
          MyWorker myWorker = new MyWorker(
              myRequest);
          // make this object an observer of this categorisor
          myWorker.addObserver(this);
          threadPoolExecutor.execute(myWorker);
          // increment the running thread count; it's inside synchronized as
          // some other thread may call update and try to decrement the count
          // at the same time as this thread is incrementing it.
          Log.getLogger().log(
              Level.INFO,
              Thread.currentThread().getName()
                  + " Entering synchronised block...");
          synchronized (this) {
            runningThreads++;

            Log.getLogger().log(
                Level.INFO,
                Thread.currentThread().getName() + " Running: "
                    + runningThreads + " Max:" + MAX_THREADS);

            // now check if we are running the max allowed threads for this
            // machine.
            if (runningThreads == MAX_THREADS) {
              Log.getLogger()
                  .log(
                      Level.INFO,
                      Thread.currentThread().getName()
                          + " Reached max threads... stopping feedback message consumer!!");
              // stop receiving more messages.
              stopConsumption();

            }
          }

          Log.getLogger().log(
              Level.INFO,
              Thread.currentThread().getName()
                  + " out of synchronised block....");

        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }


Please note that this class adds itself as an observer of the MyWorker object:
  // make this object an observer of this categorisor
  myWorker.addObserver(this);

Once complete, MyWorker notifies this class through update() as below:
public void update(Observable observableSource,
      MyStatus status) {
    // TODO: work with status object later.
    if (observableSource instanceof MyWorker) {
      MyWorker myWorker = (MyWorker) observableSource;
      // notify observers about categorization being over.
      notifyObservers((MyResult) myWorker.getResult());

      Log.getLogger().log(Level.INFO,
          Thread.currentThread().getName() + ": Notified observers...");

      synchronized (this) {
        Log.getLogger()
            .log(
                Level.INFO,
                Thread.currentThread().getName()
                    + ": One of the threads completed... starting message consumer if max was reached!!");
        runningThreads--;
        Log.getLogger().log(
            Level.INFO,
            Thread.currentThread().getName() + " Running: " + runningThreads
                + " Max:" + MAX_THREADS);
        // start consuming only when we were at max. need not call
        // startConsumption on each thread completion.
        if (runningThreads == MAX_THREADS - 1) {
          startConsumption();
        }
      }
    }
  }

Since multiple threads are running and they can all call update() at the same
time, I have used a synchronized block to decrement runningThreads and call
startConsumption().

It works for a random number of messages and then hangs at the call to
stopConsumption().

Here are my start and stop consumption methods.

  /**
   * stops listening to the queue
   */
  private void stopConsumption() {
    try {
      conn.stop();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

  /**
   * starts listening to the queue again.
   */
  private void startConsumption() {
    try {
      conn.start();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

Looking at the threads in jconsole, I can see that it waits in the call to
conn.stop() and keeps on waiting forever.
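
For anyone who wants the same thread view without attaching jconsole, the snapshot can also be taken from inside the JVM with the standard java.lang.management API. This is only a sketch: the class name ThreadDumpSketch is made up, and a thread that is merely waiting forever on a condition may not be reported as a deadlock by the JVM, so the per-thread state and stack are the more useful part.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper that renders a jconsole-like thread snapshot as text. */
public class ThreadDumpSketch {

    public static String dump() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();

        // Ask for lock details only where this JVM supports collecting them.
        ThreadInfo[] infos = bean.dumpAllThreads(
            bean.isObjectMonitorUsageSupported(),
            bean.isSynchronizerUsageSupported());

        for (ThreadInfo info : infos) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState());
            if (info.getLockName() != null) {
                out.append(" waiting on ").append(info.getLockName());
            }
            if (info.getLockOwnerName() != null) {
                out.append(" held by ").append(info.getLockOwnerName());
            }
            out.append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
        }

        // findDeadlockedThreads() returns null when no deadlock cycle is detected.
        if (bean.isSynchronizerUsageSupported()) {
            long[] deadlocked = bean.findDeadlockedThreads();
            out.append("JVM-detected deadlocked threads: ")
               .append(deadlocked == null ? 0 : deadlocked.length)
               .append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(dump());
    }
}
```

Calling dump() from a separate watchdog thread while the listener is stuck should show which thread is blocked and, where the JVM can tell, which thread owns the lock it is waiting for.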

Here is how I connect to the queue in my run() method:

  @Override
  public void run() {
    // create an ActiveMQ Queue consumer and register this
    // object as a JMS message listener.
    // try connecting to ActiveMQ broker
    try {
      javax.naming.Context ctx = ActiveMQInitialContext.getInstance();
      // lookup the connection factory
      javax.jms.QueueConnectionFactory factory =
          (javax.jms.QueueConnectionFactory) ctx.lookup(
              PropertyManager.getProperty("JMS_CONNECTION_FACTORY_NAME"));

      conn = factory.createQueueConnection();
      conn.start();
      // lookup an existing queue
      javax.jms.Queue myqueue = (Queue) ctx.lookup(PropertyManager
          .getProperty("JMS_REQUEST_QUEUE_NAME"));
      // create a new queueSession for the client
      QueueSession session = conn.createQueueSession(false,
          QueueSession.AUTO_ACKNOWLEDGE);

      // create a new subscriber to receive messages
      MessageConsumer subscriber = session.createConsumer(myqueue);
      subscriber.setMessageListener(this);

      Log.getLogger().log(
          Level.INFO,
          Thread.currentThread().getName() + ": Attached to: "
              + PropertyManager.getProperty("JMS_REQUEST_QUEUE_NAME"));

    } catch (NamingException e) {
      e.printStackTrace();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

As you can see, I am using AUTO_ACKNOWLEDGE. I have tested this on ActiveMQ
5.4.2 and 5.5.0, and it behaves the same on both versions. I am using Java 6.


--
View this message in context: 
http://activemq.2283324.n4.nabble.com/QueueConnection-Stop-Waits-forever-Stops-Message-Delivery-to-consumer-tp3569019p3569019.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
