Author: kwall
Date: Mon Nov  7 08:27:31 2011
New Revision: 1198642

URL: http://svn.apache.org/viewvc?rev=1198642&view=rev
Log:
QPID-2848: refactored message consumer: pre-acquire and capacity decisions are 
moved into the consumer, renamed field noConsume to browseOnly, cleaned up 
selector filter code.

Applied patch from Andrew MacBean <andymacb...@gmail.com>, Oleksandr 
Rudyy <oru...@gmail.com> and myself.

Added:
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/filter/
    
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java
Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
    
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Nov  7 08:27:31 2011
@@ -89,6 +89,7 @@ import org.apache.qpid.client.message.Un
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
@@ -894,7 +895,7 @@ public abstract class AMQSession<C exten
         C consumer = _consumers.get(consumerTag);
         if (consumer != null)
         {
-            if (!consumer.isNoConsume())  // Normal Consumer
+            if (!consumer.isBrowseOnly())  // Normal Consumer
             {
                 // Clean the Maps up first
                 // Flush any pending messages for this consumerTag
@@ -2572,7 +2573,7 @@ public abstract class AMQSession<C exten
      * @param queueName
      */
     private void consumeFromQueue(C consumer, AMQShortString queueName,
-                                  AMQProtocolHandler protocolHandler, boolean 
nowait, String messageSelector) throws AMQException, FailoverException
+                                  AMQProtocolHandler protocolHandler, boolean 
nowait, MessageFilter messageSelector) throws AMQException, FailoverException
     {
         int tagId = _nextTag++;
 
@@ -2600,7 +2601,7 @@ public abstract class AMQSession<C exten
     }
 
     public abstract void sendConsume(C consumer, AMQShortString queueName,
-                                     AMQProtocolHandler protocolHandler, 
boolean nowait, String messageSelector, int tag) throws AMQException, 
FailoverException;
+                                     AMQProtocolHandler protocolHandler, 
boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, 
FailoverException;
 
     private P createProducerImpl(final Destination destination, final boolean 
mandatory, final boolean immediate)
             throws JMSException
@@ -2925,7 +2926,7 @@ public abstract class AMQSession<C exten
 
         try
         {
-            consumeFromQueue(consumer, queueName, protocolHandler, nowait, 
consumer._messageSelector);
+            consumeFromQueue(consumer, queueName, protocolHandler, nowait, 
consumer.getMessageSelectorFilter());
         }
         catch (FailoverException e)
         {
@@ -3210,7 +3211,7 @@ public abstract class AMQSession<C exten
 
                 for (C consumer : _consumers.values())
                 {
-                    if (!consumer.isNoConsume())
+                    if (!consumer.isBrowseOnly())
                     {
                         consumer.rollback();
                     }
@@ -3397,7 +3398,7 @@ public abstract class AMQSession<C exten
                     }
                     else
                     {
-                        if (consumer.isNoConsume())
+                        if (consumer.isBrowseOnly())
                         {
                             _dispatcherLogger.info("Received a message("
                                                    + 
System.identityHashCode(message) + ")" + "["

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Mon Nov  7 08:27:31 2011
@@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
@@ -579,56 +580,30 @@ public class AMQSession_0_10 extends AMQ
      * Registers the consumer with the broker
      */
     public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString 
queueName, AMQProtocolHandler protocolHandler,
-                            boolean nowait, String messageSelector, int tag)
+                            boolean nowait, MessageFilter messageSelector, int 
tag)
             throws AMQException, FailoverException
     {        
-        boolean preAcquire;
-        
-        long capacity = getCapacity(consumer.getDestination());
-        
-        try
-        {
-            boolean isTopic;
-            Map<String, Object> arguments = 
FieldTable.convertToMap(consumer.getArguments());
-            
-            if (consumer.getDestination().getDestSyntax() == 
AMQDestination.DestSyntax.BURL)
-            {
-                isTopic = consumer.getDestination() instanceof AMQTopic ||
-                          
consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
 ;
-                
-                preAcquire = isTopic || (!consumer.isNoConsume()  && 
-                        (consumer.getMessageSelector() == null || 
consumer.getMessageSelector().equals("")));
-            }
-            else
-            {
-                isTopic = consumer.getDestination().getAddressType() == 
AMQDestination.TOPIC_TYPE;
-                
-                preAcquire = !consumer.isNoConsume() && 
-                             (isTopic || consumer.getMessageSelector() == null 
|| 
-                              consumer.getMessageSelector().equals(""));
-                
-                arguments.putAll(
-                        (Map<? extends String, ? extends Object>) 
consumer.getDestination().getLink().getSubscription().getArgs());
-            }
-            
-            boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-            
-            if (consumer.getDestination().getLink() != null)
-            {
-                acceptModeNone = 
consumer.getDestination().getLink().getReliability() == 
Link.Reliability.UNRELIABLE;
-            }
-            
-            getQpidSession().messageSubscribe
-                (queueName.toString(), String.valueOf(tag),
-                 acceptModeNone ? MessageAcceptMode.NONE : 
MessageAcceptMode.EXPLICIT,
-                 preAcquire ? MessageAcquireMode.PRE_ACQUIRED : 
MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
-                 consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
-        }
-        catch (JMSException e)
+        boolean preAcquire = consumer.isPreAcquire();
+
+        AMQDestination destination = consumer.getDestination();
+        long capacity = consumer.getCapacity();
+
+        Map<String, Object> arguments = 
FieldTable.convertToMap(consumer.getArguments());
+
+        Link link = destination.getLink();
+        if (link != null && link.getSubscription() != null && 
link.getSubscription().getArgs() != null)
         {
-            throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when 
registering consumer", e);
+            arguments.putAll((Map<? extends String, ? extends Object>) 
link.getSubscription().getArgs());
         }
 
+        boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+
+        getQpidSession().messageSubscribe
+            (queueName.toString(), String.valueOf(tag),
+             acceptModeNone ? MessageAcceptMode.NONE : 
MessageAcceptMode.EXPLICIT,
+             preAcquire ? MessageAcquireMode.PRE_ACQUIRED : 
MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
+             consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
         String consumerTag = 
((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
 
         if (capacity == 0)
@@ -657,21 +632,6 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    private long getCapacity(AMQDestination destination)
-    {
-        long capacity = 0;
-        if (destination.getDestSyntax() == DestSyntax.ADDR && 
-                destination.getLink().getConsumerCapacity() > 0)
-        {
-            capacity = destination.getLink().getConsumerCapacity();
-        }
-        else if (prefetch())
-        {
-            capacity = getAMQConnection().getMaxPrefetch();
-        }
-        return capacity;
-    }
-
     /**
      * Create an 0_10 message producer
      */
@@ -836,7 +796,7 @@ public class AMQSession_0_10 extends AMQ
                 //only set if msg list is null
                 try
                 {
-                    long capacity = getCapacity(consumer.getDestination());
+                    long capacity = consumer.getCapacity();
                     
                     if (capacity == 0)
                     {

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 Mon Nov  7 08:27:31 2011
@@ -41,6 +41,7 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
@@ -333,13 +334,13 @@ public final class AMQSession_0_8 extend
                                       AMQShortString queueName,
                                       AMQProtocolHandler protocolHandler,
                                       boolean nowait,
-                                      String messageSelector,
+                                      MessageFilter messageSelector,
                                       int tag) throws AMQException, 
FailoverException
     {
         FieldTable arguments = FieldTableFactory.newFieldTable();
-        if ((messageSelector != null) && !messageSelector.equals(""))
+        if (messageSelector != null)
         {
-            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), 
messageSelector);
+            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), 
messageSelector.getSelector());
         }
 
         if (consumer.isAutoClose())
@@ -347,7 +348,7 @@ public final class AMQSession_0_8 extend
             arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
         }
 
-        if (consumer.isNoConsume())
+        if (consumer.isBrowseOnly())
         {
             arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
         }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Mon Nov  7 08:27:31 2011
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
@@ -31,6 +34,7 @@ import org.apache.qpid.transport.Transpo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
@@ -52,7 +56,7 @@ public abstract class BasicMessageConsum
     /** The connection being used by this consumer */
     protected final AMQConnection _connection;
 
-    protected final String _messageSelector;
+    protected final MessageFilter _messageSelectorFilter;
 
     private final boolean _noLocal;
 
@@ -138,7 +142,7 @@ public abstract class BasicMessageConsum
      */
     private final boolean _autoClose;
 
-    private final boolean _noConsume;
+    private final boolean _browseOnly;
     private List<StackTraceElement> _closedStack = null;
 
 
@@ -147,11 +151,10 @@ public abstract class BasicMessageConsum
                                    String messageSelector, boolean noLocal, 
MessageFactoryRegistry messageFactory,
                                    AMQSession session, AMQProtocolHandler 
protocolHandler,
                                    FieldTable arguments, int prefetchHigh, int 
prefetchLow,
-                                   boolean exclusive, int acknowledgeMode, 
boolean noConsume, boolean autoClose)
+                                   boolean exclusive, int acknowledgeMode, 
boolean browseOnly, boolean autoClose) throws JMSException
     {
         _channelId = channelId;
         _connection = connection;
-        _messageSelector = messageSelector;
         _noLocal = noLocal;
         _destination = destination;
         _messageFactory = messageFactory;
@@ -164,10 +167,28 @@ public abstract class BasicMessageConsum
         
         _synchronousQueue = new LinkedBlockingQueue();
         _autoClose = autoClose;
-        _noConsume = noConsume;
+        _browseOnly = browseOnly;
+
+        try
+        {
+            if (messageSelector == null || "".equals(messageSelector.trim()))
+            {
+                _messageSelectorFilter = null;
+            }
+            else
+            {
+                _messageSelectorFilter = new 
JMSSelectorFilter(messageSelector);
+            }
+        }
+        catch (final AMQInternalException ie)
+        {
+            InvalidSelectorException ise = new 
InvalidSelectorException("cannot create consumer because of selector issue");
+            ise.setLinkedException(ie);
+            throw ise;
+        }
 
         // Force queue browsers not to use acknowledge modes.
-        if (_noConsume)
+        if (_browseOnly)
         {
             _acknowledgeMode = Session.NO_ACKNOWLEDGE;
         }
@@ -186,7 +207,7 @@ public abstract class BasicMessageConsum
     {
         checkPreConditions();
 
-        return _messageSelector;
+        return _messageSelectorFilter == null ? null 
:_messageSelectorFilter.getSelector();
     }
 
     public MessageListener getMessageListener() throws JMSException
@@ -345,6 +366,11 @@ public abstract class BasicMessageConsum
         return _receiving.get();
     }
 
+    public MessageFilter getMessageSelectorFilter()
+    {
+        return _messageSelectorFilter;
+    }
+
     public Message receive() throws JMSException
     {
         return receive(0);
@@ -874,9 +900,9 @@ public abstract class BasicMessageConsum
         return _autoClose;
     }
 
-    public boolean isNoConsume()
+    public boolean isBrowseOnly()
     {
-        return _noConsume;
+        return _browseOnly;
     }
 
     public void rollback()

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Mon Nov  7 08:27:31 2011
@@ -26,17 +26,14 @@ import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.*;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.filter.JMSSelectorFilter;
 import org.apache.qpid.jms.Session;
 
-import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -52,11 +49,6 @@ public class BasicMessageConsumer_0_10 e
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
     /**
-     * The message selector filter associated with this consumer message 
selector
-     */
-    private MessageFilter _filter = null;
-
-    /**
      * The underlying QpidSession
      */
     private AMQSession_0_10 _0_10session;
@@ -64,7 +56,7 @@ public class BasicMessageConsumer_0_10 e
     /**
      * Indicates whether this consumer receives pre-acquired messages
      */
-    private boolean _preAcquire = true;
+    private final boolean _preAcquire;
 
     /**
      * Specify whether this consumer is performing a sync receive
@@ -72,44 +64,22 @@ public class BasicMessageConsumer_0_10 e
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
     private String _consumerTagString;
     
-    private long capacity = 0;
+    private final long _capacity;
 
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection 
connection, AMQDestination destination,
                                         String messageSelector, boolean 
noLocal, MessageFactoryRegistry messageFactory,
-                                        AMQSession session, AMQProtocolHandler 
protocolHandler,
+                                        AMQSession<?,?> session, 
AMQProtocolHandler protocolHandler,
                                         FieldTable arguments, int 
prefetchHigh, int prefetchLow,
-                                        boolean exclusive, int 
acknowledgeMode, boolean noConsume, boolean autoClose)
+                                        boolean exclusive, int 
acknowledgeMode, boolean browseOnly, boolean autoClose)
             throws JMSException
     {
         super(channelId, connection, destination, messageSelector, noLocal, 
messageFactory, session, protocolHandler,
-                arguments, prefetchHigh, prefetchLow, exclusive, 
acknowledgeMode, noConsume, autoClose);
+                arguments, prefetchHigh, prefetchLow, exclusive, 
acknowledgeMode, browseOnly, autoClose);
         _0_10session = (AMQSession_0_10) session;
-        if (messageSelector != null && !messageSelector.equals(""))
-        {
-            try
-            {
-                _filter = new JMSSelectorFilter(messageSelector);
-            }
-            catch (AMQInternalException e)
-            {
-                throw new InvalidSelectorException("cannot create consumer 
because of selector issue");
-            }
-            if (destination instanceof AMQQueue)
-            {
-                _preAcquire = false;
-            }
-        }
-        
-        // Destination setting overrides connection defaults
-        if (destination.getDestSyntax() == DestSyntax.ADDR && 
-                destination.getLink().getConsumerCapacity() > 0)
-        {
-            capacity = destination.getLink().getConsumerCapacity();
-        }
-        else if (getSession().prefetch())
-        {
-            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
-        }
+
+        _preAcquire = evaluatePreAcquire(browseOnly, destination);
+
+        _capacity = evaluateCapacity(destination);
 
         if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == 
destination.getAddressType()) 
         {            
@@ -123,7 +93,6 @@ public class BasicMessageConsumer_0_10 e
         }
     }
 
-
     @Override public void setConsumerTag(int consumerTag)
     {
         super.setConsumerTag(consumerTag);
@@ -149,7 +118,7 @@ public class BasicMessageConsumer_0_10 e
         {
             if (checkPreConditions(jmsMessage))
             {
-                if (isMessageListenerSet() && capacity == 0)
+                if (isMessageListenerSet() && _capacity == 0)
                 {
                     messageFlow();
                 }
@@ -160,7 +129,7 @@ public class BasicMessageConsumer_0_10 e
             {
                 // if we are synchronously waiting for a message
                 // and messages are not pre-fetched we then need to request 
another one
-                if(capacity == 0)
+                if(_capacity == 0)
                 {
                    messageFlow();
                 }
@@ -238,9 +207,9 @@ public class BasicMessageConsumer_0_10 e
         // TODO Use a tag for fiding out if message filtering is done here or 
by the broker.
         try
         {
-            if (_messageSelector != null && !_messageSelector.equals(""))
+            if (_messageSelectorFilter != null)
             {
-                messageOk = _filter.matches(message);
+                messageOk = _messageSelectorFilter.matches(message);
             }
         }
         catch (Exception e)
@@ -275,11 +244,10 @@ public class BasicMessageConsumer_0_10 e
                 flushUnwantedMessage(message);
             }
         }
-
-        // now we need to acquire this message if needed
-        // this is the case of queue with a message selector set
-        if (!_preAcquire && messageOk && !isNoConsume())
+        else if (!_preAcquire && !isBrowseOnly())
         {
+            // now we need to acquire this message if needed
+            // this is the case of queue with a message selector set
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("filterMessage - trying to acquire message");
@@ -368,7 +336,7 @@ public class BasicMessageConsumer_0_10 e
         super.setMessageListener(messageListener);
         try
         {
-            if (messageListener != null && capacity == 0)
+            if (messageListener != null && _capacity == 0)
             {
                 messageFlow();
             }
@@ -408,11 +376,11 @@ public class BasicMessageConsumer_0_10 e
      */
     public Object getMessageFromQueue(long l) throws InterruptedException
     {
-        if (capacity == 0)
+        if (_capacity == 0)
         {
             _syncReceive.set(true);
         }
-        if (_0_10session.isStarted() && capacity == 0 && 
_synchronousQueue.isEmpty())
+        if (_0_10session.isStarted() && _capacity == 0 && 
_synchronousQueue.isEmpty())
         {
             messageFlow();
         }
@@ -427,18 +395,18 @@ public class BasicMessageConsumer_0_10 e
                 (getConsumerTagString(), MessageCreditUnit.BYTE,
                  0xFFFFFFFF, Option.UNRELIABLE);
             
-            if (capacity > 0)
+            if (_capacity > 0)
             {
                 _0_10session.getQpidSession().messageFlow
                                                (getConsumerTagString(),
                                                 MessageCreditUnit.MESSAGE,
-                                                capacity,
+                                                _capacity,
                                                 Option.UNRELIABLE);
             }
             _0_10session.syncDispatchQueue();
             o = super.getMessageFromQueue(-1);
         }
-        if (capacity == 0)
+        if (_capacity == 0)
         {
             _syncReceive.set(false);
         }
@@ -536,4 +504,51 @@ public class BasicMessageConsumer_0_10 e
             }
         }
     }
