Repository: qpid-broker-j Updated Branches: refs/heads/master 10557c73e -> 489a7b85c
QPID-8114: [Broker-J] Detach link with not-implemented error when unsupported filter is supplied among source filters Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/489a7b85 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/489a7b85 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/489a7b85 Branch: refs/heads/master Commit: 489a7b85ca0335732595f701b49883ea4ed751c7 Parents: 10557c7 Author: Alex Rudyy <oru...@apache.org> Authored: Thu Dec 6 16:32:24 2018 +0000 Committer: Alex Rudyy <oru...@apache.org> Committed: Thu Dec 6 16:32:24 2018 +0000 ---------------------------------------------------------------------- .../protocol/v1_0/SendingLinkEndpoint.java | 8 +++ .../protocol/v1_0/codec/MapConstructor.java | 10 ++- .../v1_0/codec/SpecializedDescribedType.java | 67 ++++++++++++++++++++ .../protocol/v1_0/type/messaging/Filter.java | 16 ++++- .../qpid/tests/protocol/v1_0/FrameEncoder.java | 6 ++ .../v1_0/extensions/type/TestFilter.java | 67 ++++++++++++++++++++ .../extensions/type/TestFilterConstructor.java | 58 +++++++++++++++++ .../v1_0/extensions/type/TestFilterWriter.java | 46 ++++++++++++++ .../v1_0/extensions/filter/FilterTest.java | 46 +++++++++++++- 9 files changed, 318 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java index 8919413..b3603d9 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java @@ -185,6 +185,14 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target> } + else if (entry.getValue() instanceof Filter.InvalidFilter) + { + Error error = new Error(); + error.setCondition(AmqpError.NOT_IMPLEMENTED); + error.setDescription("Unsupported filter type: " + ((Filter.InvalidFilter)entry.getValue()).getDescriptor()); + error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); + throw new AmqpErrorException(error); + } } } source.setFilter(actualFilters.isEmpty() ? null : actualFilters); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java index 70f5ed2..85ae873 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java @@ -113,9 +113,15 @@ public class MapConstructor extends VariableWidthTypeConstructor<Map<Object,Obje } Object value = handler.parse(in); - if (value != null && !valueType.isAssignableFrom(value.getClass())) + if (value instanceof DescribedType + && SpecializedDescribedType.class.isAssignableFrom(valueType) + && SpecializedDescribedType.hasInvalidValue((Class<SpecializedDescribedType>)valueType)) { - String message = String.format("Expected key type is '%s' but got '%s'", + value = SpecializedDescribedType.getInvalidValue((Class<SpecializedDescribedType>)valueType, (DescribedType) value); + } + else if (value != null && !valueType.isAssignableFrom(value.getClass())) + { + String message = String.format("Expected value type is '%s' but got '%s'", valueType.getSimpleName(), value.getClass().getSimpleName()); throw new AmqpErrorException(AmqpError.DECODE_ERROR, message); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java new file mode 100644 index 0000000..bb913f3 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0.codec; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +public interface SpecializedDescribedType +{ + static <X extends SpecializedDescribedType> X getInvalidValue(Class<X> clazz, DescribedType value) { + for(Method method : clazz.getMethods()) + { + if(method.getName().equals("getInvalidValue") + && method.getParameterCount() == 1 + && method.getParameterTypes()[0] == DescribedType.class + && method.getReturnType() == clazz + && (method.getModifiers() & (Modifier.STATIC | Modifier.PUBLIC)) == (Modifier.STATIC | Modifier.PUBLIC)) + { + try + { + return (X) method.invoke(null, value); + } + catch (IllegalAccessException | InvocationTargetException e) + { + return null; + } + } + } + + return null; + } + + static <X extends SpecializedDescribedType> boolean hasInvalidValue(Class<X> clazz) + { + for(Method method : clazz.getMethods()) + { + if(method.getName().equals("getInvalidValue") + && method.getParameterCount() == 1 + && method.getParameterTypes()[0] == DescribedType.class + && method.getReturnType() == clazz + && (method.getModifiers() & (Modifier.STATIC | Modifier.PUBLIC)) == (Modifier.STATIC | Modifier.PUBLIC)) + { + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java index c8dd0d9..caceb2d 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java @@ -23,6 +23,20 @@ package org.apache.qpid.server.protocol.v1_0.type.messaging; -public interface Filter +import org.apache.qpid.server.protocol.v1_0.codec.DescribedType; +import org.apache.qpid.server.protocol.v1_0.codec.SpecializedDescribedType; + +public interface Filter extends SpecializedDescribedType { + + interface InvalidFilter extends Filter + { + Object getDescriptor(); + } + + @SuppressWarnings("unused") + static Filter getInvalidValue(DescribedType describedType) + { + return (InvalidFilter) describedType::getDescriptor; + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java index 56d6e6f..748407f 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java @@ -30,6 +30,8 @@ import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.server.transport.ByteBufferSender; import org.apache.qpid.tests.protocol.OutputEncoder; +import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilterConstructor; +import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilterWriter; public class FrameEncoder implements OutputEncoder { @@ -39,6 +41,10 @@ public class FrameEncoder implements OutputEncoder .registerTransactionLayer() .registerSecurityLayer() .registerExtensionSoleconnLayer(); + static{ + TestFilterConstructor.register(TYPE_REGISTRY); + TestFilterWriter.register(TYPE_REGISTRY); + } @Override public ByteBuffer encode(final Object msg) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java new file mode 100644 index 0000000..9e610d9 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v1_0.extensions.type; + +import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter; + +public class TestFilter implements Filter +{ + private final String _value; + + public TestFilter(String value) + { + _value = value; + } + + public String getValue() + { + return _value; + } + + @Override + public String toString() + { + return "TestFilter{" + _value + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + TestFilter that = (TestFilter) o; + + return _value != null ? _value.equals(that._value) : that._value == null; + } + + @Override + public int hashCode() + { + return _value != null ? _value.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java new file mode 100644 index 0000000..de2cdb9 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v1_0.extensions.type; + +import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor; +import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; + +public class TestFilterConstructor extends AbstractDescribedTypeConstructor<TestFilter> +{ + private static final Object[] DESCRIPTORS = + {Symbol.valueOf("apache.org:test-filter:string"), UnsignedLong.valueOf(0x0000468C0000000AL)}; + private static final TestFilterConstructor INSTANCE = new TestFilterConstructor(); + + public static void register(DescribedTypeConstructorRegistry registry) + { + for (Object descriptor : DESCRIPTORS) + { + registry.register(descriptor, INSTANCE); + } + } + + @Override + public TestFilter construct(Object underlying) throws AmqpErrorException + { + if (underlying instanceof String) + { + return new TestFilter((String) underlying); + } + else + { + final String msg = String.format("Cannot decode 'apache.org:test-filter:string' from '%s'", + underlying == null ? null : underlying.getClass().getSimpleName()); + throw new AmqpErrorException(AmqpError.DECODE_ERROR, msg); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java new file mode 100644 index 0000000..093377e --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java @@ -0,0 +1,46 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +package org.apache.qpid.tests.protocol.v1_0.extensions.type; + +import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeWriter; +import org.apache.qpid.server.protocol.v1_0.codec.UnsignedLongWriter; +import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; + +public class TestFilterWriter extends AbstractDescribedTypeWriter<TestFilter> +{ + private static final ValueWriter<UnsignedLong> DESCRIPTOR_WRITER = UnsignedLongWriter.getWriter(0x0000468C0000000AL); + + private TestFilterWriter(final Registry registry, final TestFilter object) + { + super(DESCRIPTOR_WRITER, registry.getValueWriter(object.getValue())); + } + + private static final Factory<TestFilter> FACTORY = TestFilterWriter::new; + + public static void register(Registry registry) + { + registry.register(TestFilter.class, FACTORY); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/489a7b85/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java index 027c785..680c7b6 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java @@ -22,14 +22,19 @@ package org.apache.qpid.tests.protocol.v1_0.extensions.filter; import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.junit.After; @@ -42,19 +47,19 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter; import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; -import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.protocol.v1_0.MessageEncoder; +import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilter; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; import org.apache.qpid.tests.utils.BrokerSpecific; @@ -154,6 +159,41 @@ public class FilterTest extends BrokerAdminUsingTestBase } + @Test + @SpecificationTest(section = "3.5.1", description = "") + public void unsupportedFilter() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + try (FrameTransport transport = new FrameTransport(addr).connect()) + { + final Interaction interaction = transport.newInteraction(); + final Map<Symbol, Filter> filters = new HashMap<>(); + filters.put(Symbol.valueOf("selector-filter"), new JMSSelectorFilter("index=1")); + filters.put(Symbol.valueOf("test-filter"), new TestFilter("foo")); + final Attach responseAttach = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.RECEIVER) + .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attachSourceFilter(filters) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + assertThat(responseAttach.getName(), is(notNullValue())); + assertThat(responseAttach.getHandle().longValue(), + is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue())))); + assertThat(responseAttach.getRole(), is(Role.SENDER)); + assertThat(responseAttach.getSource(), is(nullValue())); + assertThat(responseAttach.getTarget(), is(nullValue())); + + final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class); + assertThat(responseDetach.getClosed(), is(true)); + assertThat(responseDetach.getError(), is(notNullValue())); + assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.NOT_IMPLEMENTED))); + + interaction.doCloseConnection(); + } + } + private QpidByteBuffer generateMessagePayloadWithApplicationProperties(final Map<String, Object> applicationProperties, String content) { MessageEncoder messageEncoder = new MessageEncoder(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org