Hi all,

I've been testing ActiveMQ under relatively high load and number of
producers and consumers (50-100 on each side) and I've attached the
code that I have been using for test purposes (it's not really clean
code since it's used for testing purposes only). What happens is when
I launch about 50-60 consumers/producers (they run on a different
boxes) and I start to stop consumer processes, producers tend to block
for a shorter period, if I repeat this process for a while I get to
state where's a whole queue blocked for 1-2h or longer (sometimes for
good).
Is this general performance problem with ActiveMQ or am I doing something wrong?

Thanks,
Igor
package test;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Message;
import javax.jms.Topic;

import java.io.IOException;
import java.util.Random;
import java.util.Date;

// ActivemqSafeJMSTest class

public class ActivemqSafeJMSTest implements MessageListener, ExceptionListener 
{
  protected Destination destination;
  protected String subject = "";
  protected String url = "";
  // properties
  protected boolean topic = false;
  protected boolean transacted = false;
  protected boolean durable = false;
  protected boolean persistent = false;
  protected boolean verbose = false;
  private long sleepTime=0L;
  protected int ackMode = Session.CLIENT_ACKNOWLEDGE;
  // identification
  protected String user = ActiveMQConnection.DEFAULT_USER;
  protected String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
  protected String clientID = "";
  protected String consumerName = "";

  // consumer
  protected int count = 0;
  protected int dumpCount = 20000;
  protected int maxiumMessages = 0;
  private boolean pauseBeforeShutdown;
  private boolean running;
  private Session globSession;
  private Connection globConnection;
  private long receiveTimeOut=0;

  // producer
  protected int messageSize = 1000;
  protected int messageCount = 1000;
  protected long timeToLive = 0L;

  public ActivemqSafeJMSTest() {
    Runtime.getRuntime().addShutdownHook(new Thread() {  
      public void run(){
        try {
          if(globSession != null) 
            globSession.close();
          if(globConnection != null)
            globConnection.close(); 

          System.out.println("Shuting down nicely.");
        }
        catch (JMSException e) {
          System.out.println("Failed to shutdown nicely.");
        }
      }
    });

  }

