Author: robbie
Date: Fri Dec 10 15:01:03 2010
New Revision: 1044389

URL: http://svn.apache.org/viewvc?rev=1044389&view=rev
Log:
QPID-2971, QPID-2972, QPID-2973: update system test to perform additional verification of the DLQ'd messages
Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java?rev=1044389&r1=1044388&r2=1044389&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java Fri Dec 10 15:01:03 2010 @@ -22,6 +22,7 @@ package org.apache.qpid.test.unit.client import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -34,6 +35,7 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -96,7 +98,7 @@ public class MaxDeliveryCountTest extend for (int count = 1; count <= MSG_COUNT; count++) { - Message msg = producerSession.createTextMessage("Message " + count); + Message msg = producerSession.createTextMessage(generateContent(count)); msg.setIntProperty("count", count); producer.send(msg); } @@ -107,6 +109,11 @@ public class MaxDeliveryCountTest extend _awaitCompletion = new CountDownLatch(1); } + private String generateContent(int count) + { + return "Message " + count + " content."; + } + /** * Test that Max Redelivery is enforced when using onMessage() on a * Client-Ack session. 
@@ -202,12 +209,12 @@ public class MaxDeliveryCountTest extend Connection clientConnection = getConnection(); boolean transacted = deliveryMode == Session.SESSION_TRANSACTED ? true : false; - final Session clientSession = clientConnection.createSession(transacted, deliveryMode); + Session clientSession = clientConnection.createSession(transacted, deliveryMode); MessageConsumer consumer = clientSession.createConsumer(_queue); assertEquals("The queue should have " + MSG_COUNT + " msgs at start", - MSG_COUNT, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue)); + MSG_COUNT, ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue)); clientConnection.start(); @@ -246,14 +253,74 @@ public class MaxDeliveryCountTest extend //check the source queue is now empty assertEquals("The queue should have 0 msgs left", 0, ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) _queue)); - //check the DLQ has the rejected-without-requeue messages + //check the DLQ has the required number of rejected-without-requeue messages String dlQueueName = getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - assertEquals("The DLQ should have " + redeliverMsgs.size() + " msgs on it", redeliverMsgs.size() - , ((AMQSession<?,?>) clientSession).getQueueDepth((AMQDestination) new AMQQueue("amq.direct", dlQueueName))); + AMQDestination queue = new AMQQueue("amq.direct", dlQueueName); + assertEquals("The DLQ should have " + redeliverMsgs.size() + " msgs on it", redeliverMsgs.size(), + ((AMQSession<?,?>) clientSession).getQueueDepth(queue)); + + if(isBrokerStorePersistent()) + { + //restart the broker to verify persistence of the DLQ and the messages on it + clientConnection.close(); + + restartBroker(); + + clientConnection = getConnection(); + clientSession = clientConnection.createSession(transacted, deliveryMode); + clientConnection.start(); + } + + //verify the messages on the DLQ + verifyDLQcontent(clientConnection, redeliverMsgs, queue); 
clientConnection.close(); } + private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, AMQDestination queue) throws JMSException + { + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = clientSession.createConsumer(queue); + + //keep track of the message we expect to still be on the DLQ + List<Integer> outstandingMessages = new ArrayList<Integer>(redeliverMsgs); + int numMsg = outstandingMessages.size(); + + for(int i = 0; i < numMsg; i++) + { + Message message = consumer.receive(250); + + assertNotNull("failed to consume expected message from DLQ", message); + assertTrue("message was the wrong type", message instanceof TextMessage); + + //using Integer here to allow removing the value from the list, using int + //would instead result in removal of the element at that index + Integer msgId = message.getIntProperty("count"); + + TextMessage txt = (TextMessage) message; + _logger.info("Received message " + msgId + " from the DLQ: " + txt.getText()); + + assertTrue("message was not one of those which should have been on the DLQ", + redeliverMsgs.contains(msgId)); + assertTrue("message was not one of those expected to still be on the DLQ", + outstandingMessages.contains(msgId)); + assertEquals("Message content was not as expected", generateContent(msgId), txt.getText()); + + //remove from the list of outstanding msgs + outstandingMessages.remove(msgId); + } + + if(outstandingMessages.size() > 0) + { + String failures = ""; + for(Integer msg : outstandingMessages) + { + failures = failures.concat(msg + " "); + } + fail("some DLQ'd messages were not found on the DLQ: " + failures); + } + } + private void addMessageListener(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount, final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException { @@ -291,7 +358,7 @@ public class 
MaxDeliveryCountTest extend { int msgId = message.getIntProperty("count"); - _logger.info("Recieved Message: " + msgId); + _logger.info("Received message: " + msgId); //check the message is the one we expected if(_expectedMessage != msgId) @@ -447,7 +514,7 @@ public class MaxDeliveryCountTest extend { int msgId = message.getIntProperty("count"); - _logger.info("Recieved Message: " + msgId); + _logger.info("Received message: " + msgId); //check the message is the one we expected assertEquals("Unexpected message.", _expectedMessage, msgId); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org