+
+    long getCapacity()
+    {
+        return _capacity;
+    }
+
+    boolean isPreAcquire()
+    {
+        return _preAcquire;
+    }
+
+    private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination 
destination)
+    {
+        boolean preAcquire;
+        if (browseOnly)
+        {
+            preAcquire = false;
+        }
+        else
+        {
+            boolean isQueue = (destination instanceof AMQQueue || 
getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
+            if (isQueue && getMessageSelectorFilter() != null)
+            {
+                preAcquire = false;
+            }
+            else
+            {
+                preAcquire = true;
+            }
+        }
+        return preAcquire;
+    }
+
+    private long evaluateCapacity(AMQDestination destination)
+    {
+        long capacity = 0;
+        if (destination.getLink() != null && 
destination.getLink().getConsumerCapacity() > 0)
+        {
+            capacity = destination.getLink().getConsumerCapacity();
+        }
+        else if (getSession().prefetch())
+        {
+            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+        }
+        return capacity;
+    }
+
 }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
 Mon Nov  7 08:27:31 2011
@@ -20,16 +20,13 @@
  */
 package org.apache.qpid.client;
 
-import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.filter.JMSSelectorFilter;
 import org.apache.qpid.framing.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,23 +38,11 @@ public class BasicMessageConsumer_0_8 ex
     protected BasicMessageConsumer_0_8(int channelId, AMQConnection 
connection, AMQDestination destination,
                                        String messageSelector, boolean 
noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
                                        AMQProtocolHandler protocolHandler, 
FieldTable arguments, int prefetchHigh, int prefetchLow,
-                                       boolean exclusive, int acknowledgeMode, 
boolean noConsume, boolean autoClose) throws JMSException
+                                       boolean exclusive, int acknowledgeMode, 
boolean browseOnly, boolean autoClose) throws JMSException
     {
         super(channelId, connection, 
destination,messageSelector,noLocal,messageFactory,session,
               protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
-              acknowledgeMode, noConsume, autoClose);
-        try
-        {
-            
-            if (messageSelector != null && messageSelector.length() > 0)
-            {
-                JMSSelectorFilter _filter = new 
JMSSelectorFilter(messageSelector);
-            }
-        }
-        catch (AMQInternalException e)
-        {
-            throw new InvalidSelectorException("cannot create consumer because 
of selector issue");
-        }
+              acknowledgeMode, browseOnly, autoClose);
     }
 
     void sendCancel() throws AMQException, FailoverException

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
 Mon Nov  7 08:27:31 2011
