Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 0598f7ebc -> cdb696a9b


QPID-7842: Fix defect and improve 
TransferTest#receiveTransferReceiverIndicatesNonTerminalDeliveryState

Also corrected Netty buffer management in the protocol test suite to ensure 
that frames arriving over more than one read are reconstituted correctly, and 
ensured that tests that negotiate the frame size are supported correctly.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/be9075be
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/be9075be
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/be9075be

Branch: refs/heads/master
Commit: be9075be5c4fc93027a0b6e37aea8a5ad7c2d176
Parents: 0598f7e
Author: Keith Wall <[email protected]>
Authored: Thu Jun 29 17:18:17 2017 +0100
Committer: Keith Wall <[email protected]>
Committed: Thu Jun 29 17:18:34 2017 +0100

----------------------------------------------------------------------
 .../qpid/tests/protocol/v1_0/InputHandler.java  | 19 ++++-
 .../qpid/tests/protocol/v1_0/Interaction.java   |  6 ++
 .../protocol/v1_0/messaging/TransferTest.java   | 79 +++++++++++++-------
 3 files changed, 75 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/be9075be/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
index a8663a4..e3acd24 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
@@ -69,7 +69,7 @@ public class InputHandler extends ChannelInboundHandlerAdapter
     {
         HEADER,
         PERFORMATIVES
-    };
+    }
 
     private final MyConnectionHandler _connectionHandler;
     private final ValueHandler _valueHandler;
