Hi all,
I have a small test app that creates a TopicPublisher that generates 1000
messages and then shuts down.
However when I run the app it does not stop. It appears that the
TopicPublisher keeps a thread alive which prevents the app from stopping...
From the logs of my app you can see the keep alive messages getting sent
even after the publisher has been stooped and the session closed:
1375 [main] DEBUG org.apache.activemq.ActiveMQSession - Sending message:
ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId =
ID:isis-ws-007-4500-1158269259517-1:0:1:1:1000, originalDestination = null,
originalTransactionId = null, producerId =
ID:isis-ws-007-4500-1158269259517-1:0:1:1, destination = topic://testtopic,
transactionId = null, expiration = 0, timestamp = 1158269260720, arrival =
0, correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
null, dataStructure = null, redeliveryCounter = 0, size = 0, properties =
null, readOnlyProperties = true, readOnlyBody = true, text = Msg:999}
1375 [main] INFO TestPub - Stopping...
1375 [main] INFO TestPub - Stopped.
1375 [main] INFO TestPub - Done.
15560 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor - Message sent since last
write check, resetting flag
30557 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor - No message sent since
last write check, sending a KeepAliveInfo
30557 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor - Message received since
last read check, resetting flag:
45555 [ActiveMQ Scheduler] DEBUG
org.apache.activemq.transport.InactivityMonitor - No message sent since
last write check, sending a KeepAliveInfo
The code of the app:
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
/**
* Test topic subscriber.
*
* @author ackerj
* @version $Id$
*/
public class TestPub implements ExceptionListener
{
private static Logger log = Logger.getLogger(TestPub.class);
ActiveMQConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicPublisher topicPublisher = null;
String brokerUri;
String topicName;
private static int MSG_TO_SEND = 1000;
public void onException(JMSException e)
{
log.error("OnException event fired !", e);
System.exit(1);
}
public TestPub(String brokerUri, String topicName)
throws Exception
{
this.brokerUri = brokerUri;
this.topicName = topicName;
}
public void go()
throws Exception
{
log.info("Connecting.....");
topicConnectionFactory = new ActiveMQConnectionFactory();
topicConnectionFactory.setBrokerURL(brokerUri);
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.setExceptionListener(this);
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic = topicSession.createTopic(topicName);
topicPublisher = topicSession.createPublisher(topic);
topicConnection.start();
log.info("Connected.");
log.info("Sending " + MSG_TO_SEND + " messages..");
for (int loop=0; loop < MSG_TO_SEND; loop++)
{
TextMessage msg = topicSession.createTextMessage();
msg.setText("Msg:" + loop);
topicPublisher.send(msg);
}
log.info("Stopping...");
topicPublisher.close();
topicSession.close();
topicConnection.stop();
log.info("Stopped.");
}
public static void main(String[] args)
{
BasicConfigurator.configure();
try
{
System.out.println("Test ActiveMQ Publisher");
if (args.length != 1 && args.length != 2)
{
System.out.println("Usage: TestPub [brokerUri] [topicname]");
System.out.println("eg: ");
System.out.println(" TestPub tcp://127.0.0.1:12345 testtopic");
System.exit(1);
}
String brokerUri = new String(args[0]);
log.info("BrokerUri: " + brokerUri);
String topicName = new String(args[1]);
log.info("Topic name is " + topicName);
TestPub pub = new TestPub(brokerUri, topicName);
pub.go();
log.info("Done.");
}
catch (Exception e)
{
e.printStackTrace();
System.exit(1);
}
}
}