Repository: qpid-broker-j Updated Branches: refs/heads/master 0598f7ebc -> cdb696a9b
QPID-7842: Fix defect and improve TransferTest#receiveTransferReceiverIndicatesNonTerminalDeliveryState. Also, corrected Netty buffer management in protocol test suite to ensure that frames that arrived over more than one read were reconstituted correctly, and also ensured that tests that negotiate the frame size were supported correctly. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/be9075be Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/be9075be Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/be9075be Branch: refs/heads/master Commit: be9075be5c4fc93027a0b6e37aea8a5ad7c2d176 Parents: 0598f7e Author: Keith Wall <[email protected]> Authored: Thu Jun 29 17:18:17 2017 +0100 Committer: Keith Wall <[email protected]> Committed: Thu Jun 29 17:18:34 2017 +0100 ---------------------------------------------------------------------- .../qpid/tests/protocol/v1_0/InputHandler.java | 19 ++++- .../qpid/tests/protocol/v1_0/Interaction.java | 6 ++ .../protocol/v1_0/messaging/TransferTest.java | 79 +++++++++++++------- 3 files changed, 75 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/be9075be/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java index a8663a4..e3acd24 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java @@ -69,7 +69,7 @@ public class
InputHandler extends ChannelInboundHandlerAdapter { HEADER, PERFORMATIVES - }; + } private final MyConnectionHandler _connectionHandler; private final ValueHandler _valueHandler; @@ -92,9 +92,11 @@ public class InputHandler extends ChannelInboundHandlerAdapter @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { - // TODO does Netty take care of saving the remaining bytes??? ByteBuf buf = (ByteBuf) msg; - QpidByteBuffer qpidBuf = QpidByteBuffer.wrap(buf.nioBuffer()); + QpidByteBuffer qpidBuf = QpidByteBuffer.allocate(buf.readableBytes()); + qpidBuf.put(buf.nioBuffer()); + qpidBuf.flip(); + LOGGER.debug("Incoming {} byte(s)", qpidBuf.remaining()); if (_inputBuffer.hasRemaining()) { @@ -114,9 +116,12 @@ public class InputHandler extends ChannelInboundHandlerAdapter doParsing(); + LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining()); + if (_inputBuffer.hasRemaining()) { _inputBuffer.compact(); + _inputBuffer.flip(); } ReferenceCountUtil.release(msg); @@ -152,6 +157,8 @@ public class InputHandler extends ChannelInboundHandlerAdapter private class MyConnectionHandler implements ConnectionHandler { + private volatile int _frameSize = 512; + @Override public void receiveOpen(final int channel, final Open close) { @@ -208,7 +215,7 @@ public class InputHandler extends ChannelInboundHandlerAdapter @Override public int getMaxFrameSize() { - return 512; + return _frameSize; } @Override @@ -240,6 +247,10 @@ public class InputHandler extends ChannelInboundHandlerAdapter if (val instanceof FrameBody) { FrameBody frameBody = (FrameBody) val; + if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null) + { + _frameSize = ((Open) frameBody).getMaxFrameSize().intValue(); + } response = new PerformativeResponse((short) channel, frameBody); } else if (val instanceof SaslFrameBody) 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/be9075be/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java index 9da8a8e..008b016 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -234,6 +234,12 @@ public class Interaction return this; } + public Interaction openMaxFrameSize(UnsignedInteger maxFrameSize) + { + _open.setMaxFrameSize(maxFrameSize); + return this; + } + public Interaction openDesiredCapabilities(final Symbol... desiredCapabilities) { _open.setDesiredCapabilities(desiredCapabilities); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/be9075be/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index 0d375e4..c63e3c3 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -27,6 +27,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; +import static 
org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.fail; import static org.junit.Assume.assumeThat; @@ -34,6 +35,8 @@ import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.hamcrest.CoreMatchers; import org.hamcrest.core.Is; @@ -42,10 +45,12 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.Outcome; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; import org.apache.qpid.server.protocol.v1_0.type.messaging.Received; @@ -571,37 +576,62 @@ public class TransferTest extends ProtocolTestBase @Test @SpecificationTest(section = "2.6.12", description = "[...] 
the receiving application MAY wish to indicate" + " non-terminal delivery states to the sender") - public void receiveTransferReceiverIndicateNonTerminalDeliveryState() throws Exception + public void receiveTransferReceiverIndicatesNonTerminalDeliveryState() throws Exception { - getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA); try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - final Interaction interaction = transport.newInteraction() - .negotiateProtocol().consumeResponse() - .open().consumeResponse() - .begin().consumeResponse() - .attachRole(Role.RECEIVER) - .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) - .attachRcvSettleMode(ReceiverSettleMode.SECOND) - .attach().consumeResponse() - .flowIncomingWindow(UnsignedInteger.ONE) - .flowNextIncomingId(UnsignedInteger.ZERO) - .flowOutgoingWindow(UnsignedInteger.ZERO) - .flowNextOutgoingId(UnsignedInteger.ZERO) - .flowLinkCredit(UnsignedInteger.ONE) - .flowHandleFromLinkHandle() - .flow() - .receiveDelivery() - .decodeLatestDelivery(); + final Interaction interaction = transport.newInteraction(); - Object data = interaction.getDecodedLatestDelivery(); - assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA))); + Open open = interaction.negotiateProtocol().consumeResponse() + .openMaxFrameSize(UnsignedInteger.valueOf(4096)) + .open().consumeResponse() + .getLatestResponse(Open.class); + + int negotiatedFrameSize = open.getMaxFrameSize().intValue(); + String testMessageData = Stream.generate(() -> "*").limit(negotiatedFrameSize).collect(Collectors.joining()); + + getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageData); + + interaction.begin().consumeResponse() + .attachRole(Role.RECEIVER) + .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attachRcvSettleMode(ReceiverSettleMode.SECOND) + .attach().consumeResponse() + .flowIncomingWindow(UnsignedInteger.ONE) + .flowNextIncomingId(UnsignedInteger.ZERO) + 
.flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowLinkCredit(UnsignedInteger.ONE) + .flowHandleFromLinkHandle() + .flow() + .sync(); + + MessageDecoder messageDecoder = new MessageDecoder(); + + Transfer first = interaction.consumeResponse(Transfer.class) + .getLatestResponse(Transfer.class); + assertThat(first.getMore(), is(equalTo(true))); + messageDecoder.addTransfer(first); + + final long firstRemaining = QpidByteBufferUtils.remaining(first.getPayload()); + + Received state = new Received(); + state.setSectionNumber(UnsignedInteger.ZERO); + state.setSectionOffset(UnsignedLong.valueOf(firstRemaining + 1)); interaction.dispositionSettled(false) .dispositionRole(Role.RECEIVER) - .dispositionState(new Received()) - .disposition(); + .dispositionState(state) + .disposition() + .sync(); + + Transfer second = interaction.consumeResponse(Transfer.class) + .getLatestResponse(Transfer.class); + assertThat(second.getMore(), isOneOf(false, null)); + messageDecoder.addTransfer(second); + + assertThat(messageDecoder.getData(), is(equalTo(testMessageData))); Disposition disposition = interaction.dispositionSettled(false) .dispositionRole(Role.RECEIVER) @@ -614,10 +644,9 @@ public class TransferTest extends ProtocolTestBase } } - @Test @SpecificationTest(section = "2.7.3", description = "The sender SHOULD respect the receiver’s desired settlement mode if" - + "the receiver initiates the attach exchange and the sender supports the desired mode.") + + " the receiver initiates the attach exchange and the sender supports the desired mode.") public void receiveTransferSenderSettleModeSettled() throws Exception { getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
