Author: kwall
Date: Wed Nov  9 08:58:59 2011
New Revision: 1199662

URL: http://svn.apache.org/viewvc?rev=1199662&view=rev
Log:
QPID-3518: Introduce client side ability to detect server side support.

Applied patch from Oleksandr Rudyy<oru...@gmail.com> and myself.

Added:
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
    
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
 Wed Nov  9 08:58:59 2011
@@ -21,6 +21,8 @@
 
 package org.apache.qpid.server.configuration;
 
+import java.util.List;
+
 
 public interface BrokerConfig  extends 
ConfiguredObject<BrokerConfigType,BrokerConfig>
 {
@@ -44,6 +46,19 @@ public interface BrokerConfig  extends C
 
     String getDataDirectory();
 
+    String getFederationTag();
+
+    /**
+     * List of feature(s) to be advertised to clients on connection.
+     * Feature names are strings, beginning with qpid. followed by one or more
+     * words separated by minus signs e.g. qpid.jms-selector.
+     *
+     * If there are no features, this method must return an empty list.
+     *
+     * @return list of feature names
+     */
+    List<String> getFeatures();
+
     void addVirtualHost(VirtualHostConfig virtualHost);
 
     void createBrokerConnection(String transport,
@@ -53,5 +68,4 @@ public interface BrokerConfig  extends C
                                 String authMechanism,
                                 String username, String password);
 
-    String getFederationTag();
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
 Wed Nov  9 08:58:59 2011
@@ -22,7 +22,6 @@
 package org.apache.qpid.server.configuration;
 
 import java.util.*;
-import java.io.File;
 
 public final class BrokerConfigType extends ConfigObjectType<BrokerConfigType, 
BrokerConfig>
 {

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
 Wed Nov  9 08:58:59 2011
@@ -793,4 +793,16 @@ public class ServerConfiguration extends
     {
         return getIntValue("maximumChannelCount", 256);
     }
+
+    /**
+     * List of Broker features that have been disabled within configuration.  
Disabled
+     * features won't be advertised to the clients on connection.
+     *
+     * @return list of disabled features, or empty list if no features are 
disabled.
+     */
+    public List<String> getDisabledFeatures()
+    {
+        final List<String> disabledFeatures = getListValue("disabledFeatures", 
Collections.emptyList());
+        return disabledFeatures;
+    }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
 Wed Nov  9 08:58:59 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.federation;
 
+import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
 import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -252,7 +253,7 @@ public class BrokerLink implements LinkC
                 _qpidConnection.connect(_host, _port, _remoteVhost, _username, 
_password, "ssl".equals(_transport), _authMechanism);
 
                 final Map<String,Object> serverProps = 
_qpidConnection.getServerProperties();
-                _remoteFederationTag = (String) 
serverProps.get("qpid.federation_tag");
+                _remoteFederationTag = (String) 
serverProps.get(ServerPropertyNames.FEDERATION_TAG);
                 if(_remoteFederationTag == null)
                 {
                     _remoteFederationTag = 
UUID.fromString(_transport+":"+_host+":"+_port).toString();

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
 Wed Nov  9 08:58:59 2011
@@ -23,7 +23,10 @@ package org.apache.qpid.server.registry;
 import org.apache.qpid.server.configuration.*;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.common.ServerPropertyNames;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.List;
 import java.util.Map;
@@ -158,4 +161,19 @@ public class BrokerConfigAdapter impleme
     {
         return _federationTag;
     }
+
+    /**
+     * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures()
+     */
+    @Override
+    public List<String> getFeatures()
+    {
+        final List<String> features = new ArrayList<String>();
+        if 
(!_instance.getConfiguration().getDisabledFeatures().contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR))
+        {
+            features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+        }
+
+        return Collections.unmodifiableList(features);
+    }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
 Wed Nov  9 08:58:59 2011
@@ -32,7 +32,9 @@ import java.util.StringTokenizer;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.configuration.BrokerConfig;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -58,15 +60,14 @@ import org.apache.qpid.transport.Session
 
 public class ServerConnectionDelegate extends ServerDelegate
 {
-    private String _localFQDN;
+    private final String _localFQDN;
     private final IApplicationRegistry _appRegistry;
 
     public ServerConnectionDelegate(IApplicationRegistry appRegistry, String 
localFQDN)
     {
-        this(new 
HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())),
 Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+        this(createConnectionProperties(appRegistry.getBroker()), 
Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
     }
 
-
     public ServerConnectionDelegate(Map<String, Object> properties,
                                     List<Object> locales,
                                     IApplicationRegistry appRegistry,
@@ -78,6 +79,18 @@ public class ServerConnectionDelegate ex
         _localFQDN = localFQDN;
     }
 
+    private static Map<String, Object> createConnectionProperties(final 
BrokerConfig brokerConfig)
+    {
+        final Map<String,Object> map = new HashMap<String,Object>(2);
+        map.put(ServerPropertyNames.FEDERATION_TAG, 
brokerConfig.getFederationTag());
+        final List<String> features = brokerConfig.getFeatures();
+        if (features != null && features.size() > 0)
+        {
+            map.put(ServerPropertyNames.QPID_FEATURES, features);
+        }
+        return map;
+    }
+
     private static List<Object> parseToList(String mechanisms)
     {
         List<Object> list = new ArrayList<Object>();

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
 Wed Nov  9 08:58:59 2011
@@ -1298,7 +1298,7 @@ public class ServerConfigurationTest ext
     }
 
     /**
-     * Test that a non-existant virtualhost file throws a {@link 
ConfigurationException}.
+     * Test that a non-existent virtualhost file throws a {@link 
ConfigurationException}.
      * <p>
      * Test for QPID-2624
      */
@@ -1326,7 +1326,27 @@ public class ServerConfigurationTest ext
         }
     }
     
-    /*
+    /**
+     * Tests that element disabledFeatures allows features that would
+     * otherwise be advertised by the broker to be turned off.
+     */
+    public void testDisabledFeatures() throws ConfigurationException
+    {
+        // Check default
+        _serverConfig.initialise();
+        _serverConfig = new ServerConfiguration(_config);
+        assertEquals("Unexpected size", 0, 
_serverConfig.getDisabledFeatures().size());
+
+        // Check value we set
+        _config.addProperty("disabledFeatures", "qpid.feature1");
+        _config.addProperty("disabledFeatures", "qpid.feature2");
+        _serverConfig = new ServerConfiguration(_config);
+
+        assertEquals("Unexpected size",2, 
_serverConfig.getDisabledFeatures().size());
+        assertTrue("Unexpected contents", 
_serverConfig.getDisabledFeatures().contains("qpid.feature1"));
+    }
+
+    /**
      * Tests that the old element security.jmx.access (that used to be used
      * to define JMX access rights) is rejected.
      */
@@ -1352,7 +1372,7 @@ public class ServerConfigurationTest ext
         }
     }
 