@@ -26,16 +26,17 @@ import org.slf4j.LoggerFactory;
 
 public class JMSSelectorFilter implements MessageFilter
 {
-    /**
-     * this JMSSelectorFilter's logger
-     */
     private static final Logger _logger = 
LoggerFactory.getLogger(JMSSelectorFilter.class);
 
-    private String _selector;
-    private BooleanExpression _matcher;
+    private final String _selector;
+    private final BooleanExpression _matcher;
 
     public JMSSelectorFilter(String selector) throws AMQInternalException
     {
+        if (selector == null || "".equals(selector))
+        {
+            throw new IllegalArgumentException("Cannot create a 
JMSSelectorFilter with a null or empty selector string");
+        }
         _selector = selector;
         if (_logger.isDebugEnabled())
         {
@@ -51,8 +52,7 @@ public class JMSSelectorFilter implement
             boolean match = _matcher.matches(message);
             if (_logger.isDebugEnabled())
             {
-                _logger.debug(message + " match(" + match + ") selector(" + System
-                        .identityHashCode(_selector) + "):" + _selector);
+                _logger.debug(message + " match(" + match + ") selector(" + _selector + "): " + _selector);
             }
             return match;
         }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java Mon Nov  7 08:27:31 2011
@@ -17,11 +17,11 @@
  */
 package org.apache.qpid.filter;
 
-import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 
 
 public interface MessageFilter
 {
-    boolean matches(AbstractJMSMessage message) throws AMQInternalException;
+    boolean matches(AbstractJMSMessage message);
+    String getSelector();
 }

Added: 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java?rev=1198642&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java (added)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java Mon Nov  7 08:27:31 2011
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.filter;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.TestMessageHelper;
+
+public class JMSSelectorFilterTest extends TestCase
+{
+
+    public void testEmptySelectorFilter() throws Exception
+    {
+        try
+        {
+            new JMSSelectorFilter("");
+            fail("Should not be able to create a JMSSelectorFilter with an empty selector");
+        }
+        catch (IllegalArgumentException iae)
+        {
+            // pass
+        }
+    }
+
+    public void testNullSelectorFilter() throws Exception
+    {
+        try
+        {
+            new JMSSelectorFilter(null);
+            fail("Should not be able to create a JMSSelectorFilter with a null selector");
+        }
+        catch (IllegalArgumentException iae)
+        {
+            // pass
+        }
+    }
+
+    public void testInvalidSelectorFilter() throws Exception
+    {
+        try
+        {
+            new JMSSelectorFilter("$%^");
+            fail("Unparsable selector so expected AMQInternalException to be thrown");
+        }
+        catch (AMQInternalException amqie)
+        {
+            // pass
+        }
+    }
+
+    public void testSimpleSelectorFilter() throws Exception
+    {
+        MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select=5");
+
+        assertNotNull("Filter object is null", simpleSelectorFilter);
+        assertNotNull("Selector string is null", simpleSelectorFilter.getSelector());
+        assertEquals("Unexpected selector", "select=5", simpleSelectorFilter.getSelector());
+        assertTrue("Filter object is invalid", simpleSelectorFilter != null);
+
+        final JMSTextMessage message = TestMessageHelper.newJMSTextMessage();
+
+        message.setIntProperty("select", 4);
+        assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message));
+        message.setIntProperty("select", 5);
+        assertTrue("Selector didnt match when expected", simpleSelectorFilter.matches(message));
+        message.setIntProperty("select", 6);
+        assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message));
+    }
+
+    public void testFailedMatchingFilter() throws Exception
+    {
+        MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select>4");
+
+        assertNotNull("Filter object is null", simpleSelectorFilter);
+        assertNotNull("Selector string is null", simpleSelectorFilter.getSelector());
+        assertEquals("Unexpected selector", "select>4", simpleSelectorFilter.getSelector());
+        assertTrue("Filter object is invalid", simpleSelectorFilter != null);
+
+        final JMSTextMessage message = TestMessageHelper.newJMSTextMessage();
+
+        message.setStringProperty("select", "5");
+        assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+        message.setStringProperty("select", "elephant");
+        assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+        message.setBooleanProperty("select", false);
+        assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+    }
+}