@@ -92,9 +92,11 @@ public class InputHandler extends 
ChannelInboundHandlerAdapter
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception
     {
-        // TODO does Netty take care of saving the remaining bytes???
         ByteBuf buf = (ByteBuf) msg;
-        QpidByteBuffer qpidBuf = QpidByteBuffer.wrap(buf.nioBuffer());
+        QpidByteBuffer qpidBuf = QpidByteBuffer.allocate(buf.readableBytes());
+        qpidBuf.put(buf.nioBuffer());
+        qpidBuf.flip();
+        LOGGER.debug("Incoming {} byte(s)", qpidBuf.remaining());
 
         if (_inputBuffer.hasRemaining())
         {
@@ -114,9 +116,12 @@ public class InputHandler extends 
ChannelInboundHandlerAdapter
 
         doParsing();
 
+        LOGGER.debug("After parsing, {} byte(s) remained", 
_inputBuffer.remaining());
+
         if (_inputBuffer.hasRemaining())
         {
             _inputBuffer.compact();
+            _inputBuffer.flip();
         }
 
         ReferenceCountUtil.release(msg);
@@ -152,6 +157,8 @@ public class InputHandler extends 
ChannelInboundHandlerAdapter
 
     private class MyConnectionHandler implements ConnectionHandler
     {
+        private volatile int _frameSize = 512;
+
         @Override
         public void receiveOpen(final int channel, final Open close)
         {
@@ -208,7 +215,7 @@ public class InputHandler extends 
ChannelInboundHandlerAdapter
         @Override
         public int getMaxFrameSize()
         {
-            return 512;
+            return _frameSize;
         }
 
         @Override
@@ -240,6 +247,10 @@ public class InputHandler extends 
ChannelInboundHandlerAdapter
                 if (val instanceof FrameBody)
                 {
                     FrameBody frameBody = (FrameBody) val;
+                    if (frameBody instanceof Open && ((Open) 
frameBody).getMaxFrameSize() != null)
+                    {
+                        _frameSize = ((Open) 
frameBody).getMaxFrameSize().intValue();
+                    }
                     response = new PerformativeResponse((short) channel, 
frameBody);
                 }
                 else if (val instanceof SaslFrameBody)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/be9075be/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 9da8a8e..008b016 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -234,6 +234,12 @@ public class Interaction
         return this;
     }
 
+    public Interaction openMaxFrameSize(UnsignedInteger maxFrameSize)
+    {
+        _open.setMaxFrameSize(maxFrameSize);
+        return this;
+    }
+
     public Interaction openDesiredCapabilities(final Symbol... 
desiredCapabilities)
     {
         _open.setDesiredCapabilities(desiredCapabilities);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/be9075be/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 0d375e4..c63e3c3 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -27,6 +27,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
@@ -34,6 +35,8 @@ import static org.junit.Assume.assumeThat;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.core.Is;
@@ -42,10 +45,12 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Received;
@@ -571,37 +576,62 @@ public class TransferTest extends ProtocolTestBase
     @Test
     @SpecificationTest(section = "2.6.12", description = "[...] the receiving 
application MAY wish to indicate"
                                                          + " non-terminal 
delivery states to the sender")
-    public void receiveTransferReceiverIndicateNonTerminalDeliveryState() 
throws Exception
+    public void receiveTransferReceiverIndicatesNonTerminalDeliveryState() 
throws Exception
     {
-        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, 
TEST_MESSAGE_DATA);
 
         try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
         {
-            final Interaction interaction = transport.newInteraction()
-                                                     
.negotiateProtocol().consumeResponse()
-                                                     .open().consumeResponse()
-                                                     .begin().consumeResponse()
-                                                     .attachRole(Role.RECEIVER)
-                                                     
.attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                                                     
.attachRcvSettleMode(ReceiverSettleMode.SECOND)
-                                                     
.attach().consumeResponse()
-                                                     
.flowIncomingWindow(UnsignedInteger.ONE)
-                                                     
.flowNextIncomingId(UnsignedInteger.ZERO)
-                                                     
.flowOutgoingWindow(UnsignedInteger.ZERO)
-                                                     
.flowNextOutgoingId(UnsignedInteger.ZERO)
-                                                     
.flowLinkCredit(UnsignedInteger.ONE)
-                                                     
.flowHandleFromLinkHandle()
-                                                     .flow()
-                                                     .receiveDelivery()
-                                                     .decodeLatestDelivery();
+            final Interaction interaction = transport.newInteraction();
 
-            Object data = interaction.getDecodedLatestDelivery();
-            assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+            Open open = interaction.negotiateProtocol().consumeResponse()
+                                   
.openMaxFrameSize(UnsignedInteger.valueOf(4096))
+                                   .open().consumeResponse()
+                                   .getLatestResponse(Open.class);
+
+            int negotiatedFrameSize = open.getMaxFrameSize().intValue();
+            String testMessageData = Stream.generate(() -> 
"*").limit(negotiatedFrameSize).collect(Collectors.joining());
+
+            getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, 
testMessageData);
+
+            interaction.begin().consumeResponse()
+                       .attachRole(Role.RECEIVER)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attach().consumeResponse()
+                       .flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .sync();
+
+            MessageDecoder messageDecoder = new MessageDecoder();
+
+            Transfer first = interaction.consumeResponse(Transfer.class)
+                                        .getLatestResponse(Transfer.class);
+            assertThat(first.getMore(), is(equalTo(true)));
+            messageDecoder.addTransfer(first);
+
+            final long firstRemaining = 
QpidByteBufferUtils.remaining(first.getPayload());
+
+            Received state = new Received();
+            state.setSectionNumber(UnsignedInteger.ZERO);
+            state.setSectionOffset(UnsignedLong.valueOf(firstRemaining + 1));
 
             interaction.dispositionSettled(false)
                        .dispositionRole(Role.RECEIVER)
-                       .dispositionState(new Received())
-                       .disposition();
+                       .dispositionState(state)
+                       .disposition()
+                       .sync();
+
+            Transfer second = interaction.consumeResponse(Transfer.class)
+                                         .getLatestResponse(Transfer.class);
+            assertThat(second.getMore(), isOneOf(false, null));
+            messageDecoder.addTransfer(second);
+
+            assertThat(messageDecoder.getData(), is(equalTo(testMessageData)));
 
             Disposition disposition = interaction.dispositionSettled(false)
                                                  
.dispositionRole(Role.RECEIVER)
@@ -614,10 +644,9 @@ public class TransferTest extends ProtocolTestBase
         }
     }
 
-
     @Test
     @SpecificationTest(section = "2.7.3", description = "The sender SHOULD 
respect the receiver’s desired settlement mode if"
-                                                        + "the receiver 
initiates the attach exchange and the sender supports the desired mode.")
+                                                        + " the receiver 
initiates the attach exchange and the sender supports the desired mode.")
     public void receiveTransferSenderSettleModeSettled() throws Exception
     {
         getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, 
TEST_MESSAGE_DATA);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to