Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 10557c73e -> 489a7b85c


QPID-8114: [Broker-J] Detach link with not-implemented error when unsupported 
filter is supplied among source filters


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/489a7b85
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/489a7b85
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/489a7b85

Branch: refs/heads/master
Commit: 489a7b85ca0335732595f701b49883ea4ed751c7
Parents: 10557c7
Author: Alex Rudyy <oru...@apache.org>
Authored: Thu Dec 6 16:32:24 2018 +0000
Committer: Alex Rudyy <oru...@apache.org>
Committed: Thu Dec 6 16:32:24 2018 +0000

----------------------------------------------------------------------
 .../protocol/v1_0/SendingLinkEndpoint.java      |  8 +++
 .../protocol/v1_0/codec/MapConstructor.java     | 10 ++-
 .../v1_0/codec/SpecializedDescribedType.java    | 67 ++++++++++++++++++++
 .../protocol/v1_0/type/messaging/Filter.java    | 16 ++++-
 .../qpid/tests/protocol/v1_0/FrameEncoder.java  |  6 ++
 .../v1_0/extensions/type/TestFilter.java        | 67 ++++++++++++++++++++
 .../extensions/type/TestFilterConstructor.java  | 58 +++++++++++++++++
 .../v1_0/extensions/type/TestFilterWriter.java  | 46 ++++++++++++++
 .../v1_0/extensions/filter/FilterTest.java      | 46 +++++++++++++-
 9 files changed, 318 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 8919413..b3603d9 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -185,6 +185,14 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
 
 
                     }
+                    else if (entry.getValue() instanceof Filter.InvalidFilter)
+                    {
+                        Error error = new Error();
+                        error.setCondition(AmqpError.NOT_IMPLEMENTED);
+                        error.setDescription("Unsupported filter type: " + 
((Filter.InvalidFilter)entry.getValue()).getDescriptor());
+                        
error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), 
Symbol.valueOf("filter")));
+                        throw new AmqpErrorException(error);
+                    }
                 }
             }
             source.setFilter(actualFilters.isEmpty() ? null : actualFilters);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
index 70f5ed2..85ae873 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
@@ -113,9 +113,15 @@ public class MapConstructor extends 
VariableWidthTypeConstructor<Map<Object,Obje
             }
 
             Object value = handler.parse(in);
