Hi all,
I've been testing ActiveMQ under relatively high load with a large number
of producers and consumers (50-100 on each side), and I've attached the
code that I have been using for the tests (it's not really clean code,
since it's used for testing purposes only). What happens is this: when
I launch about 50-60 consumers/producers (they run on different boxes)
and then start stopping consumer processes, the producers tend to block
for a short period. If I repeat this process for a while, I get to a
state where the whole queue is blocked for 1-2 hours or longer (sometimes
for good).
Is this a general performance problem with ActiveMQ, or am I doing something wrong?
Thanks,
Igor
package test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Message;
import javax.jms.Topic;
import java.io.IOException;
import java.util.Random;
import java.util.Date;
// ActivemqSafeJMSTest class
public class ActivemqSafeJMSTest implements MessageListener, ExceptionListener
{
// Destination assigned by createSession(): a topic or a queue named by 'subject'.
protected Destination destination;
// Name of the topic/queue to use (--subject).
protected String subject = "";
// Broker URL handed to the ActiveMQConnectionFactory (--url).
protected String url = "";
// ---- behaviour switches ----
// true: use a topic, false: use a queue (--topic).
protected boolean topic = false;
// Passed to createSession(); when set, the session is committed after every message (--transacted).
protected boolean transacted = false;
// Durable subscriber on topics; the client ID is only applied to the connection when this is set (--durable).
protected boolean durable = false;
// Selects PERSISTENT delivery for queue producers (--persistent).
protected boolean persistent = false;
// Print (a truncated form of) every message sent or received (--verbose).
protected boolean verbose = false;
// Pause passed to Thread.sleep() after each message sent or received;
// no command-line option in main() sets it.
private long sleepTime=0L;
// Acknowledgement mode passed to Connection.createSession().
protected int ackMode = Session.CLIENT_ACKNOWLEDGE;
// ---- identification ----
// Credentials handed to the ActiveMQConnectionFactory.
protected String user = ActiveMQConnection.DEFAULT_USER;
protected String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
// JMS client ID (--clientid); main() randomizes it when left empty.
protected String clientID = "";
// Durable subscription name (--consumername); main() randomizes it when left empty.
protected String consumerName = "";
// ---- consumer ----
// NOTE(review): 'count' and 'dumpCount' are not referenced elsewhere in the code visible here.
protected int count = 0;
protected int dumpCount = 20000;
// When > 0 the consumer receives this many messages and then closes;
// no command-line option in main() sets it.
protected int maxiumMessages = 0;
// When set, consumeMessagesAndClose() waits for input on System.in before returning.
private boolean pauseBeforeShutdown;
// Set to true by consume() and to false by onException(); read through isRunning().
private boolean running;
// Session and connection shared with the shutdown hook registered in the constructor.
private Session globSession;
private Connection globConnection;
// 0: consume through an asynchronous MessageListener; otherwise the timeout
// passed to MessageConsumer.receive() by the polling consumer.
private long receiveTimeOut=0;
// ---- producer ----
// Length, in characters, of every generated message text (--size).
protected int messageSize = 1000;
// Number of messages to send; 0 makes sendLoop() run without an upper bound (--messages).
protected int messageCount = 1000;
// Applied to the producer via setTimeToLive() when non-zero.
protected long timeToLive = 0L;
/**
 * Creates the test client and registers a JVM shutdown hook that closes the
 * shared session and connection (when they have been opened) as the JVM
 * shuts down.
 */
public ActivemqSafeJMSTest() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            boolean clean = true;
            try {
                if (globSession != null) {
                    globSession.close();
                }
            } catch (JMSException e) {
                clean = false;
                e.printStackTrace();
            }
            // Attempted separately: a failing session close must not prevent
            // the connection itself from being closed.
            try {
                if (globConnection != null) {
                    globConnection.close();
                }
            } catch (JMSException e) {
                clean = false;
                e.printStackTrace();
            }
            if (clean) {
                System.out.println("Shuting down nicely.");
            } else {
                System.out.println("Failed to shutdown nicely.");
            }
        }
    });
}
/**
 * Command-line entry point: parses the options, fills in random
 * consumer/client names when none were given and runs the requested mode.
 *
 * The mode ("consumer" or "producer") must be the last argument. Any
 * unknown option, missing option value, or missing/unknown mode prints the
 * usage text and exits.
 *
 * @param args command-line arguments, see usage()
 */
