Author: ritchiem
Date: Fri Aug 7 18:03:13 2009
New Revision: 802112

URL: http://svn.apache.org/viewvc?rev=802112&view=rev
Log:
Add a drainQueue method to QTC to allow tests to clean up after themselves

Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=802112&r1=802111&r2=802112&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Fri Aug 7 18:03:13 2009
@@ -26,6 +26,8 @@
 import javax.jms.MessageProducer;
 import javax.jms.Message;
 import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 import java.io.BufferedReader;
@@ -248,7 +250,7 @@
         {
             fail("Unable to test without config file:" + _configFile);
         }
-        
+
         startBroker();
     }
 
@@ -487,12 +489,12 @@
             ApplicationRegistry.remove(port);
         }
     }
-    
+
     public void nukeBroker() throws Exception
     {
         nukeBroker(0);
     }
-    
+
     public void nukeBroker(int port) throws Exception
     {
         Process proc = _brokers.get(getPort(port));
@@ -503,7 +505,7 @@
         else
         {
             String command = "pkill -KILL -f "+getBrokerCommand(getPort(port));
-            try 
+            try
             {
                 Runtime.getRuntime().exec(command);
             }
@@ -699,7 +701,7 @@
     protected boolean isExternalBroker()
     {
         return !_broker.equals("vm");
-    }    
+    }
 
     public void restartBroker() throws Exception
     {
@@ -835,7 +837,36 @@
 
         revertSystemProperties();
     }
-    
+
+    /**
+     * Consume all the messages in the specified queue. Helper to ensure
+     * persistent tests don't leave data behind.
+     *
+     * @param queue the queue to purge
+     * @throws Exception if a problem occurs
+     * @return the count of messages drained
+     */
+    protected int drainQueue(Queue queue) throws Exception
+    {
+        Connection connection = getConnection();
+
+        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        connection.start();
+
+        int count = 0;
+        while (consumer.receive(1000) != null)
+        {
+            count++;
+        }
+
+        connection.close();
+
+        return count;
+    }
+
     public List<Message> sendMessage(Session session, Destination destination,
                                      int count) throws Exception
     {
@@ -846,7 +877,7 @@
                                      int count,int batchSize) throws Exception
     {
         List<Message> messages = new ArrayList<Message>(count);
-        
+
         MessageProducer producer = session.createProducer(destination);
 
         for (int i = 0; i < count; i++)

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org