  public static void main(String[] args) {
    int count = 0;
    String mode = "";
    ActivemqSafeJMSTest newTest = new ActivemqSafeJMSTest();

    for (int i = 0; i < args.length; i++) {  
      // properties
      if (args[i].matches("--durable")) {
        newTest.durable = true;
      }
      else if (args[i].matches("--transacted")) {
        newTest.transacted = true;
      }
      else if (args[i].matches("--persistent")) {
        newTest.persistent = true;
      }
      else if (args[i].matches("--topic")) {
        newTest.topic = true;
      }
      else if (args[i].matches("--verbose")) {
        newTest.verbose = true;
      }
      // values
      else if (args[i].matches("--clientid")) {
        if (i < args.length - 1) {
          newTest.clientID = args[++i];
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--consumername")) {
        if (i < args.length - 1) {
          newTest.consumerName = args[++i];
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--subject")) {
        if (i < args.length - 1) {
          newTest.subject = args[++i];
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--url")) {
        if (i < args.length - 1) {
          newTest.url = args[++i];
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--size")) {
        if (i < args.length - 1) {
          try {
            newTest.messageSize = Integer.parseInt(args[++i]);
          } catch (NumberFormatException nx) {
            System.out.println("bad integer...");
            System.exit(1);
          }
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--messages")) {
        if (i < args.length - 1) {
          try {
            newTest.messageCount = Integer.parseInt(args[++i]);
          } catch (NumberFormatException nx) {
            System.out.println("bad integer...");
            System.exit(1);
          }
        }
        else {
          usage();
        }
      }
      // mode
      else if (i == args.length - 1) {
        mode = args[i];
      }
      // f*ckup 
      else {
        usage();
      }
    }

    // randomize consumer name and client id if it's not being set - aka give mojo
    if (newTest.consumerName.matches("")) {
      Random randomizer = new Random();
      newTest.consumerName = Long.toString(Math.abs(randomizer.nextLong()), 36);
    }
    if (newTest.clientID.matches("")) {
      Random randomizer = new Random();
      newTest.clientID = Long.toString(Math.abs(randomizer.nextLong()), 36);
    }

    // run test
    if (mode.equals("consumer")) {
      newTest.consume();
    }
    else if (mode.equals("producer")) {
      newTest.produce();
    }
  }

  //                //
  // ** GENERIC  ** //
  //                //


  protected Session createSession(Connection connection) throws Exception {
    Session session = connection.createSession(transacted, ackMode);
    if (topic) {
      destination = session.createTopic(subject);
    }
    else {
      destination = session.createQueue(subject);
    }
    return session;
  }

  protected Connection createConnection() throws JMSException, Exception {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
    Connection connection = connectionFactory.createConnection();
    if (durable && clientID!=null) {
      connection.setClientID(clientID);
    }
    
    connection.start();
    return connection;
  }

  protected void close(Connection connection, Session session) throws JMSException {
    // lets dump the stats
    dumpStats(connection);

    if (session != null) {
      session.close();
    }
    if (connection != null) {
      connection.close();
    }
  }

  protected void dumpStats(Connection connection) {
    ActiveMQConnection c = (ActiveMQConnection) connection;
    c.getConnectionStats().dump(new IndentPrinter());
  }

  //                //
  // ** CONSUMER ** //
  //                //

  private void consume() {
    try {
      running = true;
            
      System.out.println("Connecting to URL: " + url);
      System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
      System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
      System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " queue");

      globConnection = createConnection();
      globConnection.setExceptionListener(this);
      globSession = createSession(globConnection);
      MessageConsumer consumer = null;
      if (durable && topic) {
        consumer = globSession.createDurableSubscriber((Topic) destination, consumerName);
      }
      else {
        consumer = globSession.createConsumer(destination);
      }
      if ( maxiumMessages > 0 ) {
        consumeMessagesAndClose(globConnection, globSession, consumer);
      } else  {
        if(receiveTimeOut==0) {
          consumer.setMessageListener(this);
        } else {
          consumeMessagesAndClose(globConnection, globSession, consumer, receiveTimeOut);
        }
      }
    }
    catch (Exception e) {
      System.out.println("Caught while starting: " + e);
      e.printStackTrace();
    } 
  }

  public void onMessage(Message message) {
    try {
      if (message instanceof TextMessage) {
        TextMessage txtMsg = (TextMessage) message;
        if (verbose) {

          String msg = txtMsg.getText();
          if (msg.length() > 50) {
            msg = msg.substring(0, 50) + "...";
          }

          System.out.println("Received: " + msg);
        }
      }
      else {
        if (verbose) {
          System.out.println("Received: " + message);
        }
      }
      if(transacted) {
        globSession.commit();
      }

      message.acknowledge();
    }
    catch (JMSException e) {
      System.out.println("Caught: " + e);
      e.printStackTrace();
    } finally {
      if( sleepTime> 0 ) {
        try {
          Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
      
        }
      }
    }
  }

  protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
    System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");

    for (int i = 0; i < maxiumMessages && isRunning(); ) {
      Message message = consumer.receive(1000);
      if( message!=null ) {
        i++;
        onMessage(message);
      }
    }
    
    System.out.println("Closing connection");
    consumer.close();
    session.close();
    connection.close();

    consumer = null;
    session = null;
    connection = null;

    if (pauseBeforeShutdown) {
      System.out.println("Press return to shut down");
      System.in.read();
    }
  }
    
  protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
    System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");

    Message message;
    while ( (message = consumer.receive(timeout)) != null ) {
      onMessage(message);
    }
        
    System.out.println("Closing connection");
    consumer.close();
    session.close();
    connection.close();

    consumer = null;
    session = null;
    connection = null;

    if (pauseBeforeShutdown) {
      System.out.println("Press return to shut down");
      System.in.read();
    }
 }


  synchronized public void onException(JMSException ex) {
    System.out.println("JMS Exception occured.  Shutting down client.");
    running=false;
  }

  synchronized boolean isRunning() {
    return running;
  }

  //                //
  // ** PRODUCER ** //
  //                //

  private void produce() {
    try {
      System.out.println("Connecting to URL: " + url);
      System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
      System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
  
      globConnection = createConnection();
      globSession = createSession(globConnection);
      MessageProducer producer = createProducer(globSession);
      sendLoop(globSession, producer);

      System.out.println("Done.");
      close(globConnection, globSession); 
    }
    catch (Exception e) {
      System.out.println("Caught: " + e);
      e.printStackTrace();
    }
  }

  protected void sendLoop(Session session, MessageProducer producer) throws Exception {
    for (int i = 0; i < messageCount || messageCount==0 ; i++) {
      TextMessage message = session.createTextMessage(createMessageText(i));

      if (verbose) {
        String msg = message.getText();
        if (msg.length() > 50) {
          msg = msg.substring(0, 50) + "...";
        }
        System.out.println("Sending message: " + msg);
      }
            
      producer.send(message);
      if(transacted) {
        session.commit();
      }
            
      Thread.sleep(sleepTime);
            
    }
  }

  protected MessageProducer createProducer(Session session) throws JMSException {
    MessageProducer producer = session.createProducer(destination);
    if ((durable && topic) || (persistent && !topic)) {
      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    }
    else {
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }
    if( timeToLive!=0 )
      producer.setTimeToLive(timeToLive);
  
    return producer;
  }


  private String createMessageText(int index) {
    StringBuffer buffer = new StringBuffer(messageSize);
    buffer.append("Message: " + index + " sent at: " + new Date());
    if (buffer.length() > messageSize) {
      return buffer.substring(0, messageSize);
    }
    for (int i = buffer.length(); i < messageSize; i++) {
      buffer.append(' ');
    }
    return buffer.toString();
  }

  //            //
  // ** UTIL ** //
  //            //

  private static void usage() {
    System.out.println("usage : java -cp testModules.jar test.ActivemqSafeJMSTest [options] mode");
    System.out.println("options:                      ");
    System.out.println("\t--durable                   ");
    System.out.println("\t--transacted                ");
    System.out.println("\t--persistent                ");
    System.out.println("\t--topic                     ");
    System.out.println("\t--clientid clientID         ");
    System.out.println("\t--consumername consumerName ");
    System.out.println("\t--subject subject           ");
    System.out.println("\t--size size                 ");
    System.out.println("\t--messages messageCount     ");
    System.out.println("\t--url URL                   ");
    
    System.exit(1);
  }
}

Reply via email to