public static void main(String[] args) {
    String mode = "";
    ActivemqSafeJMSTest newTest = new ActivemqSafeJMSTest();
    for (int i = 0; i < args.length; i++) {
        String arg = args[i];
        // flags
        if (arg.equals("--durable")) {
            newTest.durable = true;
        } else if (arg.equals("--transacted")) {
            newTest.transacted = true;
        } else if (arg.equals("--persistent")) {
            newTest.persistent = true;
        } else if (arg.equals("--topic")) {
            newTest.topic = true;
        } else if (arg.equals("--verbose")) {
            newTest.verbose = true;
        }
        // options that consume the following argument as their value
        else if (arg.equals("--clientid")) {
            newTest.clientID = optionValue(args, ++i);
        } else if (arg.equals("--consumername")) {
            newTest.consumerName = optionValue(args, ++i);
        } else if (arg.equals("--subject")) {
            newTest.subject = optionValue(args, ++i);
        } else if (arg.equals("--url")) {
            newTest.url = optionValue(args, ++i);
        } else if (arg.equals("--size")) {
            newTest.messageSize = intOptionValue(args, ++i);
        } else if (arg.equals("--messages")) {
            newTest.messageCount = intOptionValue(args, ++i);
        }
        // the last argument, when it is not an option, is the mode
        else if (i == args.length - 1) {
            mode = arg;
        } else {
            usage();
        }
    }
    // randomize consumer name and client id when they were not supplied
    if (newTest.consumerName.length() == 0) {
        newTest.consumerName = randomName();
    }
    if (newTest.clientID.length() == 0) {
        newTest.clientID = randomName();
    }
    // run test
    if (mode.equals("consumer")) {
        newTest.consume();
    } else if (mode.equals("producer")) {
        newTest.produce();
    } else {
        // Missing or misspelled mode: explain instead of exiting silently.
        usage();
    }
}

/** Returns args[index], or prints the usage text and exits when the option has no value. */
private static String optionValue(String[] args, int index) {
    if (index >= args.length) {
        usage(); // terminates the JVM
    }
    return args[index];
}

/** Like optionValue(), but parses the value as an int and exits with status 1 on a malformed number. */
private static int intOptionValue(String[] args, int index) {
    String raw = optionValue(args, index);
    try {
        return Integer.parseInt(raw);
    } catch (NumberFormatException nx) {
        System.out.println("bad integer...");
        System.exit(1);
        return 0; // not reached; keeps the compiler satisfied
    }
}

/** Builds a random base-36 name; the sign bit is masked off so the result never starts with '-'. */
private static String randomName() {
    return Long.toString(new Random().nextLong() & Long.MAX_VALUE, 36);
}
// //
// ** GENERIC ** //
// //
/**
 * Opens a session on the given connection using the configured
 * 'transacted' flag and acknowledgement mode, and points 'destination' at
 * the topic or queue named by 'subject'.
 *
 * @param connection connection to create the session on
 * @return the newly created session
 */
protected Session createSession(Connection connection) throws Exception {
    final Session created = connection.createSession(transacted, ackMode);
    destination = topic
            ? (Destination) created.createTopic(subject)
            : (Destination) created.createQueue(subject);
    return created;
}
/**
 * Creates and starts a connection to the broker at 'url' using the
 * configured user and password. The client ID is applied only when
 * 'durable' is set.
 *
 * @return a started connection
 * @throws JMSException if the connection cannot be created, configured or started
 */
protected Connection createConnection() throws JMSException, Exception {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
    Connection connection = connectionFactory.createConnection();
    boolean started = false;
    try {
        if (durable && clientID != null) {
            connection.setClientID(clientID);
        }
        connection.start();
        started = true;
        return connection;
    } finally {
        if (!started) {
            // Do not leave a half-initialised connection open behind a failure.
            try {
                connection.close();
            } catch (JMSException ignored) {
                // the original failure is the one worth reporting
            }
        }
    }
}
/**
 * Dumps the connection statistics, then closes the session and the
 * connection. Either argument may be null. The connection is closed even
 * when dumping the statistics or closing the session fails.
 *
 * @param connection connection to dump and close, may be null
 * @param session    session to close, may be null
 * @throws JMSException if closing the session or the connection fails
 */
