NO-JIRA: [Java Broker] Refactor AMQP 1.0 protocol tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/813ecbbc Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/813ecbbc Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/813ecbbc Branch: refs/heads/master Commit: 813ecbbc1e5308b0ac55e1da64fb42a6c6506410 Parents: 92b79cc Author: Lorenz Quack <lqu...@apache.org> Authored: Wed Jun 21 10:50:34 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Wed Jun 21 10:51:40 2017 +0100 ---------------------------------------------------------------------- .../tests/protocol/v1_0/FrameTransport.java | 219 +------ .../qpid/tests/protocol/v1_0/Interaction.java | 616 +++++++++++++++++++ .../apache/qpid/tests/protocol/v1_0/Utils.java | 71 +-- .../bindmapjms/TemporaryDestinationTest.java | 43 +- .../soleconn/CloseExistingPolicy.java | 155 ++--- .../v1_0/extensions/soleconn/MixedPolicy.java | 118 ++-- .../soleconn/RefuseConnectionPolicy.java | 253 ++++---- .../extensions/websocket/WebSocketTest.java | 27 +- .../v1_0/messaging/DeleteOnCloseTest.java | 138 ++--- .../protocol/v1_0/messaging/MessageFormat.java | 61 +- .../protocol/v1_0/messaging/TransferTest.java | 371 ++++------- .../v1_0/transaction/DischargeTest.java | 167 ++--- .../v1_0/transport/ProtocolHeaderTest.java | 20 +- .../v1_0/transport/connection/OpenTest.java | 84 +-- .../v1_0/transport/link/AttachTest.java | 75 +-- .../protocol/v1_0/transport/link/FlowTest.java | 116 ++-- .../v1_0/transport/security/sasl/SaslTest.java | 169 +++-- .../v1_0/transport/session/BeginTest.java | 68 +- 18 files changed, 1363 insertions(+), 1408 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java index b72ed4e..9aeb4e9 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java @@ -23,18 +23,12 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; @@ -53,30 +47,19 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import org.hamcrest.CoreMatchers; -import org.hamcrest.core.Is; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame; import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame; import org.apache.qpid.server.protocol.v1_0.type.FrameBody; import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody; -import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; -import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; -import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.server.protocol.v1_0.type.transport.Close; -import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; -import org.apache.qpid.server.protocol.v1_0.type.transport.Open; -import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; public class FrameTransport implements AutoCloseable { public static final long RESPONSE_TIMEOUT = 6000; - private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse(); private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(100); @@ -87,8 +70,6 @@ public class FrameTransport implements AutoCloseable private Channel _channel; private volatile boolean _channelClosedSeen = false; - private int _amqpConnectionId; - private short _amqpChannelId; public FrameTransport(final InetSocketAddress brokerAddress) { @@ -158,7 +139,6 @@ public class FrameTransport implements AutoCloseable } finally { - AMQP_CONNECTION_IDS.remove(_amqpConnectionId); _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); } } @@ -166,21 +146,23 @@ public class FrameTransport implements AutoCloseable public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception { Preconditions.checkState(_channel != null, "Not connected"); + ChannelPromise promise = _channel.newPromise(); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); buffer.writeBytes(bytes); - ChannelFuture channelFuture = _channel.writeAndFlush(buffer); - channelFuture.sync(); - return JdkFutureAdapters.listenInPoolThread(channelFuture); + _channel.write(buffer, promise); + _channel.flush(); + return JdkFutureAdapters.listenInPoolThread(promise); } public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception { Preconditions.checkState(_channel != null, "Not connected"); + ChannelPromise promise = _channel.newPromise(); final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null; TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload); - ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame); - channelFuture.sync(); - return JdkFutureAdapters.listenInPoolThread(channelFuture); + _channel.write(transportFrame, promise); + _channel.flush(); + return JdkFutureAdapters.listenInPoolThread(promise); } public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception @@ -191,104 +173,17 @@ public class FrameTransport implements AutoCloseable return JdkFutureAdapters.listenInPoolThread(channelFuture); } - public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception - { - return sendPerformative(frameBody, UnsignedShort.valueOf(_amqpChannelId)); - } - - public ListenableFuture<Void> sendPipelined(final byte[] protocolHeader, final TransportFrame... frames) - throws InterruptedException - { - ChannelPromise promise = _channel.newPromise(); - if (protocolHeader != null) - { - ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); - buffer.writeBytes(protocolHeader); - _channel.write(buffer); - } - for (TransportFrame frame : frames) - { - _channel.write(frame, promise); - } - _channel.flush(); - return JdkFutureAdapters.listenInPoolThread(promise); - } - - public ListenableFuture<Void> sendPipelined(final TransportFrame... frames) throws InterruptedException - { - return sendPipelined(null, frames); - } - public <T extends Response<?>> T getNextResponse() throws Exception { return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); } - public <R extends Response<?>> R getNextResponse(Class<R> expectedResponseClass) throws Exception - { - R actualResponse = getNextResponse(); - if (actualResponse == null) - { - throw new IllegalStateException(String.format("No response received within timeout %d - expecting %s", - RESPONSE_TIMEOUT, expectedResponseClass.getName())); - } - else if (!expectedResponseClass.isAssignableFrom(actualResponse.getClass())) - { - throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s", - expectedResponseClass.getName(), - actualResponse.getClass().getName())); - } - - return actualResponse; - } - - public <T> T getNextResponseBody(Class<T> expectedFrameBodyClass) throws Exception - { - Response<T> response = getNextResponse(); - T actualFrameBody = response.getBody(); - - if (!expectedFrameBodyClass.isAssignableFrom(actualFrameBody.getClass())) - { - throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s", - expectedFrameBodyClass.getName(), - actualFrameBody.getClass().getName())); - } - - return actualFrameBody; - } - - public void doProtocolNegotiation() throws Exception - { - byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); - sendProtocolHeader(bytes); - HeaderResponse response = (HeaderResponse) getNextResponse(); - - if (!Arrays.equals(bytes, response.getBody())) - { - throw new IllegalStateException("Unexpected protocol header"); - } - } - - public void doOpenConnection() throws Exception - { - doProtocolNegotiation(); - Open open = new Open(); - - open.setContainerId(String.format("testContainer-%d", getConnectionId())); - sendPerformative(open, UnsignedShort.valueOf((short) 0)); - PerformativeResponse response = (PerformativeResponse) getNextResponse(); - if (!(response.getBody() instanceof Open)) - { - throw new IllegalStateException("Unexpected response to connection Open"); - } - } - public void doCloseConnection() throws Exception { Close close = new Close(); sendPerformative(close, UnsignedShort.valueOf((short) 0)); - PerformativeResponse response = (PerformativeResponse) getNextResponse(); + PerformativeResponse response = getNextResponse(); if (!(response.getBody() instanceof Close)) { throw new IllegalStateException(String.format( @@ -296,84 +191,6 @@ public class FrameTransport implements AutoCloseable } } - public void doBeginSession() throws Exception - { - doOpenConnection(); - Begin begin = new Begin(); - begin.setNextOutgoingId(UnsignedInteger.ZERO); - begin.setIncomingWindow(UnsignedInteger.ZERO); - begin.setOutgoingWindow(UnsignedInteger.ZERO); - _amqpChannelId = (short) 1; - sendPerformative(begin, UnsignedShort.valueOf(_amqpChannelId)); - PerformativeResponse response = (PerformativeResponse) getNextResponse(); - if (!(response.getBody() instanceof Begin)) - { - throw new IllegalStateException(String.format( - "Unexpected response to connection Begin. Expected Begin got '%s'", response.getBody())); - } - } - - public void doAttachReceivingLink(String queueName) throws Exception - { - doAttachReceivingLink(UnsignedInteger.ZERO, queueName); - } - - public void doAttachReceivingLink(final UnsignedInteger handle, String queueName) throws Exception - { - doBeginSession(); - Role localRole = Role.RECEIVER; - Attach attach = new Attach(); - attach.setName("testReceivingLink"); - attach.setHandle(handle); - attach.setRole(localRole); - Source source = new Source(); - source.setAddress(queueName); - attach.setSource(source); - Target target = new Target(); - attach.setTarget(target); - - sendPerformative(attach); - PerformativeResponse response = (PerformativeResponse) getNextResponse(); - - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Attach.class))); - Attach responseAttach = (Attach) response.getBody(); - assertThat(responseAttach.getSource(), is(notNullValue())); - } - - public void doAttachSendingLink(final UnsignedInteger handle, - final String destination) throws Exception - { - Attach attach = new Attach(); - attach.setName("testSendingLink"); - attach.setHandle(handle); - attach.setRole(Role.SENDER); - attach.setInitialDeliveryCount(UnsignedInteger.ZERO); - Source source = new Source(); - attach.setSource(source); - Target target = new Target(); - target.setAddress(destination); - attach.setTarget(target); - doAttachSendingLink(attach); - } - - public void doAttachSendingLink(final Attach attach) throws Exception - { - doBeginSession(); - - sendPerformative(attach); - PerformativeResponse response = (PerformativeResponse) getNextResponse(); - - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Attach.class))); - Attach responseAttach = (Attach) response.getBody(); - assertThat(responseAttach.getTarget(), is(notNullValue())); - - PerformativeResponse flowResponse = (PerformativeResponse) getNextResponse(); - assertThat(flowResponse, Is.is(CoreMatchers.notNullValue())); - assertThat(flowResponse.getBody(), Is.is(CoreMatchers.instanceOf(Flow.class))); - } - public void assertNoMoreResponses() throws Exception { Response response = getNextResponse(); @@ -386,19 +203,6 @@ public class FrameTransport implements AutoCloseable assertThat(_channelClosedSeen, is(true)); } - private int getConnectionId() - { - if (_amqpConnectionId == 0) - { - _amqpConnectionId = 1; - while (!AMQP_CONNECTION_IDS.add(_amqpConnectionId)) - { - ++_amqpConnectionId; - } - } - return _amqpConnectionId; - } - private static class ChannelClosedResponse implements Response<Void> { @Override @@ -413,4 +217,9 @@ public class FrameTransport implements AutoCloseable return null; } } + + public Interaction newInteraction() + { + return new Interaction(this); + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java new file mode 100644 index 0000000..eac8770 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tests.protocol.v1_0; + +import static com.google.common.util.concurrent.Futures.allAsList; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.FrameBody; +import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; +import org.apache.qpid.server.protocol.v1_0.type.transport.Close; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; + +public class Interaction +{ + private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Begin _begin; + private final Open _open; + private final Close _close; + private final Attach _attach; + private final Detach _detach; + private final Flow _flow; + private final Transfer _transfer; + private final FrameTransport _transport; + private final SaslInit _saslInit; + private final SaslResponse _saslResponse; + private byte[] _protocolHeader; + private UnsignedShort _connectionChannel; + private UnsignedShort _sessionChannel; + private Response<?> _latestResponse; + private ListenableFuture<?> _latestFuture; + + Interaction(final FrameTransport frameTransport) + { + final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO; + _transport = frameTransport; + + _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8); + + _saslInit = new SaslInit(); + _saslResponse = new SaslResponse(); + + _open = new Open(); + _open.setContainerId(getConnectionId()); + _close = new Close(); + _connectionChannel = UnsignedShort.valueOf(0); + + _begin = new Begin(); + _begin.setNextOutgoingId(UnsignedInteger.ZERO); + _begin.setIncomingWindow(UnsignedInteger.ZERO); + _begin.setOutgoingWindow(UnsignedInteger.ZERO); + _sessionChannel = UnsignedShort.valueOf(1); + + _attach = new Attach(); + _attach.setName("testLink"); + _attach.setHandle(defaultLinkHandle); + _attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + _attach.setSource(new Source()); + _attach.setTarget(new Target()); + + _detach = new Detach(); + _detach.setHandle(_attach.getHandle()); + + _flow = new Flow(); + _flow.setNextIncomingId(UnsignedInteger.ZERO); + _flow.setIncomingWindow(UnsignedInteger.ZERO); + _flow.setNextOutgoingId(UnsignedInteger.ZERO); + _flow.setOutgoingWindow(UnsignedInteger.ZERO); + + _transfer = new Transfer(); + _transfer.setHandle(defaultLinkHandle); + _transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8))); + _transfer.setDeliveryId(UnsignedInteger.ZERO); + } + + ///////////////////////// + // Protocol Negotiation // + ///////////////////////// + + public Interaction protocolHeader(byte[] header) + { + _protocolHeader = header; + return this; + } + + public Interaction negotiateProtocol() throws Exception + { + final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader); + if (_latestFuture != null) + { + _latestFuture = allAsList(_latestFuture, future); + } + else + { + _latestFuture = future; + } + return this; + } + + ////////// + // SASL // + ////////// + + public Interaction saslMechanism(final Symbol mechanism) + { + _saslInit.setMechanism(mechanism); + return this; + } + + public Interaction saslInitialResponse(final Binary initialResponse) + { + _saslInit.setInitialResponse(initialResponse); + return this; + } + + public Interaction saslInit() throws Exception + { + sendPerformativeAndChainFuture(_saslInit); + return this; + } + + public Interaction saslResponseResponse(Binary response) + { + _saslResponse.setResponse(response); + return this; + } + + public Interaction saslResponse() throws Exception + { + sendPerformativeAndChainFuture(_saslResponse); + return this; + } + + //////////////// + // Connection // + //////////////// + + public Interaction connectionChannel(UnsignedShort connectionChannel) + { + _connectionChannel = connectionChannel; + return this; + } + + public Interaction openContainerId(String containerId) + { + _open.setContainerId(containerId); + return this; + } + + public Interaction openHostname(String hostname) + { + _open.setHostname(hostname); + return this; + } + + public Interaction openChannelMax(UnsignedShort channelMax) + { + _open.setChannelMax(channelMax); + return this; + } + + public Interaction openDesiredCapabilities(final Symbol... desiredCapabilities) + { + _open.setDesiredCapabilities(desiredCapabilities); + return this; + } + + public Interaction openProperties(final Map<Symbol, Object> properties) + { + _open.setProperties(properties); + return this; + } + + public Interaction open() throws Exception + { + sendPerformativeAndChainFuture(_open, _connectionChannel); + return this; + } + + public Interaction close() throws Exception + { + sendPerformativeAndChainFuture(_close, _connectionChannel); + return this; + } + + private String getConnectionId() + { + int index = 1; + String containerId = String.format("testContainer-%d", index); + while (CONTAINER_IDS.contains(containerId)) + { + ++index; + containerId = String.format("testContainer-%d", index); + } + CONTAINER_IDS.add(containerId); + return containerId; + } + + ///////////// + // Session // + ///////////// + + public Interaction sessionChannel(UnsignedShort sessionChannel) + { + _sessionChannel = sessionChannel; + return this; + } + + public Interaction beginNextOutgoingId(UnsignedInteger nextOutgoingId) + { + _begin.setNextOutgoingId(nextOutgoingId); + return this; + } + + public Interaction beginIncomingWindow(UnsignedInteger incomingWindow) + { + _begin.setIncomingWindow(incomingWindow); + return this; + } + + public Interaction beginOutgoingWindow(UnsignedInteger outgoingWindow) + { + _begin.setOutgoingWindow(outgoingWindow); + return this; + } + + public Interaction begin() throws Exception + { + sendPerformativeAndChainFuture(_begin, _sessionChannel); + return this; + } + + ////////// + // Link // + ////////// + + + public Interaction attachName(String linkName) + { + _attach.setName(linkName); + return this; + } + + public Interaction attachRole(Role role) + { + _attach.setRole(role); + return this; + } + + public Interaction attachHandle(UnsignedInteger handle) + { + _attach.setHandle(handle); + _detach.setHandle(handle); + return this; + } + + public Interaction attachInitialDeliveryCount(UnsignedInteger initialDeliveryCount) + { + _attach.setInitialDeliveryCount(initialDeliveryCount); + return this; + } + + public Interaction attachRcvSettleMode(final ReceiverSettleMode rcvSettleMode) + { + _attach.setRcvSettleMode(rcvSettleMode); + return this; + } + + public Interaction attachSource(Source source) + { + _attach.setSource(source); + return this; + } + + public Interaction attachTarget(BaseTarget target) + { + _attach.setTarget(target); + return this; + } + + public Interaction attachSourceAddress(String address) + { + Source source = (Source) _attach.getSource(); + source.setAddress(address); + _attach.setSource(source); + return this; + } + + public Interaction attachSourceOutcomes(final Symbol... outcomes) + { + Source source = ((Source) _attach.getSource()); + source.setOutcomes(outcomes); + _attach.setSource(source); + return this; + } + + public Interaction attachTargetAddress(final String address) + { + final Target target = ((Target) _attach.getTarget()); + target.setAddress(address); + _attach.setTarget(target); + return this; + } + + public Interaction attach() throws Exception + { + sendPerformativeAndChainFuture(_attach, _sessionChannel); + return this; + } + + public Interaction detachClose(Boolean close) + { + _detach.setClosed(close); + return this; + } + + public Interaction detach() throws Exception + { + sendPerformativeAndChainFuture(_detach, _sessionChannel); + return this; + } + + ////////// + // FLow // + ////////// + + public Interaction flowIncomingWindow(final UnsignedInteger incomingWindow) + { + _flow.setIncomingWindow(incomingWindow); + return this; + } + + public Interaction flowNextIncomingId(final UnsignedInteger nextIncomingId) + { + _flow.setNextIncomingId(nextIncomingId); + return this; + } + + public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow) + { + _flow.setOutgoingWindow(outgoingWindow); + return this; + } + + public Interaction flowNextOutgoingId(final UnsignedInteger nextNextOutgoingId) + { + _flow.setNextOutgoingId(nextNextOutgoingId); + return this; + } + + public Interaction flowEcho(final Boolean echo) + { + _flow.setEcho(echo); + return this; + } + + public Interaction flowHandle(final UnsignedInteger handle) + { + _flow.setHandle(handle); + return this; + } + + public Interaction flowAvailable(final UnsignedInteger available) + { + _flow.setAvailable(available); + return this; + } + + public Interaction flowDeliveryCount(final UnsignedInteger deliveryCount) + { + _flow.setDeliveryCount(deliveryCount); + return this; + } + + public Interaction flowLinkCredit(final UnsignedInteger linkCredit) + { + _flow.setLinkCredit(linkCredit); + return this; + } + + public Interaction flowDrain(final Boolean drain) + { + _flow.setDrain(drain); + return this; + } + + public Interaction flow() throws Exception + { + sendPerformativeAndChainFuture(_flow, _sessionChannel); + return this; + } + + ////////////// + // Transfer // + ////////////// + + public Interaction transferHandle(UnsignedInteger transferHandle) + { + _transfer.setHandle(transferHandle); + return this; + } + + public Interaction transferDeliveryTag(final Binary deliveryTag) + { + _transfer.setDeliveryTag(deliveryTag); + return this; + } + + public Interaction transferDeliveryId(final UnsignedInteger deliveryId) + { + _transfer.setDeliveryId(deliveryId); + return this; + } + + public Interaction transferRcvSettleMode(final ReceiverSettleMode receiverSettleMode) + { + _transfer.setRcvSettleMode(receiverSettleMode); + return this; + } + + public Interaction transferMore(final Boolean more) + { + _transfer.setMore(more); + return this; + } + + public Interaction transferMessageFormat(final UnsignedInteger messageFormat) + { + _transfer.setMessageFormat(messageFormat); + return this; + } + + public Interaction transferPayload(final List<QpidByteBuffer> payload) + { + _transfer.setPayload(payload); + return this; + } + + public Interaction transferPayloadData(final Object payload) + { + AmqpValue amqpValue = new AmqpValue(payload); + final AmqpValueSection section = amqpValue.createEncodingRetainingSection(); + final List<QpidByteBuffer> encodedForm = section.getEncodedForm(); + _transfer.setPayload(encodedForm); + + section.dispose(); + for (QpidByteBuffer qbb : encodedForm) + { + qbb.dispose(); + } + return this; + } + + public Interaction transferSettled(final Boolean settled) + { + _transfer.setSettled(settled); + return this; + } + + public Interaction transfer() throws Exception + { + sendPerformativeAndChainFuture(_transfer, _sessionChannel); + return this; + } + + ////////// + // misc // + ////////// + + public Interaction sendPerformative(final FrameBody frameBody, + final UnsignedShort channel) throws Exception + { + sendPerformativeAndChainFuture(frameBody, channel); + return this; + } + + public Interaction sendPerformative(final SaslFrameBody saslFrameBody) throws Exception + { + sendPerformativeAndChainFuture(saslFrameBody); + return this; + } + + private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception + { + final ListenableFuture<Void> future = _transport.sendPerformative(frameBody); + if (_latestFuture != null) + { + _latestFuture = allAsList(_latestFuture, future); + } + else + { + _latestFuture = future; + } + } + + private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception + { + final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel); + if (_latestFuture != null) + { + _latestFuture = allAsList(_latestFuture, future); + } + else + { + _latestFuture = future; + } + } + + public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception + { + sync(); + _latestResponse = _transport.getNextResponse(); + final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes)); + if ((acceptableResponseClasses.isEmpty() && _latestResponse != null) + || (acceptableResponseClasses.contains(null) && _latestResponse == null)) + { + return this; + } + acceptableResponseClasses.remove(null); + for (Class<?> acceptableResponseClass : acceptableResponseClasses) + { + if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass())) + { + return this; + } + } + throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.", + acceptableResponseClasses, + _latestResponse == null ? null : _latestResponse.getBody())); + } + + public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException + { + if (_latestFuture != null) + { + _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); + _latestFuture = null; + } + return this; + } + + public Response<?> getLatestResponse() throws Exception + { + sync(); + return _latestResponse; + } + + public <T> T getLatestResponse(Class<T> type) throws Exception + { + sync(); + if (!type.isAssignableFrom(_latestResponse.getBody().getClass())) + { + throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.", + type.getSimpleName(), + _latestResponse.getBody())); + } + + return (T) _latestResponse.getBody(); + } + + public Interaction flowHandleFromLinkHandle() + { + _flow.setHandle(_attach.getHandle()); + return this; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java index 5d4c52f..bfe05b8 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java @@ -20,21 +20,15 @@ package org.apache.qpid.tests.protocol.v1_0; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import java.net.InetSocketAddress; -import org.hamcrest.core.Is; - import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; -import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; @@ -45,39 +39,25 @@ public class Utils { try (FrameTransport transport = new FrameTransport(brokerAddress).connect()) { - transport.doBeginSession(); - + final Interaction interaction = transport.newInteraction(); + final Attach attachValidationResponse = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse() + .begin().consumeResponse() + .attachName("validationAttach") + .attachRole(Role.RECEIVER) + .attachSourceAddress(nodeAddress) + .attach().consumeResponse() + .getLatestResponse(Attach.class); final boolean queueExists; - Attach validationAttach = new Attach(); - validationAttach.setName("validationAttach"); - validationAttach.setHandle(UnsignedInteger.ZERO); - validationAttach.setRole(Role.RECEIVER); - Source validationSource = new Source(); - validationSource.setAddress(nodeAddress); - validationAttach.setSource(validationSource); - validationAttach.setTarget(new Target()); - transport.sendPerformative(validationAttach); - PerformativeResponse validationResponse = (PerformativeResponse) transport.getNextResponse(); - assertThat(validationResponse, is(notNullValue())); - assertThat(validationResponse.getBody(), is(instanceOf(Attach.class))); - final Attach attachValidationResponse = (Attach) validationResponse.getBody(); if (attachValidationResponse.getSource() != null) { queueExists = true; - Detach validationDetach = new Detach(); - validationDetach.setHandle(validationAttach.getHandle()); - validationDetach.setClosed(true); - transport.sendPerformative(validationDetach); - PerformativeResponse validationDetachResponse = (PerformativeResponse) transport.getNextResponse(); - assertThat(validationDetachResponse, is(notNullValue())); - assertThat(validationDetachResponse.getBody(), is(instanceOf(Detach.class))); + interaction.detachClose(true).detach().consumeResponse().getLatestResponse(Detach.class); } else { queueExists = false; - PerformativeResponse validationDetachResponse = (PerformativeResponse) transport.getNextResponse(); - assertThat(validationDetachResponse, is(notNullValue())); - assertThat(validationDetachResponse.getBody(), is(instanceOf(Detach.class))); + interaction.consumeResponse().getLatestResponse(Detach.class); } return queueExists; } @@ -88,25 +68,26 @@ public class Utils { try (FrameTransport transport = new FrameTransport(brokerAddress).connect()) { - transport.doAttachReceivingLink(queueName); - Flow flow = new Flow(); - flow.setIncomingWindow(UnsignedInteger.ONE); - flow.setNextIncomingId(UnsignedInteger.ZERO); - flow.setOutgoingWindow(UnsignedInteger.ZERO); - flow.setNextOutgoingId(UnsignedInteger.ZERO); - flow.setHandle(UnsignedInteger.ZERO); - flow.setLinkCredit(UnsignedInteger.ONE); - - transport.sendPerformative(flow); + final Interaction interaction = transport.newInteraction() + .negotiateProtocol().consumeResponse() + .open().consumeResponse() + .begin().consumeResponse() + .attachRole(Role.RECEIVER) + .attachSourceAddress(queueName) + .attach().consumeResponse() + .flowIncomingWindow(UnsignedInteger.ONE) + .flowNextIncomingId(UnsignedInteger.ZERO) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowLinkCredit(UnsignedInteger.ONE) + .flowHandleFromLinkHandle() + .flow(); MessageDecoder messageDecoder = new MessageDecoder(); boolean hasMore; do { - PerformativeResponse response = (PerformativeResponse) transport.getNextResponse(); - assertThat(response, Is.is(notNullValue())); - assertThat(response.getBody(), Is.is(instanceOf(Transfer.class))); - Transfer responseTransfer = (Transfer) response.getBody(); + Transfer responseTransfer = interaction.consumeResponse().getLatestResponse(Transfer.class); messageDecoder.addTransfer(responseTransfer); hasMore = Boolean.TRUE.equals(responseTransfer.getMore()); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java index e9a6957..1a1c8bf 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java @@ -20,7 +20,6 @@ package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -33,17 +32,17 @@ import org.junit.Test; import org.apache.qpid.server.protocol.v1_0.Session_1_0; import org.apache.qpid.server.protocol.v1_0.type.Symbol; -import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; -import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse; +import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase; import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.Utils; @@ -84,29 +83,21 @@ public class TemporaryDestinationTest extends ProtocolTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - transport.doBeginSession(); - - Attach attach = new Attach(); - attach.setName("testSendingLink"); - attach.setHandle(UnsignedInteger.ZERO); - attach.setRole(Role.SENDER); - attach.setInitialDeliveryCount(UnsignedInteger.ZERO); - - attach.setSource(new Source()); - Target target = new Target(); target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); target.setDynamic(true); target.setCapabilities(targetCapabilities); - attach.setTarget(target); - transport.sendPerformative(attach); + final Interaction interaction = transport.newInteraction(); + final Attach attachResponse = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachTarget(target) + .attach().consumeResponse() + .getLatestResponse(Attach.class); - PerformativeResponse response = (PerformativeResponse) transport.getNextResponse(); - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Attach.class))); - final Attach attachResponse = (Attach) response.getBody(); assertThat(attachResponse.getSource(), is(notNullValue())); assertThat(attachResponse.getTarget(), is(notNullValue())); @@ -115,19 +106,11 @@ public class TemporaryDestinationTest extends ProtocolTestBase assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(true)); - final PerformativeResponse flowResponse = ((PerformativeResponse) transport.getNextResponse()); - if (flowResponse != null) - { - assertThat(flowResponse.getBody(), is(instanceOf(Flow.class))); - } + interaction.consumeResponse().getLatestResponse(Flow.class); transport.doCloseConnection(); } - try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) - { - transport.doBeginSession(); - assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false)); - } + assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false)); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java index 6d8007a..83303ad 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java @@ -20,12 +20,11 @@ package org.apache.qpid.tests.protocol.v1_0.extensions.soleconn; -import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER; import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY; import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY; +import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER; import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.CLOSE_EXISTING; import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -48,7 +47,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Close; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; -import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse; +import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase; public class CloseExistingPolicy extends ProtocolTestBase @@ -66,19 +65,16 @@ public class CloseExistingPolicy extends ProtocolTestBase { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - transport.doProtocolNegotiation(); - Open open = new Open(); - open.setContainerId("testContainerId"); - open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - CLOSE_EXISTING)); - - transport.sendPerformative(open); - PerformativeResponse response = (PerformativeResponse) transport.getNextResponse(); - - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Open.class))); - Open responseOpen = (Open) response.getBody(); + Open responseOpen = transport.newInteraction() + .negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap( + SOLE_CONNECTION_ENFORCEMENT_POLICY, + CLOSE_EXISTING)) + .open().consumeResponse() + .getLatestResponse(Open.class); + assertThat(Arrays.asList(responseOpen.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER)); if (responseOpen.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY)) { @@ -94,42 +90,30 @@ public class CloseExistingPolicy extends ProtocolTestBase { try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { - transport1.doProtocolNegotiation(); - Open open = new Open(); - open.setContainerId("testContainerId"); - open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - CLOSE_EXISTING)); - - transport1.sendPerformative(open); - PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse(); - - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Open.class))); + final Interaction interaction1 = transport1.newInteraction(); + interaction1.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, + CLOSE_EXISTING)) + .open().consumeResponse(Open.class); try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { - transport2.doProtocolNegotiation(); - Open open2 = new Open(); - open2.setContainerId("testContainerId"); - open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - CLOSE_EXISTING)); - - transport2.sendPerformative(open2); - - final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse(); - assertThat(closeResponse1, is(notNullValue())); - assertThat(closeResponse1.getBody(), is(instanceOf(Close.class))); - Close close1 = (Close) closeResponse1.getBody(); + final Interaction interaction2 = transport2.newInteraction(); + interaction2.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, + CLOSE_EXISTING)) + .open(); + + final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class); assertThat(close1.getError(), is(notNullValue())); assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED))); assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true)))); - PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse(); - assertThat(response2, is(notNullValue())); - assertThat(response2.getBody(), is(instanceOf(Open.class))); - Open responseOpen2 = (Open) response2.getBody(); + final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class); assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER)); if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY)) { @@ -147,40 +131,28 @@ public class CloseExistingPolicy extends ProtocolTestBase { try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { - transport1.doProtocolNegotiation(); - Open open = new Open(); - open.setContainerId("testContainerId"); + final Interaction interaction1 = transport1.newInteraction(); // Omit setting the desired capability to test weak detection - - transport1.sendPerformative(open); - PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse(); - - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Open.class))); + interaction1.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .open().consumeResponse(Open.class); try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { - transport2.doProtocolNegotiation(); - Open open2 = new Open(); - open2.setContainerId("testContainerId"); - open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - CLOSE_EXISTING)); - - transport2.sendPerformative(open2); - - final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse(); - assertThat(closeResponse1, is(notNullValue())); - assertThat(closeResponse1.getBody(), is(instanceOf(Close.class))); - Close close1 = (Close) closeResponse1.getBody(); + final Interaction interaction2 = transport2.newInteraction(); + interaction2.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, + CLOSE_EXISTING)) + .open(); + + final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class); assertThat(close1.getError(), is(notNullValue())); assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED))); assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true)))); - PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse(); - assertThat(response2, is(notNullValue())); - assertThat(response2.getBody(), is(instanceOf(Open.class))); - Open responseOpen2 = (Open) response2.getBody(); + final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class); assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER)); if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY)) { @@ -197,19 +169,15 @@ public class CloseExistingPolicy extends ProtocolTestBase { try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { - transport1.doProtocolNegotiation(); - Open open = new Open(); - open.setContainerId("testContainerId"); - open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - CLOSE_EXISTING)); - - transport1.sendPerformative(open); - PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse(); - - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Open.class))); - Open responseOpen = (Open) response.getBody(); + final Interaction interaction1 = transport1.newInteraction(); + Open responseOpen = interaction1.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap( + SOLE_CONNECTION_ENFORCEMENT_POLICY, + CLOSE_EXISTING)) + .open().consumeResponse() + .getLatestResponse(Open.class); assertThat(Arrays.asList(responseOpen.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER)); if (responseOpen.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY)) { @@ -219,25 +187,18 @@ public class CloseExistingPolicy extends ProtocolTestBase try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { - transport2.doProtocolNegotiation(); - Open open2 = new Open(); - open2.setContainerId("testContainerId"); + final Interaction interaction2 = transport2.newInteraction(); // Omit setting the desired capability to test strong detection + interaction2.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .open().sync(); - transport2.sendPerformative(open2); - - final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse(); - assertThat(closeResponse1, is(notNullValue())); - assertThat(closeResponse1.getBody(), is(instanceOf(Close.class))); - Close close1 = (Close) closeResponse1.getBody(); + final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class); assertThat(close1.getError(), is(notNullValue())); assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED))); assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true)))); - PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse(); - assertThat(response2, is(notNullValue())); - assertThat(response2.getBody(), is(instanceOf(Open.class))); - Open responseOpen2 = (Open) response2.getBody(); + final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class); assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER)); if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY)) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java index c579bf8..d11e2fc 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java @@ -20,14 +20,10 @@ package org.apache.qpid.tests.protocol.v1_0.extensions.soleconn; -import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER; import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY; +import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER; import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.CLOSE_EXISTING; import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.REFUSE_CONNECTION; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; import java.net.InetSocketAddress; import java.util.Collections; @@ -35,12 +31,11 @@ import java.util.Collections; import org.junit.Before; import org.junit.Test; -import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.transport.Close; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; -import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse; +import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase; public class MixedPolicy extends ProtocolTestBase @@ -58,55 +53,39 @@ public class MixedPolicy extends ProtocolTestBase { try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { - transport1.doProtocolNegotiation(); - Open open = new Open(); - open.setContainerId("testContainerId"); - open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - CLOSE_EXISTING)); - - transport1.sendPerformative(open); - PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse(); - - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Open.class))); + final Interaction interaction1 = transport1.newInteraction(); + interaction1.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, + CLOSE_EXISTING)) + .open().consumeResponse(Open.class); try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { - transport2.doProtocolNegotiation(); - Open open2 = new Open(); - open2.setContainerId("testContainerId"); - open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - REFUSE_CONNECTION)); - - transport2.sendPerformative(open2); + final Interaction interaction2 = transport2.newInteraction(); + interaction2.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, + REFUSE_CONNECTION)) + .open().sync(); - final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse(); - assertThat(closeResponse1, is(notNullValue())); - assertThat(closeResponse1.getBody(), is(instanceOf(Close.class))); + interaction1.consumeResponse(Close.class); - PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse(); - assertThat(response2, is(notNullValue())); - assertThat(response2.getBody(), is(instanceOf(Open.class))); + interaction2.consumeResponse(Open.class); try (FrameTransport transport3 = new FrameTransport(_brokerAddress).connect()) { - transport3.doProtocolNegotiation(); - Open open3 = new Open(); - open3.setContainerId("testContainerId"); - open3.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open3.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - CLOSE_EXISTING)); - - transport3.sendPerformative(open3); - - PerformativeResponse closeResponse3 = (PerformativeResponse) transport3.getNextResponse(); - assertThat(closeResponse3, is(notNullValue())); - assertThat(closeResponse3.getBody(), is(instanceOf(Open.class))); - PerformativeResponse closeResponse3b = (PerformativeResponse) transport3.getNextResponse(); - assertThat(closeResponse3b, is(notNullValue())); - assertThat(closeResponse3b.getBody(), is(instanceOf(Close.class))); + final Interaction interaction3 = transport3.newInteraction(); + interaction3.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, + CLOSE_EXISTING)) + .open() + .consumeResponse(Open.class) + .consumeResponse(Close.class); } } } @@ -117,36 +96,25 @@ public class MixedPolicy extends ProtocolTestBase { try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect()) { - transport1.doProtocolNegotiation(); - Open open = new Open(); - open.setContainerId("testContainerId"); - open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - REFUSE_CONNECTION)); - - transport1.sendPerformative(open); - PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse(); - - assertThat(response, is(notNullValue())); - assertThat(response.getBody(), is(instanceOf(Open.class))); + final Interaction interaction1 = transport1.newInteraction(); + interaction1.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, + REFUSE_CONNECTION)) + .open().consumeResponse(Open.class); try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect()) { - transport2.doProtocolNegotiation(); - Open open2 = new Open(); - open2.setContainerId("testContainerId"); - open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER}); - open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, - CLOSE_EXISTING)); - - transport2.sendPerformative(open2); - - final PerformativeResponse openResponse2 = (PerformativeResponse) transport2.getNextResponse(); - assertThat(openResponse2, is(notNullValue())); - assertThat(openResponse2.getBody(), is(instanceOf(Open.class))); - final PerformativeResponse closeResponse2 = (PerformativeResponse) transport2.getNextResponse(); - assertThat(closeResponse2, is(notNullValue())); - assertThat(closeResponse2.getBody(), is(instanceOf(Close.class))); + final Interaction interaction2 = transport2.newInteraction(); + interaction2.negotiateProtocol().consumeResponse() + .openContainerId("testContainerId") + .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER) + .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY, + CLOSE_EXISTING)) + .open() + .consumeResponse(Open.class) + .consumeResponse(Close.class); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org