Modified: 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Mon Nov  7 08:27:31 2011
@@ -36,6 +36,7 @@ import org.apache.qpid.client.BasicMessa
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 
@@ -124,7 +125,7 @@ public class TestAMQSession extends AMQS
         return false;
     }
 
-    public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException
+    public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException
     {
 
     }

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java Mon Nov  7 08:27:31 2011
@@ -261,8 +261,7 @@ public class QueueBrowserAutoAckTest ext
     protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException
     {
         QueueBrowser queueBrowser = selector == null ?
-                                _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue);
-//                _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector);
+                                _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector);
 
         Enumeration[] msgs = new Enumeration[browserEnumerationCount];
         int[] msgCount = new int[browserEnumerationCount];
@@ -347,7 +346,7 @@ public class QueueBrowserAutoAckTest ext
     protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException
     {
 
-        String selector = MESSAGE_ID_PROPERTY + " % " + clients;
+        String selector = MESSAGE_ID_PROPERTY + " % " + clients + " = 0" ;
 
         checkOverlappingMultipleGetEnum(totalMessages / clients, clients, selector);
     }

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1198642&r1=1198641&r2=1198642&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Mon Nov  7 08:27:31 2011
@@ -1,4 +1,3 @@
-package org.apache.qpid.test.client.destination;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,11 +18,13 @@ package org.apache.qpid.test.client.dest
  * under the License.
  * 
  */
