Author: ritchiem
Date: Fri Aug  7 18:03:13 2009
New Revision: 802112

URL: http://svn.apache.org/viewvc?rev=802112&view=rev
Log:
Add a drainQueue method to QpidTestCase (QTC) to allow tests to clean up after themselves

Modified:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=802112&r1=802111&r2=802112&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
 Fri Aug  7 18:03:13 2009
@@ -26,6 +26,8 @@
 import javax.jms.MessageProducer;
 import javax.jms.Message;
 import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 import java.io.BufferedReader;
@@ -248,7 +250,7 @@
         {
             fail("Unable to test without config file:" + _configFile);
         }
-        
+
         startBroker();
     }
 
@@ -487,12 +489,12 @@
             ApplicationRegistry.remove(port);
         }
     }
-    
+
     public void nukeBroker() throws Exception
     {
         nukeBroker(0);
     }
-    
+
     public void nukeBroker(int port) throws Exception
     {
         Process proc = _brokers.get(getPort(port));
@@ -503,7 +505,7 @@
         else
         {
             String command = "pkill -KILL -f "+getBrokerCommand(getPort(port));
-            try 
+            try
             {
                 Runtime.getRuntime().exec(command);
             }
@@ -699,7 +701,7 @@
     protected boolean isExternalBroker()
     {
         return !_broker.equals("vm");
-    }    
+    }
 
     public void restartBroker() throws Exception
     {
@@ -835,7 +837,36 @@
 
         revertSystemProperties();
     }
-    
+
+    /**
+     * Consume all the messages in the specified queue so that persistent tests
+     * don't leave data behind. Draining stops at the first receive that yields
+     * no message within one second; the connection is closed even on failure.
+     * @param queue the queue to purge
+     * @return the count of messages drained
+     * @throws Exception if a problem occurs
+     */
+    protected int drainQueue(Queue queue) throws Exception
+    {
+        Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+            connection.start();
+            int count = 0;
+            while (consumer.receive(1000) != null)
+            {
+                count++;
+            }
+            return count;
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
     public List<Message> sendMessage(Session session, Destination destination,
                                      int count) throws Exception
     {
@@ -846,7 +877,7 @@
                                      int count,int batchSize) throws Exception
     {
         List<Message> messages = new ArrayList<Message>(count);
-        
+
         MessageProducer producer = session.createProducer(destination);
 
         for (int i = 0; i < count; i++)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to