protected void close(Connection connection, Session session) throws JMSException {
    try {
        // dumpStats() dereferences the connection, so it must be guarded too
        if (connection != null) {
            dumpStats(connection);
        }
        if (session != null) {
            session.close();
        }
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
}
/**
 * Dumps the statistics of the given connection through a fresh
 * IndentPrinter. The connection is cast to ActiveMQConnection.
 *
 * @param connection connection whose statistics are dumped
 */
protected void dumpStats(Connection connection) {
    ((ActiveMQConnection) connection).getConnectionStats().dump(new IndentPrinter());
}
// //
// ** CONSUMER ** //
// //
/**
 * Runs the consumer mode: connects, creates the (optionally durable)
 * consumer and then either
 *   - receives 'maxiumMessages' messages and closes, when that value is > 0,
 *   - registers this object as an asynchronous listener, when 'receiveTimeOut' is 0,
 *   - or polls with 'receiveTimeOut' until no message arrives in time.
 * Any exception is printed, not rethrown.
 */
private void consume() {
    try {
        running = true;
        final String destinationKind = topic ? "topic" : "queue";
        final String subscriptionKind = durable ? "durable" : "non-durable";
        final String persistenceKind = persistent ? "persistent" : "non-persistent";
        System.out.println("Connecting to URL: " + url);
        System.out.println("Consuming " + destinationKind + ": " + subject);
        System.out.println("Using " + subscriptionKind + " subscription");
        System.out.println("Using " + persistenceKind + " queue");

        globConnection = createConnection();
        globConnection.setExceptionListener(this);
        globSession = createSession(globConnection);

        final MessageConsumer consumer = (durable && topic)
                ? globSession.createDurableSubscriber((Topic) destination, consumerName)
                : globSession.createConsumer(destination);

        if (maxiumMessages > 0) {
            consumeMessagesAndClose(globConnection, globSession, consumer);
        } else if (receiveTimeOut == 0) {
            consumer.setMessageListener(this);
        } else {
            consumeMessagesAndClose(globConnection, globSession, consumer, receiveTimeOut);
        }
    } catch (Exception e) {
        System.out.println("Caught while starting: " + e);
        e.printStackTrace();
    }
}
/**
 * Handles one received message: optionally prints it (text bodies are cut
 * to 50 characters), commits the shared session when running transacted,
 * and acknowledges the message. JMS failures are printed, not rethrown.
 * Afterwards the thread sleeps for 'sleepTime' when that is positive.
 *
 * @param message the message that was delivered
 */
public void onMessage(Message message) {
    try {
        if (verbose) {
            if (message instanceof TextMessage) {
                String msg = ((TextMessage) message).getText();
                // a text message may carry no body at all
                if (msg != null && msg.length() > 50) {
                    msg = msg.substring(0, 50) + "...";
                }
                System.out.println("Received: " + msg);
            } else {
                System.out.println("Received: " + message);
            }
        }
        if (transacted) {
            globSession.commit();
        }
        message.acknowledge();
    } catch (JMSException e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    } finally {
        if (sleepTime > 0) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the calling thread instead of losing it.
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Receives messages until 'maxiumMessages' have been handled or
 * isRunning() turns false, then closes the consumer, session and
 * connection. The three are closed even when receiving fails.
 *
 * @param connection connection to close when done
 * @param session    session to close when done
 * @param consumer   consumer to receive from and close when done
 * @throws JMSException if receiving or closing fails
 * @throws IOException  if reading from System.in fails while pausing before shutdown
 */
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
    System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
    try {
        for (int i = 0; i < maxiumMessages && isRunning(); ) {
            // bounded wait so that isRunning() is re-checked at least once a second
            Message message = consumer.receive(1000);
            if (message != null) {
                i++;
                onMessage(message);
            }
        }
    } finally {
        System.out.println("Closing connection");
        // nested so that each close is attempted even if the previous one throws
        try {
            consumer.close();
        } finally {
            try {
                session.close();
            } finally {
                connection.close();
            }
        }
    }
    if (pauseBeforeShutdown) {
        System.out.println("Press return to shut down");
        System.in.read();
    }
}
/**
 * Receives messages for as long as one arrives within 'timeout', then
 * closes the consumer, session and connection. The three are closed even
 * when receiving fails.
 *
 * @param connection connection to close when done
 * @param session    session to close when done
 * @param consumer   consumer to receive from and close when done
 * @param timeout    value passed to MessageConsumer.receive() for each message
 * @throws JMSException if receiving or closing fails
 * @throws IOException  if reading from System.in fails while pausing before shutdown
 */
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
    System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");
    try {
        Message message;
        while ((message = consumer.receive(timeout)) != null) {
            onMessage(message);
        }
    } finally {
        System.out.println("Closing connection");
        // nested so that each close is attempted even if the previous one throws
        try {
            consumer.close();
        } finally {
            try {
                session.close();
            } finally {
                connection.close();
            }
        }
    }
    if (pauseBeforeShutdown) {
        System.out.println("Press return to shut down");
        System.in.read();
    }
}
/**
 * ExceptionListener callback: clears the 'running' flag and reports the
 * failure. The flag is read through isRunning(), which the message-count
 * bounded consumeMessagesAndClose() loop checks between receives.
 *
 * @param ex the exception reported for the connection
 */
public synchronized void onException(JMSException ex) {
    running = false;
    System.out.println("JMS Exception occured. Shutting down client.");
    // Report the cause as the other handlers in this class do, instead of dropping it.
    if (ex != null) {
        ex.printStackTrace();
    }
}
/**
 * Reports the current value of the 'running' flag, read while holding this
 * object's monitor.
 *
 * @return the current value of 'running'
 */
synchronized boolean isRunning() {
    final boolean stillRunning = running;
    return stillRunning;
}
// //
// ** PRODUCER ** //
// //
/**
 * Runs the producer mode: connects, creates the producer, sends the
 * configured messages and closes the session and connection. Failures are
 * printed, not rethrown. The connection is closed on failure as well, so
 * a failed run does not leave it open.
 */
private void produce() {
    try {
        System.out.println("Connecting to URL: " + url);
        System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
        System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
        globConnection = createConnection();
        globSession = createSession(globConnection);
        MessageProducer producer = createProducer(globSession);
        sendLoop(globSession, producer);
        System.out.println("Done.");
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    } finally {
        // Nothing to close when the connection was never established.
        if (globConnection != null) {
            try {
                close(globConnection, globSession);
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }
}
/**
 * Sends 'messageCount' generated text messages through the producer, or
 * keeps sending without an upper bound when 'messageCount' is 0. Each send
 * is followed by a commit when the session is transacted and by a sleep of
 * 'sleepTime'.
 *
 * @param session  session used to create the messages (and to commit)
 * @param producer producer the messages are sent with
 */
protected void sendLoop(Session session, MessageProducer producer) throws Exception {
    int sent = 0;
    while (messageCount == 0 || sent < messageCount) {
        final TextMessage outgoing = session.createTextMessage(createMessageText(sent));
        if (verbose) {
            final String full = outgoing.getText();
            // only the first 50 characters are echoed
            final String preview = full.length() > 50 ? full.substring(0, 50) + "..." : full;
            System.out.println("Sending message: " + preview);
        }
        producer.send(outgoing);
        if (transacted) {
            session.commit();
        }
        Thread.sleep(sleepTime);
        sent++;
    }
}
/**
 * Creates a producer for 'destination'. Delivery is PERSISTENT for durable
 * topics and for persistent queues, NON_PERSISTENT otherwise. A non-zero
 * 'timeToLive' is applied to the producer.
 *
 * @param session session to create the producer on
 * @return the configured producer
 * @throws JMSException if the producer cannot be created or configured
 */
protected MessageProducer createProducer(Session session) throws JMSException {
    final MessageProducer created = session.createProducer(destination);
    final boolean persistentDelivery = topic ? durable : persistent;
    created.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
    if (timeToLive != 0) {
        created.setTimeToLive(timeToLive);
    }
    return created;
}
/**
 * Builds a message text of exactly 'messageSize' characters: a header with
 * the message index and the current date, cut off when it is longer than
 * 'messageSize' and padded with spaces when it is shorter.
 *
 * @param index sequence number written into the header
 * @return the text, 'messageSize' characters long
 */
private String createMessageText(int index) {
    final StringBuffer text = new StringBuffer(messageSize);
    text.append("Message: ").append(index).append(" sent at: ").append(new Date());
    final int headerLength = text.length();
    if (headerLength > messageSize) {
        return text.substring(0, messageSize);
    }
    int missing = messageSize - headerLength;
    while (missing-- > 0) {
        text.append(' ');
    }
    return text.toString();
}
// //
// ** UTIL ** //
// //
/**
 * Prints the command-line help, listing every option main() accepts, and
 * terminates the JVM with exit status 1.
 */
private static void usage() {
    System.out.println("usage : java -cp testModules.jar test.ActivemqSafeJMSTest [options] mode");
    System.out.println("options: ");
    System.out.println("\t--durable ");
    System.out.println("\t--transacted ");
    System.out.println("\t--persistent ");
    System.out.println("\t--topic ");
    // accepted by main() but previously missing from the help text
    System.out.println("\t--verbose ");
    System.out.println("\t--clientid clientID ");
    System.out.println("\t--consumername consumerName ");
    System.out.println("\t--subject subject ");
    System.out.println("\t--size size ");
    System.out.println("\t--messages messageCount ");
    System.out.println("\t--url URL ");
    System.exit(1);
}
}