-
+package org.apache.qpid.test.client.destination;
 
 import java.util.Collections;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Hashtable;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -34,6 +35,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
+import javax.jms.QueueBrowser;
 import javax.jms.QueueReceiver;
 import javax.jms.QueueSession;
 import javax.jms.Session;
@@ -475,13 +477,13 @@ public class AddressBasedDestinationTest
         {
             prod.send(jmsSession.createTextMessage("msg" + i) );
         }
-        
-        for (int i=0; i< 9; i++)
+        Message msg = null;
+        for (int i=0; i< 10; i++)
         {
-            cons.receive();
+            msg = cons.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Should have received " + i + " message", msg);
+            assertEquals("Unexpected message received", "msg" + i, ((TextMessage)msg).getText());
         }
-        Message msg = cons.receive(RECEIVE_TIMEOUT);
-        assertNotNull("Should have received the 10th message",msg);        
         assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
         msg.acknowledge();
         for (int i=11; i<16; i++)
@@ -1182,4 +1184,106 @@ public class AddressBasedDestinationTest
         assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
         cons.close();
     }
+
+    public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception
+    {
+        assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void testQueueBrowserWithSelectorClientAcknowldgement() throws Exception
+    {
+        assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    public void testQueueBrowserWithSelectorTransactedSession() throws Exception
+    {
+        assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED);
+    }
+
+    public void testConsumerWithSelectorAutoAcknowledgement() throws Exception
+    {
+        assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void testConsumerWithSelectorClientAcknowldgement() throws Exception
+    {
+        assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    public void testConsumerWithSelectorTransactedSession() throws Exception
+    {
+        assertConsumerWithSelector(Session.SESSION_TRANSACTED);
+    }
+
+    private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception
+    {
+        String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}";
+
+        boolean transacted = acknowledgement == Session.SESSION_TRANSACTED;
+        Session session = _connection.createSession(transacted, acknowledgement);
+
+        Queue queue = session.createQueue(queueAddress);
+
+        final int numberOfMessages = 10;
+        List<Message> sentMessages = sendMessage(session, queue, numberOfMessages);
+        assertNotNull("Messages were not sent", sentMessages);
+        assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size());
+
+        QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0");
+        _connection.start();
+
+        Enumeration<Message> enumaration = browser.getEnumeration();
+
+        int counter = 0;
+        int expectedIndex = 0;
+        while (enumaration.hasMoreElements())
+        {
+            Message m = enumaration.nextElement();
+            assertNotNull("Expected not null message at step " + counter, m);
+            int messageIndex = m.getIntProperty(INDEX);
+            assertEquals("Unexpected index", expectedIndex, messageIndex);
+            expectedIndex += 2;
+            counter++;
+        }
+        assertEquals("Unexpected number of messsages received", 5, counter);
+    }
+
+    private void assertConsumerWithSelector(int acknowledgement) throws Exception
+    {
+        String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}";
+
+        boolean transacted = acknowledgement == Session.SESSION_TRANSACTED;
+        Session session = _connection.createSession(transacted, acknowledgement);
+
+        Queue queue = session.createQueue(queueAddress);
+
+        final int numberOfMessages = 10;
+        List<Message> sentMessages = sendMessage(session, queue, numberOfMessages);
+        assertNotNull("Messages were not sent", sentMessages);
+        assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size());
+
+        MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0");
+
+        int expectedIndex = 0;
+        for (int i = 0; i < 5; i++)
+        {
+            Message m = consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Expected not null message at step " + i, m);
+            int messageIndex = m.getIntProperty(INDEX);
+            assertEquals("Unexpected index", expectedIndex, messageIndex);
+            expectedIndex += 2;
+
+            if (transacted)
+            {
+                session.commit();
+            }
+            else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE)
+            {
+                m.acknowledge();
+            }
+        }
+
+        Message m = consumer.receive(RECEIVE_TIMEOUT);
+        assertNull("Unexpected message received", m);
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to