-    /*
+    /**
      * Tests that the old element security.jmx.principal-database (that used 
to define the
      * principal database used for JMX authentication) is rejected.
      */
@@ -1378,7 +1398,7 @@ public class ServerConfigurationTest ext
         }
     }
 
-    /*
+    /**
      * Tests that the old element security.principal-databases. ... (that used 
to define 
      * principal databases) is rejected.
      */
@@ -1403,7 +1423,7 @@ public class ServerConfigurationTest ext
         }
     }
 
-    /*
+    /**
      * Tests that the old element housekeeping.expiredMessageCheckPeriod. ... 
(that was
      * replaced by housekeeping.checkPeriod) is rejected.
      */

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Wed Nov  9 08:58:59 2011
@@ -1402,6 +1402,19 @@ public class AMQConnection extends Close
             return null;
         }
     }
+
+    /**
+     * Tests whether the Broker has advertised support for the named feature.
+     *
+     * @param featureName
+     *
+     * @return true if the feature is supported, or false otherwise.
+     */
+    boolean isSupportedServerFeature(final String featureName)
+    {
+        return _delegate.isSupportedServerFeature(featureName);
+    }
+
     public boolean isFailingOver()
     {
         return (_protocolHandler.getFailoverLatch() != null);

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
 Wed Nov  9 08:58:59 2011
@@ -65,4 +65,16 @@ public interface AMQConnectionDelegate
     ProtocolVersion getProtocolVersion();
 
     boolean verifyClientID() throws JMSException, AMQException;
+
+    /**
+     * Tests whether the server has advertised support for the specified 
feature
+     * via the qpid.features server connection property.  By convention the 
feature name
+     * will begin with <code>qpid.</code> followed by one or more words separated 
by minus signs
+     * e.g. qpid.jms-selector.
+     *
+     * @param featureName name of feature.
+     *
+     * @return true if the feature is supported by the server
+     */
+    boolean isSupportedServerFeature(final String featureName);
 }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 Wed Nov  9 08:58:59 2011
