Author: rajith Date: Tue May 18 23:05:36 2010 New Revision: 945945 URL: http://svn.apache.org/viewvc?rev=945945&view=rev Log: Implemented the feature described in QPID-2515 However a few issues needs to be ironed out - see the JIRA for these issues.
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=945945&r1=945944&r2=945945&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue May 18 23:05:36 2010 @@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQ * USed to store the range of in tx messages */ private RangeSet _txRangeSet = new RangeSet(); - private int _txSize = 0; + private int _txSize = 0; //--- constructors /** @@ -560,6 +560,9 @@ public class AMQSession_0_10 extends AMQ throws AMQException, FailoverException { boolean preAcquire; + + long capacity = getCapacity(consumer.getDestination()); + try { preAcquire = ( ! consumer.isNoConsume() && @@ -578,7 +581,7 @@ public class AMQSession_0_10 extends AMQ String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); - if (! prefetch()) + if (capacity == 0) { getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT); } @@ -589,12 +592,12 @@ public class AMQSession_0_10 extends AMQ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - if(prefetch() && _dispatcher != null && (isStarted() || _immediatePrefetch)) + if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch(), + capacity, Option.UNRELIABLE); } @@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQ getCurrentException(); } } + + private long getCapacity(AMQDestination destination) + { + long capacity = 0; + if (destination.getDestSyntax() == DestSyntax.ADDR && + destination.getSourceLink().getCapacity() > 0) + { + capacity = destination.getSourceLink().getCapacity(); + } + else if (prefetch()) + { + capacity = getAMQConnection().getMaxPrefetch(); + } + return capacity; + } /** * Create an 0_10 message producer @@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQ //only set if msg list is null try { - if (! prefetch()) + long capacity = getCapacity(consumer.getDestination()); + + if (capacity == 0) { if (consumer.getMessageListener() != null) { @@ -757,7 +777,7 @@ public class AMQSession_0_10 extends AMQ { getQpidSession() .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch(), + capacity, Option.UNRELIABLE); } getQpidSession() Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=945945&r1=945944&r2=945945&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue May 18 23:05:36 2010 @@ -19,6 +19,7 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; @@ -72,7 +73,9 @@ public class BasicMessageConsumer_0_10 e */ private final AtomicBoolean _syncReceive = new AtomicBoolean(false); private String _consumerTagString; - + + private long capacity = 0; + //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -100,6 +103,18 @@ public class BasicMessageConsumer_0_10 e } } _isStarted = connection.started(); + + // Destination setting overrides connection defaults + if (destination.getDestSyntax() == DestSyntax.ADDR && + destination.getSourceLink().getCapacity() > 0) + { + capacity = destination.getSourceLink().getCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + } @@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 e } if (messageOk) { - if (isMessageListenerSet() && ! getSession().prefetch()) + if (isMessageListenerSet() && capacity == 0) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, @@ -245,7 +260,7 @@ public class BasicMessageConsumer_0_10 e } // if we are syncrhonously waiting for a message // and messages are not prefetched we then need to request another one - if(! getSession().prefetch()) + if(capacity == 0) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, @@ -333,7 +348,7 @@ public class BasicMessageConsumer_0_10 e public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - if (messageListener != null && ! getSession().prefetch()) + if (messageListener != null && capacity == 0) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, @@ -372,11 +387,11 @@ public class BasicMessageConsumer_0_10 e */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (! getSession().prefetch()) + if (capacity == 0) { _syncReceive.set(true); } - if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) + if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, @@ -385,23 +400,26 @@ public class BasicMessageConsumer_0_10 e Object o = super.getMessageFromQueue(l); if (o == null && _0_10session.isStarted()) { + _0_10session.getQpidSession().messageFlush (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC); _0_10session.getQpidSession().sync(); _0_10session.getQpidSession().messageFlow (getConsumerTagString(), MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - if (getSession().prefetch()) + + if (capacity > 0) { _0_10session.getQpidSession().messageFlow - (getConsumerTagString(), MessageCreditUnit.MESSAGE, - _0_10session.getAMQConnection().getMaxPrefetch(), - Option.UNRELIABLE); + (getConsumerTagString(), + MessageCreditUnit.MESSAGE, + capacity, + Option.UNRELIABLE); } _0_10session.syncDispatchQueue(); o = super.getMessageFromQueue(-1); } - if (! getSession().prefetch()) + if (capacity == 0) { _syncReceive.set(false); } Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=945945&r1=945944&r2=945945&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue May 18 23:05:36 2010 @@ -21,8 +21,12 @@ package org.apache.qpid.test.client.dest */ +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import javax.jms.Connection; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -37,8 +41,6 @@ import org.apache.qpid.test.utils.QpidTe import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import edu.emory.mathcs.backport.java.util.Collections; - public class AddressBasedDestinationTest extends QpidTestCase { private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class); @@ -48,7 +50,7 @@ public class AddressBasedDestinationTest public void setUp() throws Exception { super.setUp(); - _connection = getConnection(); + _connection = getConnection() ; _connection.start(); } @@ -211,6 +213,7 @@ public class AddressBasedDestinationTest "}, " + "x-bindings: [{exchange : 'amq.direct', key : test}, " + "{exchange : 'amq.fanout'}," + + "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," + "{exchange : 'amq.topic', key : 'a.#'}" + "]," + @@ -236,7 +239,15 @@ public class AddressBasedDestinationTest assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", - dest.getAddressName(),"a.#", null)); + dest.getAddressName(),"a.#", null)); + + Map<String,Object> args = new HashMap<String,Object>(); + args.put("x-match","any"); + args.put("dep","sales"); + args.put("loc","CA"); + assertTrue("Queue not bound as expected",( + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + dest.getAddressName(),null, args)); } @@ -273,8 +284,7 @@ public class AddressBasedDestinationTest // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", - dest.getQueueName(),"hello", Collections.emptyMap())); - + dest.getQueueName(),"hello", Collections.<String, Object>emptyMap())); } public void testBindQueueWithArgs() throws Exception @@ -331,6 +341,53 @@ public class AddressBasedDestinationTest dest.getAddressName(),null, a.getOptions())); } + /** + * Test goal: Verifies the capacity property in address string is handled properly. + * Test strategy: + * Creates a destination with capacity 10. + * Creates consumer with client ack. + * Sends 15 messages to the queue, tries to receive 10. + * Tries to receive the 11th message and checks if its null. + * + * Since capacity is 10 and we haven't acked any messages, + * we should not have received the 11th. + * + * Acks the 10th message and verifies we receive the rest of the msgs. + */ + public void testLinkCapacity() throws Exception + { + if (!isCppBroker()) + { + _logger.info("Not C++ broker, exiting test"); + return; + } + + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + + String addr = "ADDR:my-queue; {create: always, link:{capacity: 10}}"; + AMQDestination dest = new AMQAnyDestination(addr); + MessageConsumer cons = jmsSession.createConsumer(dest); + MessageProducer prod = jmsSession.createProducer(dest); + + for (int i=0; i< 15; i++) + { + prod.send(jmsSession.createTextMessage("msg" + i) ); + } + + for (int i=0; i< 9; i++) + { + cons.receive(); + } + Message msg = cons.receive(RECEIVE_TIMEOUT); + assertNotNull("Should have received the 10th message",msg); + assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT)); + msg.acknowledge(); + for (int i=11; i<16; i++) + { + assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT)); + } + } + /*public void testBindQueueForXMLExchange() throws Exception { if (!isCppBroker()) --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org