Author: robbie
Date: Fri Dec 10 15:01:03 2010
New Revision: 1044389

URL: http://svn.apache.org/viewvc?rev=1044389&view=rev
Log:
QPID-2971, QPID-2972, QPID-2973: update system test to perform additional 
verification of the DLQ'd messages

Modified:
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java?rev=1044389&r1=1044388&r2=1044389&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java Fri Dec 10 15:01:03 2010
@@ -22,6 +22,7 @@ package org.apache.qpid.test.unit.client
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +35,7 @@ import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -96,7 +98,7 @@ public class MaxDeliveryCountTest extend
 
         for (int count = 1; count <= MSG_COUNT; count++)
         {
-            Message msg = producerSession.createTextMessage("Message " + 
count);
+            Message msg = 
producerSession.createTextMessage(generateContent(count));
             msg.setIntProperty("count", count);
             producer.send(msg);
         }
@@ -107,6 +109,11 @@ public class MaxDeliveryCountTest extend
         _awaitCompletion = new CountDownLatch(1);
     }
 
+    private String generateContent(int count)
+    {
+        return "Message " + count + " content.";
+    }
+
     /**
      * Test that Max Redelivery is enforced when using onMessage() on a 
      * Client-Ack session.
@@ -202,12 +209,12 @@ public class MaxDeliveryCountTest extend
         Connection clientConnection = getConnection();
         
         boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true 
: false;
-        final Session clientSession = 
clientConnection.createSession(transacted, deliveryMode);
+        Session clientSession = clientConnection.createSession(transacted, 
deliveryMode);
 
         MessageConsumer consumer = clientSession.createConsumer(_queue);
 
         assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
-                MSG_COUNT, ((AMQSession) 
clientSession).getQueueDepth((AMQDestination) _queue));
+                MSG_COUNT, ((AMQSession<?,?>) 
clientSession).getQueueDepth((AMQDestination) _queue));
 
         clientConnection.start();
 
@@ -246,14 +253,74 @@ public class MaxDeliveryCountTest extend
         //check the source queue is now empty
         assertEquals("The queue should have 0 msgs left", 0, 
((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue));
         
-        //check the DLQ has the rejected-without-requeue messages
+        //check the DLQ has the required number of rejected-without-requeue 
messages
         String dlQueueName = getTestQueueName() + 
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
-        assertEquals("The DLQ should have " + redeliverMsgs.size() + " msgs on 
it", redeliverMsgs.size()
-                , ((AMQSession<?,?>) 
clientSession).getQueueDepth((AMQDestination) new AMQQueue("amq.direct", 
dlQueueName)));
+        AMQDestination queue =  new AMQQueue("amq.direct", dlQueueName);
+        assertEquals("The DLQ should have " + redeliverMsgs.size() + " msgs on 
it", redeliverMsgs.size(),
+                        ((AMQSession<?,?>) 
clientSession).getQueueDepth(queue));
+
+        if(isBrokerStorePersistent())
+        {
+            //restart the broker to verify persistence of the DLQ and the 
messages on it
+            clientConnection.close();
+
+            restartBroker();
+
+            clientConnection = getConnection();
+            clientSession = clientConnection.createSession(transacted, 
deliveryMode);
+            clientConnection.start();
+        }
+
+        //verify the messages on the DLQ
+        verifyDLQcontent(clientConnection, redeliverMsgs, queue);
 
         clientConnection.close();
     }
 
+    private void verifyDLQcontent(Connection clientConnection, List<Integer> 
redeliverMsgs, AMQDestination queue) throws JMSException
+    {
+        Session clientSession = clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = clientSession.createConsumer(queue);
+
+        //keep track of the message we expect to still be on the DLQ
+        List<Integer> outstandingMessages = new 
ArrayList<Integer>(redeliverMsgs);
+        int numMsg = outstandingMessages.size();
+
+        for(int i = 0; i < numMsg; i++)
+        {
+            Message message = consumer.receive(250);
+
+            assertNotNull("failed to consume expected message from DLQ", 
message);
+            assertTrue("message was the wrong type", message instanceof 
TextMessage);
+
+            //using Integer here to allow removing the value from the list, 
using int
+            //would instead result in removal of the element at that index
+            Integer msgId = message.getIntProperty("count");
+
+            TextMessage txt = (TextMessage) message;
+            _logger.info("Received message " + msgId + " from the DLQ: " + 
txt.getText());
+
+            assertTrue("message was not one of those which should have been on 
the DLQ",
+                    redeliverMsgs.contains(msgId));
+            assertTrue("message was not one of those expected to still be on 
the DLQ",
+                    outstandingMessages.contains(msgId));
+            assertEquals("Message content was not as expected", 
generateContent(msgId), txt.getText());
+
+            //remove from the list of outstanding msgs
+            outstandingMessages.remove(msgId);
+        }
+
+        if(outstandingMessages.size() > 0)
+        {
+            String failures = "";
+            for(Integer msg : outstandingMessages)
+            {
+                failures = failures.concat(msg + " ");
+            }
+            fail("some DLQ'd messages were not found on the DLQ: " + failures);
+        }
+    }
+
     private void addMessageListener(final Session session, final 
MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
                                     final int expectedTotalNumberOfDeliveries, 
final ArrayList<Integer> redeliverMsgs) throws JMSException
     {
@@ -291,7 +358,7 @@ public class MaxDeliveryCountTest extend
                 {
                     int msgId = message.getIntProperty("count");
                     
-                    _logger.info("Recieved Message: " + msgId);
+                    _logger.info("Received message: " + msgId);
 
                     //check the message is the one we expected
                     if(_expectedMessage != msgId)
@@ -447,7 +514,7 @@ public class MaxDeliveryCountTest extend
             {
                 int msgId = message.getIntProperty("count");
 
-                _logger.info("Recieved Message: " + msgId);
+                _logger.info("Received message: " + msgId);
 
                 //check the message is the one we expected
                 assertEquals("Unexpected message.", _expectedMessage, msgId);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to