@@ -1,4 +1,3 @@
-package org.apache.qpid.client;
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.qpid.client;
  *
  */
 
+package org.apache.qpid.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,6 +36,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.transport.ClientConnectionDelegate;
+import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
@@ -63,16 +64,12 @@ public class AMQConnectionDelegate_0_10 
     private static final Logger _logger = 
LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
 
     /**
-     * The name of the UUID property
-     */
-    private static final String UUID_NAME = "qpid.federation_tag";
-    /**
      * The AMQ Connection.
      */
-    private AMQConnection _conn;
+    private final AMQConnection _conn;
 
     /**
-     * The QpidConeection instance that is mapped with thie JMS connection.
+     * The QpidConnection instance that is mapped with this JMS connection.
      */
     org.apache.qpid.transport.Connection _qpidConnection;
     private ConnectionException exception = null;
@@ -369,7 +366,32 @@ public class AMQConnectionDelegate_0_10 
 
     public String getUUID()
     {
-        return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
+        return 
(String)_qpidConnection.getServerProperties().get(ServerPropertyNames.FEDERATION_TAG);
+    }
+
+    /*
+     * @see 
org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+     */
+    public boolean isSupportedServerFeature(final String featureName)
+    {
+        if (featureName == null)
+        {
+            throw new IllegalArgumentException("featureName cannot be null");
+        }
+        final Map<String, Object> serverProperties = 
_qpidConnection.getServerProperties();
+        boolean featureSupported = false;
+        if (serverProperties != null && 
serverProperties.containsKey(ServerPropertyNames.QPID_FEATURES))
+        {
+            final Object supportServerFeatures = 
serverProperties.get(ServerPropertyNames.QPID_FEATURES);
+            featureSupported = supportServerFeatures instanceof List && 
((List<String>)supportServerFeatures).contains(featureName);
+        }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Server support for feature '" + featureName + "' : 
" + featureSupported);
+        }
+
+        return featureSupported;
     }
 
     private ConnectionSettings retriveConnectionSettings(BrokerDetails 
brokerDetail)

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 Wed Nov  9 08:58:59 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.nio.channels.UnresolvedAddressException;
 import java.security.GeneralSecurityException;
-import java.security.Security;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -44,6 +43,7 @@ import org.apache.qpid.client.protocol.A
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateWaiter;
+import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.framing.BasicQosBody;
 import org.apache.qpid.framing.BasicQosOkBody;
 import org.apache.qpid.framing.ChannelOpenBody;
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
 public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
 {
     private static final Logger _logger = 
LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
-    private AMQConnection _conn;
+    private final AMQConnection _conn;
 
 
     public void closeConnection(long timeout) throws JMSException, AMQException
@@ -379,4 +379,14 @@ public class AMQConnectionDelegate_8_0 i
     {
         return true;
     }
+
+    /*
+     * @see 
org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+     */
+    public boolean isSupportedServerFeature(String featureName)
+    {
+        // The Qpid Java Broker 0-8..0-9-1 does not advertise features by the 
qpid.features property, so for now
+        // we just hardcode JMS selectors as supported.
+        return 
ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
+    }
 }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Nov  9 08:58:59 2011
@@ -535,7 +535,7 @@ public abstract class AMQSession<C exten
         {
             _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
         }
-        
+
         // Add creation logging to tie in with the existing close logging
         if (_logger.isInfoEnabled())
         {
@@ -1097,7 +1097,7 @@ public abstract class AMQSession<C exten
                     // possible to determine  when querying the broker whether 
there are no arguments or just a non-matching selector
                     // argument, as specifying null for the arguments when 
querying means they should not be checked at all
                     
args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == 
null ? "" : messageSelector);
-                    
+
                     // if the queue is bound to the exchange but NOT for this 
topic and selector, then the JMS spec
                     // says we must trash the subscription.
                     boolean isQueueBound = 
isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Nov  9 08:58:59 2011
@@ -75,6 +75,7 @@ import org.apache.qpid.transport.Session
 import org.apache.qpid.transport.SessionListener;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.Serial;
+import org.apache.qpid.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -376,7 +377,7 @@ public class AMQSession_0_10 extends AMQ
                 _logger.debug("Binding queue : " + queue + 
                               " exchange: " + exchange + 
                               " using binding key " + binding.getBindingKey() 
+ 
-                              " with args " + printMap(binding.getArgs()));
+                              " with args " + 
Strings.printMap(binding.getArgs()));
                 getQpidSession().exchangeBind(queue, 
                                               exchange,
                                               binding.getBindingKey(),
@@ -1313,22 +1314,6 @@ public class AMQSession_0_10 extends AMQ
         dest.setRoutingKey(new AMQShortString(dest.getSubject()));
     }
     
