Author: kwall
Date: Tue Oct 18 11:20:53 2011
New Revision: 1185580

URL: http://svn.apache.org/viewvc?rev=1185580&view=rev
Log:
QPID-3542: ensure session complete sent for filtered out messages
Applied patch from Andrew MacBean <andymacb...@gmail.com> and myself. Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1185580&r1=1185579&r2=1185580&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Oct 18 11:20:53 2011 @@ -294,23 +294,34 @@ public class AMQSession_0_10 extends AMQ } } - void messageAcknowledge(RangeSet ranges, boolean accept) + void messageAcknowledge(final RangeSet ranges, final boolean accept) { messageAcknowledge(ranges,accept,false); } - void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit) + void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit) { - Session ssn = getQpidSession(); - for (Range range : ranges) + final Session ssn = getQpidSession(); + flushProcessed(ranges,accept); + if (accept) { - ssn.processed(range); + ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE); } - ssn.flushProcessed(accept ? BATCH : NONE); - if (accept) + } + + /** + * Flush any outstanding commands. This causes session complete to be sent. + * @param ranges the range of command ids. + * @param batch true if batched. 
+ */ + void flushProcessed(final RangeSet ranges, final boolean batch) + { + final Session ssn = getQpidSession(); + for (final Range range : ranges) { - ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE); + ssn.processed(range); } + ssn.flushProcessed(batch ? BATCH : NONE); } /** Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1185580&r1=1185579&r2=1185580&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Oct 18 11:20:53 2011 @@ -269,8 +269,9 @@ public class BasicMessageConsumer_0_10 e { if (_logger.isDebugEnabled()) { - _logger.debug("filterMessage - not ack'ing messaage as not aquired"); + _logger.debug("filterMessage - not ack'ing message as not acquired"); } + flushUnwantedMessage(message); } } @@ -312,6 +313,26 @@ public class BasicMessageConsumer_0_10 e } /** + * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated + * processed to ensure their AMQP command-id is marked completed. + * + * @param message The unwanted message to be flushed + * @throws AMQException If the unwanted message cannot be flushed due to some internal error. 
+ */ + private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException + { + final RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + _0_10session.flushProcessed(ranges,false); + + final AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } + } + + /** * Acquire a message * * @param message The message to be acquired Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=1185580&r1=1185579&r2=1185580&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Tue Oct 18 11:20:53 2011 @@ -33,6 +33,7 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -320,6 +321,7 @@ public class TopicSessionTest extends Qp final Connection con1 = getConnection(); final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); final Topic topic1 = session1.createTopic(topicName); + final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId); // Setup subscriber with selector final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false); @@ -339,13 +341,9 @@ public class TopicSessionTest extends Qp session1.close(); - // Now recreate the session and subscriber (same clientid) but without selector and check that the 
message still - // is not received. This defect meant that such a message would be received. + // Now verify queue depth on broker. final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Topic topic2 = session2.createTopic(topicName); - - final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false); - final Message message2 = sameSubscriberWithoutSelector.receive(1000); - assertNull("still should not have received message", message2); + final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker); + assertEquals("Expected queue depth of zero", 0, depth); } } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org