-            if (value != null && !valueType.isAssignableFrom(value.getClass()))
+            if (value instanceof DescribedType
+                && SpecializedDescribedType.class.isAssignableFrom(valueType)
+                && 
SpecializedDescribedType.hasInvalidValue((Class<SpecializedDescribedType>)valueType))
             {
-                String message = String.format("Expected key type is '%s' but 
got '%s'",
+                value = 
SpecializedDescribedType.getInvalidValue((Class<SpecializedDescribedType>)valueType,
 (DescribedType) value);
+            }
+            else if (value != null && 
!valueType.isAssignableFrom(value.getClass()))
+            {
+                String message = String.format("Expected value type is '%s' 
but got '%s'",
                                                valueType.getSimpleName(),
                                                
value.getClass().getSimpleName());
                 throw new AmqpErrorException(AmqpError.DECODE_ERROR, message);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java
new file mode 100644
index 0000000..bb913f3
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+/**
+ * Marker for families of described types that can supply a placeholder ("invalid") value
+ * when a decoded described value is not one of the family's supported types.
+ * <p>
+ * A family opts in by declaring a {@code public static} method named {@code getInvalidValue}
+ * that takes a single {@link DescribedType} argument and whose declared return type is
+ * exactly the family class. The helpers below locate that method reflectively.
+ */
+public interface SpecializedDescribedType
+{
+    /**
+     * Creates the placeholder value for {@code clazz} by invoking its static
+     * {@code getInvalidValue(DescribedType)} factory.
+     *
+     * @param clazz family class expected to declare the factory
+     * @param value decoded described value handed to the factory
+     * @param <X>   family type
+     * @return the value produced by the factory, or {@code null} when {@code clazz} declares
+     *         no matching factory (check with {@link #hasInvalidValue(Class)} first)
+     * @throws IllegalStateException if the factory exists but cannot be invoked or itself throws;
+     *                               returning {@code null} here would make the caller silently
+     *                               lose the value it was trying to flag as invalid
+     */
+    static <X extends SpecializedDescribedType> X getInvalidValue(Class<X> clazz, DescribedType value)
+    {
+        for (Method method : clazz.getMethods())
+        {
+            final int modifiers = method.getModifiers();
+            if (method.getName().equals("getInvalidValue")
+                && method.getParameterCount() == 1
+                && method.getParameterTypes()[0] == DescribedType.class
+                && method.getReturnType() == clazz
+                && Modifier.isStatic(modifiers)
+                && Modifier.isPublic(modifiers))
+            {
+                try
+                {
+                    // The declared return type equals clazz, so a checked cast is always safe here.
+                    return clazz.cast(method.invoke(null, value));
+                }
+                catch (IllegalAccessException | InvocationTargetException e)
+                {
+                    throw new IllegalStateException(String.format("Cannot create invalid value of type '%s' using '%s'",
+                                                                  clazz.getName(),
+                                                                  method), e);
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Tells whether {@code clazz} declares a usable {@code getInvalidValue(DescribedType)} factory.
+     *
+     * @param clazz family class to inspect
+     * @param <X>   family type
+     * @return {@code true} if a public static method with the expected name, single
+     *         {@link DescribedType} parameter and return type {@code clazz} is found
+     */
+    static <X extends SpecializedDescribedType> boolean hasInvalidValue(Class<X> clazz)
+    {
+        for (Method method : clazz.getMethods())
+        {
+            final int modifiers = method.getModifiers();
+            if (method.getName().equals("getInvalidValue")
+                && method.getParameterCount() == 1
+                && method.getParameterTypes()[0] == DescribedType.class
+                && method.getReturnType() == clazz
+                && Modifier.isStatic(modifiers)
+                && Modifier.isPublic(modifiers))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
index c8dd0d9..caceb2d 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
@@ -23,6 +23,20 @@
 
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
-public interface Filter
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedType;
+import org.apache.qpid.server.protocol.v1_0.codec.SpecializedDescribedType;
+
+/**
+ * Common type of the values carried in a source filter map.
+ * <p>
+ * Extends {@link SpecializedDescribedType} so that a described value which is not a
+ * supported filter can be represented by an {@link InvalidFilter} placeholder.
+ */
+public interface Filter extends SpecializedDescribedType
 {
+
+    /**
+     * Placeholder for a described value that is not a supported filter.
+     * It only exposes the descriptor of the described value it stands for.
+     */
+    interface InvalidFilter extends Filter
+    {
+        /** @return the descriptor of the described value this placeholder was created from */
+        Object getDescriptor();
+    }
+
+    /**
+     * Factory for the {@link InvalidFilter} placeholder.
+     * <p>
+     * The declared return type is deliberately {@code Filter}: the reflective lookup in
+     * {@link SpecializedDescribedType} requires the return type to be exactly the class
+     * it is asked about.
+     * NOTE(review): presumably marked "unused" because it has no direct callers and is
+     * only reached through that reflective lookup - confirm.
+     *
+     * @param describedType decoded described value that is not a supported filter
+     * @return a placeholder whose {@code getDescriptor()} delegates to {@code describedType}
+     */
+    @SuppressWarnings("unused")
+    static Filter getInvalidValue(DescribedType describedType)
+    {
+        return (InvalidFilter) describedType::getDescriptor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
index 56d6e6f..748407f 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -30,6 +30,8 @@ import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
 import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.tests.protocol.OutputEncoder;
+import 
org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilterConstructor;
+import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilterWriter;
 
 public class FrameEncoder implements OutputEncoder
 {
@@ -39,6 +41,10 @@ public class FrameEncoder implements OutputEncoder
                                                                                
             .registerTransactionLayer()
                                                                                
             .registerSecurityLayer()
                                                                                
             .registerExtensionSoleconnLayer();
+    static{
+        TestFilterConstructor.register(TYPE_REGISTRY);
+        TestFilterWriter.register(TYPE_REGISTRY);
+    }
 
     @Override
     public ByteBuffer encode(final Object msg)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java
 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java
new file mode 100644
index 0000000..9e610d9
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+
+/**
+ * Filter used by the protocol tests; it wraps a single string value.
+ * Two instances are equal when they are of the same class and wrap equal values.
+ */
+public class TestFilter implements Filter
+{
+    private final String _value;
+
+    /**
+     * @param value string carried by this filter, may be {@code null}
+     */
+    public TestFilter(String value)
+    {
+        this._value = value;
+    }
+
+    /** @return the string carried by this filter */
+    public String getValue()
+    {
+        return this._value;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "TestFilter{" + _value + "}";
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+        {
+            return true;
+        }
+        if (o == null)
+        {
+            return false;
+        }
+        if (o.getClass() != getClass())
+        {
+            return false;
+        }
+
+        final TestFilter other = (TestFilter) o;
+        // Null-safe comparison of the wrapped values.
+        return java.util.Objects.equals(_value, other._value);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        // Yields 0 for a null value, otherwise the value's own hash code.
+        return java.util.Objects.hashCode(_value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java
 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java
new file mode 100644
index 0000000..de2cdb9
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import 
org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
+import 
org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+
+/**
+ * Decoder for {@link TestFilter}: builds the filter from the string found under
+ * either of the test filter descriptors (symbolic or numeric).
+ */
+public class TestFilterConstructor extends AbstractDescribedTypeConstructor<TestFilter>
+{
+    private static final Object[] DESCRIPTORS =
+            {Symbol.valueOf("apache.org:test-filter:string"), UnsignedLong.valueOf(0x0000468C0000000AL)};
+    private static final TestFilterConstructor INSTANCE = new TestFilterConstructor();
+
+    /**
+     * Registers the shared constructor instance under every known descriptor.
+     *
+     * @param registry registry to add the constructor to
+     */
+    public static void register(DescribedTypeConstructorRegistry registry)
+    {
+        for (int i = 0; i < DESCRIPTORS.length; i++)
+        {
+            registry.register(DESCRIPTORS[i], INSTANCE);
+        }
+    }
+
+    /**
+     * @param underlying decoded value found under the descriptor
+     * @return a filter wrapping {@code underlying}
+     * @throws AmqpErrorException with a decode error when {@code underlying} is not a string
+     */
+    @Override
+    public TestFilter construct(Object underlying) throws AmqpErrorException
+    {
+        if (!(underlying instanceof String))
+        {
+            final String actualType = underlying == null ? null : underlying.getClass().getSimpleName();
+            final String msg = String.format("Cannot decode 'apache.org:test-filter:string' from '%s'", actualType);
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR, msg);
+        }
+
+        return new TestFilter((String) underlying);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java
 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java
new file mode 100644
index 0000000..093377e
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java
@@ -0,0 +1,46 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.UnsignedLongWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+
+/**
+ * Encoder for {@link TestFilter}: writes the filter as a described type whose
+ * descriptor is the numeric test filter code and whose body is the filter's string value.
+ */
+public class TestFilterWriter extends AbstractDescribedTypeWriter<TestFilter>
+{
+    private static final ValueWriter<UnsignedLong> DESCRIPTOR_WRITER =
+            UnsignedLongWriter.getWriter(0x0000468C0000000AL);
+
+    // Creates a fresh writer for every filter instance to be encoded.
+    private static final Factory<TestFilter> FACTORY =
+            (registry, filter) -> new TestFilterWriter(registry, filter);
+
+    /**
+     * Makes the given registry able to encode {@link TestFilter} values.
+     *
+     * @param registry registry to add the writer factory to
+     */
+    public static void register(Registry registry)
+    {
+        registry.register(TestFilter.class, FACTORY);
+    }
+
+    private TestFilterWriter(final Registry registry, final TestFilter object)
+    {
+        super(DESCRIPTOR_WRITER, registry.getValueWriter(object.getValue()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
index 027c785..680c7b6 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
@@ -22,14 +22,19 @@ package 
org.apache.qpid.tests.protocol.v1_0.extensions.filter;
 
 import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.junit.After;
@@ -42,19 +47,19 @@ import 
org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilter;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 import org.apache.qpid.tests.utils.BrokerSpecific;
@@ -154,6 +159,41 @@ public class FilterTest extends BrokerAdminUsingTestBase
 
     }
 
+    /**
+     * Attaches a receiving link whose source carries two filters, a JMS selector and a
+     * {@link TestFilter}, and verifies that the peer answers with an Attach that has neither
+     * source nor target, followed by a closing Detach whose error condition is
+     * {@code not-implemented}.
+     * NOTE(review): presumably TestFilter is unknown to the broker because its codec is only
+     * registered in the test-side FrameEncoder - confirm.
+     */
+    @Test
+    @SpecificationTest(section = "3.5.1", description = "")
+    public void unsupportedFilter() throws Exception
+    {
+        final InetSocketAddress addr = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            // One JMS selector filter plus the test-only filter type.
+            final Map<Symbol, Filter> filters = new HashMap<>();
+            filters.put(Symbol.valueOf("selector-filter"), new 
JMSSelectorFilter("index=1"));
+            filters.put(Symbol.valueOf("test-filter"), new TestFilter("foo"));
+            final Attach responseAttach = 
interaction.negotiateProtocol().consumeResponse()
+                                                     
.open().consumeResponse(Open.class)
+                                                     
.begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     
.attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     
.attachSourceFilter(filters)
+                                                     
.attach().consumeResponse()
+                                                     
.getLatestResponse(Attach.class);
+            // The Attach response is expected to carry no source and no target.
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(),
+                       
is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.SENDER));
+            assertThat(responseAttach.getSource(), is(nullValue()));
+            assertThat(responseAttach.getTarget(), is(nullValue()));
+
+            // The next frame must be a closing Detach reporting not-implemented.
+            final Detach responseDetach = 
interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), 
is(equalTo(AmqpError.NOT_IMPLEMENTED)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
     private QpidByteBuffer 
generateMessagePayloadWithApplicationProperties(final Map<String, Object> 
applicationProperties, String content)
     {
         MessageEncoder messageEncoder = new MessageEncoder();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to