-    /** This should be moved to a suitable utility class */
-    private String printMap(Map<String,Object> map)
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("<");
-        if (map != null)
-        {
-            for(String key : map.keySet())
-            {
-                sb.append(key).append(" = ").append(map.get(key)).append(" ");
-            }
-        }
-        sb.append(">");
-        return sb.toString();
-    }
-
     protected void acknowledgeImpl()
     {
         RangeSet range = gatherUnackedRangeSet();

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Wed Nov  9 08:58:59 2011
@@ -20,10 +20,10 @@ package org.apache.qpid.client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -66,6 +66,9 @@ public class BasicMessageConsumer_0_10 e
     
     private final long _capacity;
 
+    /** Flag indicating if the server supports message selectors */
+    protected final boolean _serverJmsSelectorSupport;
+
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection 
connection, AMQDestination destination,
                                         String messageSelector, boolean 
noLocal, MessageFactoryRegistry messageFactory,
                                         AMQSession<?,?> session, 
AMQProtocolHandler protocolHandler,
@@ -80,6 +83,8 @@ public class BasicMessageConsumer_0_10 e
         _preAcquire = evaluatePreAcquire(browseOnly, destination);
 
         _capacity = evaluateCapacity(destination);
+        _serverJmsSelectorSupport = 
connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+
 
         if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == 
destination.getAddressType()) 
         {            
@@ -204,10 +209,9 @@ public class BasicMessageConsumer_0_10 e
     private boolean checkPreConditions(AbstractJMSMessage message) throws 
AMQException
     {
         boolean messageOk = true;
-        // TODO Use a tag for finding out if message filtering is done here or 
by the broker.
         try
         {
-            if (_messageSelectorFilter != null)
+            if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
             {
                 messageOk = _messageSelectorFilter.matches(message);
             }

Modified: 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
 Wed Nov  9 08:58:59 2011
@@ -47,12 +47,17 @@ public class MessageConverterTest extend
     protected JMSTextMessage testTextMessage;
 
     protected JMSMapMessage testMapMessage;
-    private AMQSession _session = new TestAMQSession();
+    private AMQConnection _connection;
+    private AMQSession _session;
 
 
     protected void setUp() throws Exception
     {
         super.setUp();
+
+        _connection =  new 
MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'");
+        _session = new TestAMQSession(_connection);
+
         testTextMessage = new 
JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
 
         //Set Message Text

Modified: 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
 Wed Nov  9 08:58:59 2011
@@ -29,10 +29,12 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.BasicMessageConsumer_0_8;
 import org.apache.qpid.client.BasicMessageProducer_0_8;
+import org.apache.qpid.client.MockAMQConnection;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -43,9 +45,9 @@ import org.apache.qpid.framing.FieldTabl
 public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, 
BasicMessageProducer_0_8>
 {
 
-    public TestAMQSession()
+    public TestAMQSession(AMQConnection connection)
     {
-        super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
+        super(connection, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
     }
 
     public void acknowledgeMessage(long deliveryTag, boolean multiple)

Added: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java?rev=1199662&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
 (added)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
 Wed Nov  9 08:58:59 2011
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.common;
+
+/**
+ * Key names used within the serverProperties argument of the ConnectionStart
+ * method.  These property names are Qpid specific.
+ */
+public final class ServerPropertyNames
+{
+    /**
+     * Server property: federation tag UUID
+     */
+    public static final String FEDERATION_TAG = "qpid.federation_tag";
+
+    /**
+     * Server property: array of features supported by the server.
+     */
+    public static final String QPID_FEATURES = "qpid.features";
+
+    /**
+     * Feature: Signifies that a server supports JMS selectors.
+     */
+    public static final String FEATURE_QPID_JMS_SELECTOR = "qpid.jms-selector";
+}

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java?rev=1199662&r1=1199661&r2=1199662&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java 
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java 
Wed Nov  9 08:58:59 2011
@@ -257,4 +257,19 @@ public final class Strings
         return join(sep, Arrays.asList(items));
     }
 
+    public static String printMap(Map<String,Object> map)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("<");
+        if (map != null)
+        {
+            for(String key : map.keySet())
+            {
+                sb.append(key).append(" = ").append(map.get(key)).append(" ");
+            }
+        }
+        sb.append(">");
+        return sb.toString();
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to