http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
index d98c4f1..3653c05 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
@@ -29,23 +29,19 @@ import java.lang.reflect.Method;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
+import java.nio.InvalidMarkException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 
 import com.google.common.io.ByteStreams;
 import org.junit.Assert;
 import org.mockito.internal.util.Primitives;
 
 import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.server.util.ByteBufferUtils;
 
 public class QpidByteBufferTest extends QpidTestCase
 {
+    private static final int BUFFER_FRAGMENT_SIZE = 5;
     private static final int BUFFER_SIZE = 10;
     private static final int POOL_SIZE = 20;
     private static final double SPARSITY_FRACTION = 0.5;
@@ -59,20 +55,29 @@ public class QpidByteBufferTest extends QpidTestCase
     {
         super.setUp();
         QpidByteBuffer.deinitialisePool();
-        QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, 
SPARSITY_FRACTION);
+        QpidByteBuffer.initialisePool(BUFFER_FRAGMENT_SIZE, POOL_SIZE, 
SPARSITY_FRACTION);
         _parent = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
     }
 
     @Override
     public void tearDown() throws Exception
     {
-        super.tearDown();
-        _parent.dispose();
-        if (_slicedBuffer != null)
+        try
         {
-            _slicedBuffer.dispose();
+            super.tearDown();
+        }
+        finally
+        {
+            if (_parent != null)
+            {
+                _parent.dispose();
+            }
+            if (_slicedBuffer != null)
+            {
+                _slicedBuffer.dispose();
+            }
+            QpidByteBuffer.deinitialisePool();
         }
-        QpidByteBuffer.deinitialisePool();
     }
 
     public void testPutGetByIndex() throws Exception
@@ -113,6 +118,25 @@ public class QpidByteBufferTest extends QpidTestCase
         assertEquals("Unexpected position after reset", 0, 
_slicedBuffer.position());
     }
 
+    public void testMarkResetAcrossFragmentBoundary() throws Exception
+    {
+        for (int i = 0; i < BUFFER_SIZE; ++i)
+        {
+            _parent.put((byte) i);
+        }
+        _parent.flip();
+        _parent.mark();
+        for (int i = 0; i < BUFFER_FRAGMENT_SIZE + 2; ++i)
+        {
+            assertEquals("Unexpected value", i, _parent.get());
+        }
+        _parent.reset();
+        for (int i = 0; i < BUFFER_SIZE; ++i)
+        {
+            assertEquals("Unexpected value", i, _parent.get());
+        }
+    }
+
     public void testPosition() throws Exception
     {
         _slicedBuffer = createSlice();
@@ -135,155 +159,102 @@ public class QpidByteBufferTest extends QpidTestCase
         }
     }
 
-    public void testBulkPutGet() throws Exception
+    public void testSettingPositionBackwardsResetsMark()
     {
-        _slicedBuffer = createSlice();
-
-        final byte[] source = getTestBytes(_slicedBuffer.remaining());
-
-        QpidByteBuffer rv = _slicedBuffer.put(source, 0, source.length);
-        assertEquals("Unexpected builder return value", _slicedBuffer, rv);
-
-        _slicedBuffer.flip();
-        byte[] target = new byte[_slicedBuffer.remaining()];
-        rv = _slicedBuffer.get(target, 0, target.length);
-        assertEquals("Unexpected builder return value", _slicedBuffer, rv);
-
-        Assert.assertArrayEquals("Unexpected bulk put/get result", source, 
target);
-
-
-        _slicedBuffer.clear();
-        _slicedBuffer.position(1);
-
+        _parent.position(8);
+        _parent.mark();
+        _parent.position(7);
         try
         {
-            _slicedBuffer.put(source, 0, source.length);
-            fail("Exception not thrown");
+            _parent.reset();
+            fail("Expected exception not thrown");
         }
-        catch (BufferOverflowException e)
+        catch (InvalidMarkException e)
         {
             // pass
         }
+    }
 
-        assertEquals("Position should be unchanged after failed put", 1, 
_slicedBuffer.position());
+    public void testSettingPositionForwardDoeNotResetMark()
+    {
+        final int originalPosition = 3;
+        _parent.position(originalPosition);
+        _parent.mark();
+        _parent.position(9);
+
+        _parent.reset();
+
+        assertEquals("Unexpected position", originalPosition, 
_parent.position());
+    }
+
+    public void testRewind() throws Exception
+    {
+        final int expectedLimit = 7;
+        _parent.position(1);
+        _parent.limit(expectedLimit);
+        _parent.mark();
+        _parent.position(3);
+
+        _parent.rewind();
 
+        assertEquals("Unexpected position", 0, _parent.position());
+        assertEquals("Unexpected limit", expectedLimit, _parent.limit());
         try
         {
-            _slicedBuffer.get(target, 0, target.length);
-            fail("Exception not thrown");
+            _parent.reset();
+            fail("Expected exception not thrown");
         }
-        catch (BufferUnderflowException e)
+        catch (InvalidMarkException e)
         {
             // pass
         }
-
-        assertEquals("Position should be unchanged after failed get", 1, 
_slicedBuffer.position());
-
-
     }
 
-    public void testByteBufferPutGet()
+    public void testBulkPutGet() throws Exception
     {
         _slicedBuffer = createSlice();
-        final byte[] source = getTestBytes(_slicedBuffer.remaining());
 
-        ByteBuffer sourceByteBuffer = ByteBuffer.wrap(source);
+        final byte[] source = getTestBytes(_slicedBuffer.remaining());
 
-        QpidByteBuffer rv = _slicedBuffer.put(sourceByteBuffer);
+        QpidByteBuffer rv = _slicedBuffer.put(source, 0, source.length);
         assertEquals("Unexpected builder return value", _slicedBuffer, rv);
 
-        assertEquals("Unexpected position", _slicedBuffer.capacity(), 
_slicedBuffer.position());
-        assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining());
-
-        assertEquals("Unexpected remaining in source ByteBuffer", 0, 
sourceByteBuffer.remaining());
-
         _slicedBuffer.flip();
+        byte[] target = new byte[_slicedBuffer.remaining()];
+        rv = _slicedBuffer.get(target, 0, target.length);
+        assertEquals("Unexpected builder return value", _slicedBuffer, rv);
 
-        ByteBuffer destinationByteBuffer =  ByteBuffer.allocate(source.length);
-        _slicedBuffer.get(destinationByteBuffer);
-
-
-        assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining());
-
-        assertEquals("Unexpected remaining in destination ByteBuffer", 0, 
destinationByteBuffer.remaining());
-        assertEquals("Unexpected position in destination ByteBuffer", 
source.length, destinationByteBuffer.position());
+        Assert.assertArrayEquals("Unexpected bulk put/get result", source, 
target);
 
-        Assert.assertArrayEquals("Unexpected ByteBuffer put/get result", 
source, destinationByteBuffer.array());
 
         _slicedBuffer.clear();
         _slicedBuffer.position(1);
 
-        sourceByteBuffer.clear();
         try
         {
-            _slicedBuffer.put(sourceByteBuffer);
-            fail("Exception should be thrown");
+            _slicedBuffer.put(source, 0, source.length);
+            fail("Exception not thrown");
         }
-        catch(BufferOverflowException e)
+        catch (BufferOverflowException e)
         {
             // pass
         }
 
-        assertEquals("Position should not be changed after failed put", 1, 
_slicedBuffer.position());
-        assertEquals("Source position should not changed after failed put", 
source.length, sourceByteBuffer.remaining());
-
-        _slicedBuffer.clear();
-        destinationByteBuffer.position(1);
+        assertEquals("Position should be unchanged after failed put", 1, 
_slicedBuffer.position());
 
         try
         {
-            _slicedBuffer.get(destinationByteBuffer);
-            fail("Exception should be thrown");
+            _slicedBuffer.get(target, 0, target.length);
+            fail("Exception not thrown");
         }
-        catch(BufferUnderflowException e )
+        catch (BufferUnderflowException e)
         {
             // pass
         }
-    }
-
-    public void testQpidByteBufferPutGet()
-    {
-        _slicedBuffer = createSlice();
-        final byte[] source = getTestBytes(_slicedBuffer.remaining());
-
-        QpidByteBuffer sourceQpidByteBuffer = QpidByteBuffer.wrap(source);
 
-        QpidByteBuffer rv = _slicedBuffer.put(sourceQpidByteBuffer);
-        assertEquals("Unexpected builder return value", _slicedBuffer, rv);
-
-        assertEquals("Unexpected position", _slicedBuffer.capacity(), 
_slicedBuffer.position());
-        assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining());
-
-        assertEquals("Unexpected remaining in source QpidByteBuffer", 0, 
sourceQpidByteBuffer.remaining());
-
-        _slicedBuffer.flip();
-
-        ByteBuffer destinationByteBuffer =  ByteBuffer.allocate(source.length);
-        _slicedBuffer.get(destinationByteBuffer);
-
-        assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining());
-
-        assertEquals("Unexpected remaining in destination ByteBuffer", 0, 
destinationByteBuffer.remaining());
-        assertEquals("Unexpected position in destination ByteBuffer", 
source.length, destinationByteBuffer.position());
-
-        Assert.assertArrayEquals("Unexpected ByteBuffer put/get result", 
source, destinationByteBuffer.array());
-
-        _slicedBuffer.clear();
-        _slicedBuffer.position(1);
+        assertEquals("Position should be unchanged after failed get", 1, 
_slicedBuffer.position());
 
-        sourceQpidByteBuffer.clear();
-        try
-        {
-            _slicedBuffer.put(sourceQpidByteBuffer);
-            fail("Exception should be thrown");
-        }
-        catch(BufferOverflowException e)
-        {
-            // pass
-        }
 
-        assertEquals("Position should not be changed after failed put", 1, 
_slicedBuffer.position());
-        assertEquals("Source position should not changed after failed put", 
source.length, sourceQpidByteBuffer.remaining());
     }
 
     public void testDuplicate()
@@ -293,8 +264,7 @@ public class QpidByteBufferTest extends QpidTestCase
         int originalLimit = _slicedBuffer.limit();
         _slicedBuffer.limit(originalLimit - 1);
 
-        QpidByteBuffer duplicate = _slicedBuffer.duplicate();
-        try
+        try (QpidByteBuffer duplicate = _slicedBuffer.duplicate())
         {
             assertEquals("Unexpected position", _slicedBuffer.position(), 
duplicate.position() );
             assertEquals("Unexpected limit", _slicedBuffer.limit(), 
duplicate.limit() );
@@ -306,10 +276,6 @@ public class QpidByteBufferTest extends QpidTestCase
             assertEquals("Unexpected position in the original", 1, 
_slicedBuffer.position());
             assertEquals("Unexpected limit in the original", originalLimit -1, 
_slicedBuffer.limit());
         }
-        finally
-        {
-            duplicate.dispose();
-        }
     }
 
     public void testCopyToByteBuffer()
@@ -400,8 +366,8 @@ public class QpidByteBufferTest extends QpidTestCase
         _slicedBuffer.limit(_slicedBuffer.limit() - 1);
 
         int remaining = _slicedBuffer.remaining();
-        QpidByteBuffer newSlice = _slicedBuffer.slice();
-        try
+
+        try (QpidByteBuffer newSlice = _slicedBuffer.slice())
         {
             assertEquals("Unexpected position in original", 1, 
_slicedBuffer.position());
             assertEquals("Unexpected limit in original", source.length - 1, 
_slicedBuffer.limit());
@@ -416,10 +382,6 @@ public class QpidByteBufferTest extends QpidTestCase
             System.arraycopy(source, 1, expected, 0, expected.length);
             Assert.assertArrayEquals("Unexpected slice result", expected, 
destination);
         }
-        finally
-        {
-            newSlice.dispose();
-        }
     }
 
     public void testViewOfSlice()
@@ -431,8 +393,7 @@ public class QpidByteBufferTest extends QpidTestCase
         _slicedBuffer.position(1);
         _slicedBuffer.limit(_slicedBuffer.limit() - 1);
 
-        QpidByteBuffer view = _slicedBuffer.view(0, _slicedBuffer.remaining());
-        try
+        try (QpidByteBuffer view = _slicedBuffer.view(0, 
_slicedBuffer.remaining()))
         {
             assertEquals("Unexpected position in original", 1, 
_slicedBuffer.position());
             assertEquals("Unexpected limit in original", source.length - 1, 
_slicedBuffer.limit());
@@ -448,13 +409,8 @@ public class QpidByteBufferTest extends QpidTestCase
             System.arraycopy(source, 1, expected, 0, expected.length);
             Assert.assertArrayEquals("Unexpected view result", expected, 
destination);
         }
-        finally
-        {
-            view.dispose();
-        }
 
-        view = _slicedBuffer.view(1, _slicedBuffer.remaining() - 2);
-        try
+        try (QpidByteBuffer view = _slicedBuffer.view(1, 
_slicedBuffer.remaining() - 2))
         {
             assertEquals("Unexpected position in original", 1, 
_slicedBuffer.position());
             assertEquals("Unexpected limit in original", source.length - 1, 
_slicedBuffer.limit());
@@ -470,10 +426,6 @@ public class QpidByteBufferTest extends QpidTestCase
             System.arraycopy(source, 2, expected, 0, expected.length);
             Assert.assertArrayEquals("Unexpected view result", expected, 
destination);
         }
-        finally
-        {
-            view.dispose();
-        }
     }
 
     public void testAsInputStream() throws Exception
@@ -496,40 +448,6 @@ public class QpidByteBufferTest extends QpidTestCase
         Assert.assertArrayEquals("Unexpected view result", expected, 
destination.toByteArray());
     }
 
-    public void testAsByteBuffer() throws Exception
-    {
-        _slicedBuffer = createSlice();
-
-        _slicedBuffer.position(1);
-        _slicedBuffer.limit(_slicedBuffer.limit() - 1);
-
-        _slicedBuffer.mark();
-        int remaining = _slicedBuffer.remaining();
-        byte[] source = getTestBytes(remaining);
-        _slicedBuffer.put(source);
-        _slicedBuffer.reset();
-
-        ByteBuffer buffer = _slicedBuffer.asByteBuffer();
-        assertEquals("Unexpected remaining", remaining, buffer.remaining());
-
-        byte[] target = new byte[remaining];
-        buffer.get(target);
-        Assert.assertArrayEquals("Unexpected asByteBuffer result", source, 
target);
-    }
-
-    public void testDecode()
-    {
-        _slicedBuffer = createSlice();
-        final String input = "ABC";
-        _slicedBuffer.put(input.getBytes());
-        _slicedBuffer.flip();
-
-        final CharBuffer charBuffer = 
_slicedBuffer.decode(StandardCharsets.US_ASCII);
-        final char[] destination = new char[charBuffer.remaining()];
-        charBuffer.get(destination);
-        Assert.assertArrayEquals("Unexpected char buffer", 
input.toCharArray(), destination);
-    }
-
     private byte[] getTestBytes(final int length)
     {
         final byte[] source = new byte[length];
@@ -598,6 +516,8 @@ public class QpidByteBufferTest extends QpidTestCase
         {
             // pass
         }
+        _slicedBuffer.dispose();
+        _slicedBuffer = null;
     }
 
     private void testPutGetByIndex(final Class<?> primitiveTargetClass, Object 
value) throws Exception
@@ -659,6 +579,8 @@ public class QpidByteBufferTest extends QpidTestCase
         {
             // pass
         }
+        _slicedBuffer.dispose();
+        _slicedBuffer = null;
     }
 
     private void invokeMethod(final Method method, final Object... value)
@@ -734,41 +656,49 @@ public class QpidByteBufferTest extends QpidTestCase
 
     public void testPooledBufferIsZeroedLoan() throws Exception
     {
-        QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
-
-        buffer.put((byte) 0xFF);
-        buffer.dispose();
+        try (QpidByteBuffer buffer = 
QpidByteBuffer.allocateDirect(BUFFER_SIZE))
+        {
+            buffer.put((byte) 0xFF);
+        }
 
-        buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
-        buffer.limit(1);
-        assertEquals("Pooled QpidByteBuffer is not zeroed.", (byte) 0x0, 
buffer.get());
+        try (QpidByteBuffer buffer = 
QpidByteBuffer.allocateDirect(BUFFER_SIZE))
+        {
+            buffer.limit(1);
+            assertEquals("Pooled QpidByteBuffer is not zeroed.", (byte) 0x0, 
buffer.get());
+        }
     }
 
     public void testAllocateDirectOfSameSize() throws Exception
     {
         int bufferSize = BUFFER_SIZE;
-        QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize);
-        assertEquals("Unexpected buffer size", bufferSize, buffer.capacity());
-        assertEquals("Unexpected position on newly created buffer", 0, 
buffer.position());
-        assertEquals("Unexpected limit on newly created buffer", bufferSize, 
buffer.limit());
+        try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize))
+        {
+            assertEquals("Unexpected buffer size", bufferSize, 
buffer.capacity());
+            assertEquals("Unexpected position on newly created buffer", 0, 
buffer.position());
+            assertEquals("Unexpected limit on newly created buffer", 
bufferSize, buffer.limit());
+        }
     }
 
     public void testAllocateDirectOfSmallerSize() throws Exception
     {
         int bufferSize = BUFFER_SIZE - 1;
-        QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize);
-        assertEquals("Unexpected buffer size", bufferSize, buffer.capacity());
-        assertEquals("Unexpected position on newly created buffer", 0, 
buffer.position());
-        assertEquals("Unexpected limit on newly created buffer", bufferSize, 
buffer.limit());
+        try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize))
+        {
+            assertEquals("Unexpected buffer size", bufferSize, 
buffer.capacity());
+            assertEquals("Unexpected position on newly created buffer", 0, 
buffer.position());
+            assertEquals("Unexpected limit on newly created buffer", 
bufferSize, buffer.limit());
+        }
     }
 
     public void testAllocateDirectOfLargerSize() throws Exception
     {
         int bufferSize = BUFFER_SIZE + 1;
-        QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize);
-        assertEquals("Unexpected buffer size", bufferSize, buffer.capacity());
-        assertEquals("Unexpected position on newly created buffer", 0, 
buffer.position());
-        assertEquals("Unexpected limit on newly created buffer", bufferSize, 
buffer.limit());
+        try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize))
+        {
+            assertEquals("Unexpected buffer size", bufferSize, 
buffer.capacity());
+            assertEquals("Unexpected position on newly created buffer", 0, 
buffer.position());
+            assertEquals("Unexpected limit on newly created buffer", 
bufferSize, buffer.limit());
+        }
     }
 
     public void testAllocateDirectWithNegativeSize() throws Exception
@@ -800,37 +730,34 @@ public class QpidByteBufferTest extends QpidTestCase
     public void testDeflateInflateDirect() throws Exception
     {
         byte[] input = "aaabbbcccddddeeeffff".getBytes();
-        Collection<QpidByteBuffer> inputBufs = 
QpidByteBuffer.allocateDirectCollection(input.length);
-
-        int offset = 0;
-        for (QpidByteBuffer buf : inputBufs)
+        try (QpidByteBuffer inputBuf = 
QpidByteBuffer.allocateDirect(input.length))
         {
-            int len = buf.remaining();
-            buf.put(input, offset, len);
-            buf.flip();
-            offset += len;
-        }
-        assertEquals(input.length, ByteBufferUtils.remaining(inputBufs));
+            inputBuf.put(input);
+            inputBuf.flip();
+
+            assertEquals(input.length, inputBuf.remaining());
 
-        doDeflateInflate(input, inputBufs, true);
+            doDeflateInflate(input, inputBuf, true);
+        }
     }
 
     public void testDeflateInflateHeap() throws Exception
     {
         byte[] input = "aaabbbcccddddeeeffff".getBytes();
-        Collection<QpidByteBuffer> inputBufs = 
Collections.singleton(QpidByteBuffer.wrap(input));
 
-        doDeflateInflate(input, inputBufs, false);
+        try (QpidByteBuffer buffer = QpidByteBuffer.wrap(input))
+        {
+            doDeflateInflate(input, buffer, false);
+        }
     }
 
     public void testInflatingUncompressedBytes_ThrowsZipException() throws 
Exception
     {
         byte[] input = "not_a_compressed_stream".getBytes();
-        QpidByteBuffer original = QpidByteBuffer.wrap(input);
 
-        try
+        try (QpidByteBuffer original = QpidByteBuffer.wrap(input))
         {
-            QpidByteBuffer.inflate(Collections.singleton(original));
+            QpidByteBuffer.inflate(original);
             fail("Exception not thrown");
         }
         catch(java.util.zip.ZipException ze)
@@ -841,109 +768,127 @@ public class QpidByteBufferTest extends QpidTestCase
 
     public void testSlice() throws Exception
     {
-        QpidByteBuffer directBuffer = QpidByteBuffer.allocate(true, 6);
-        directBuffer.position(2);
-        directBuffer.limit(5);
-        QpidByteBuffer directSlice = directBuffer.slice();
-
-        assertTrue("Direct slice should be direct too", 
directSlice.isDirect());
-        assertEquals("Unexpected capacity", 3, directSlice.capacity());
-        assertEquals("Unexpected limit", 3, directSlice.limit());
-        assertEquals("Unexpected position", 0, directSlice.position());
-
-        directBuffer.dispose();
-        directSlice.dispose();
+        try (QpidByteBuffer directBuffer = QpidByteBuffer.allocate(true, 6))
+        {
+            directBuffer.position(2);
+            directBuffer.limit(5);
+            try (QpidByteBuffer directSlice = directBuffer.slice())
+            {
+                assertTrue("Direct slice should be direct too", 
directSlice.isDirect());
+                assertEquals("Unexpected capacity", 3, directSlice.capacity());
+                assertEquals("Unexpected limit", 3, directSlice.limit());
+                assertEquals("Unexpected position", 0, directSlice.position());
+            }
+        }
     }
 
     public void testView() throws Exception
     {
         byte[] content = "ABCDEF".getBytes();
-        QpidByteBuffer buffer = QpidByteBuffer.allocate(true, content.length);
-        buffer.put(content);
-        buffer.position(2);
-        buffer.limit(5);
-
-        QpidByteBuffer view = buffer.view(0, buffer.remaining());
-
-        assertTrue("Unexpected view direct", view.isDirect());
-
-        assertEquals("Unexpected capacity", 3, view.capacity());
-        assertEquals("Unexpected limit", 3, view.limit());
-        assertEquals("Unexpected position", 0, view.position());
-
-        byte[] destination = new byte[view.remaining()];
-        view.get(destination);
-
-        Assert.assertArrayEquals("CDE".getBytes(), destination);
-
-        QpidByteBuffer viewWithOffset = buffer.view(1, 1);
-        destination = new byte[viewWithOffset.remaining()];
-        viewWithOffset.get(destination);
+        try (QpidByteBuffer buffer = QpidByteBuffer.allocate(true, 
content.length))
+        {
+            buffer.put(content);
+            buffer.position(2);
+            buffer.limit(5);
 
-        Assert.assertArrayEquals("D".getBytes(), destination);
+            try (QpidByteBuffer view = buffer.view(0, buffer.remaining()))
+            {
+                assertTrue("Unexpected view direct", view.isDirect());
+                assertEquals("Unexpected capacity", 3, view.capacity());
+                assertEquals("Unexpected limit", 3, view.limit());
+                assertEquals("Unexpected position", 0, view.position());
+
+                final byte[] destination = new byte[view.remaining()];
+                view.get(destination);
+                Assert.assertArrayEquals("CDE".getBytes(), destination);
+            }
 
-        buffer.dispose();
-        view.dispose();
-        viewWithOffset.dispose();
+            try (QpidByteBuffer viewWithOffset = buffer.view(1, 1))
+            {
+                final byte[] destination = new 
byte[viewWithOffset.remaining()];
+                viewWithOffset.get(destination);
+                Assert.assertArrayEquals("D".getBytes(), destination);
+            }
+        }
     }
 
     public void testSparsity()
     {
         assertFalse("Unexpected sparsity after creation", _parent.isSparse());
-        QpidByteBuffer child = _parent.view(0, 6);
+        QpidByteBuffer child = _parent.view(0, 8);
         QpidByteBuffer grandChild = child.view(0, 2);
 
         assertFalse("Unexpected sparsity after child creation", 
_parent.isSparse());
         _parent.dispose();
+        _parent = null;
 
         assertFalse("Unexpected sparsity after parent disposal", 
child.isSparse());
 
         child.dispose();
         assertTrue("Buffer should be sparse", grandChild.isSparse());
+        grandChild.dispose();
     }
 
     public void testAsQpidByteBuffers() throws IOException
     {
         byte[] dataForTwoBufs = 
"01234567890".getBytes(StandardCharsets.US_ASCII);
-        Collection<QpidByteBuffer> qpidByteBuffers = 
QpidByteBuffer.asQpidByteBuffers(new ByteArrayInputStream(dataForTwoBufs));
-        assertEquals("Unexpected number of bufs", 2, qpidByteBuffers.size());
-        Iterator<QpidByteBuffer> itr = qpidByteBuffers.iterator();
-        assertEquals("Unexpected remaining in first buf", 10, 
itr.next().remaining());
-        assertEquals("Unexpected remaining in second buf", 1, 
itr.next().remaining());
+        try (QpidByteBuffer qpidByteBuffer = 
QpidByteBuffer.asQpidByteBuffer(new ByteArrayInputStream(dataForTwoBufs)))
+        {
+            assertEquals("Unexpected remaining in buf", 11, 
qpidByteBuffer.remaining());
+        }
+
+        try (QpidByteBuffer bufsForEmptyBytes = 
QpidByteBuffer.asQpidByteBuffer(new ByteArrayInputStream(new byte[]{})))
+        {
+            assertEquals("Unexpected remaining in buf for empty buffer", 0, 
bufsForEmptyBytes.remaining());
+        }
+    }
+
+    public void testConcatenate() throws Exception
+    {
+        try (QpidByteBuffer buffer1 = QpidByteBuffer.allocateDirect(10);
+             QpidByteBuffer buffer2 = QpidByteBuffer.allocateDirect(10))
+        {
+            final short buffer1Value = (short) (1 << 15);
+            buffer1.putShort(buffer1Value);
+            buffer1.flip();
+            final char buffer2Value = 'x';
+            buffer2.putChar(2, buffer2Value);
+            buffer2.position(4);
+            buffer2.flip();
 
-        Collection<QpidByteBuffer> bufsForEmptyBytes = 
QpidByteBuffer.asQpidByteBuffers(new ByteArrayInputStream(new byte[]{}));
-        assertEquals("Unexpected number of bufs for empty buffer", 0, 
bufsForEmptyBytes.size());
+            try (QpidByteBuffer concatenate = 
QpidByteBuffer.concatenate(buffer1, buffer2))
+            {
+                assertEquals("Unexpected capacity", 6, concatenate.capacity());
+                assertEquals("Unexpected position", 0, concatenate.position());
+                assertEquals("Unexpected limit", 6, concatenate.limit());
+                assertEquals("Unexpected value 1", buffer1Value, 
concatenate.getShort());
+                assertEquals("Unexpected value 2", buffer2Value, 
concatenate.getChar(4));
+            }
+        }
     }
 
     private void doDeflateInflate(byte[] input,
-                                  Collection<QpidByteBuffer> inputBufs,
+                                  QpidByteBuffer inputBuf,
                                   boolean direct) throws IOException
     {
-        Collection<QpidByteBuffer> deflatedBufs = 
QpidByteBuffer.deflate(inputBufs);
-        assertNotNull(deflatedBufs);
+        try (QpidByteBuffer deflatedBuf = QpidByteBuffer.deflate(inputBuf))
+        {
+            assertNotNull(deflatedBuf);
 
-        Collection<QpidByteBuffer> inflatedBufs = 
QpidByteBuffer.inflate(deflatedBufs);
-        assertNotNull(inflatedBufs);
-        assertTrue("Expected at least on buffer", inflatedBufs.size() >= 1);
+            try (QpidByteBuffer inflatedBuf = 
QpidByteBuffer.inflate(deflatedBuf))
+            {
+                assertNotNull(inflatedBuf);
 
-        int bufNum = 1;
-        int inputOffset = 0;
-        int inflatedBytesTotal = 0;
-        for(QpidByteBuffer inflatedBuf : inflatedBufs)
-        {
-            int inflatedBytesCount = inflatedBuf.remaining();
-            inflatedBytesTotal += inflatedBytesCount;
+                int inflatedBytesCount = inflatedBuf.remaining();
 
-            byte[] inflatedBytes = new byte[inflatedBytesCount];
-            inflatedBuf.get(inflatedBytes);
-            byte[] expectedBytes = Arrays.copyOfRange(input, inputOffset, 
inputOffset + inflatedBytes.length);
-            Assert.assertArrayEquals("Inflated buf" + bufNum + " has 
unexpected content", expectedBytes, inflatedBytes);
+                byte[] inflatedBytes = new byte[inflatedBytesCount];
+                inflatedBuf.get(inflatedBytes);
+                byte[] expectedBytes = Arrays.copyOfRange(input, 0, 
inflatedBytes.length);
+                Assert.assertArrayEquals("Inflated buf has unexpected 
content", expectedBytes, inflatedBytes);
 
-            inputOffset += inflatedBytes.length;
-            bufNum++;
+                assertEquals("Unexpected number of inflated bytes", 
input.length, inflatedBytesCount);
+            }
         }
-
-        assertEquals("Unexpected number of inflated bytes", input.length, 
inflatedBytesTotal);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java
index 8a85614..40241c9 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java
@@ -598,11 +598,11 @@ public class FieldTableTest extends QpidTestCase
         buf.flip();
 
         long length = buf.getInt() & 0xFFFFFFFFL;
-        buf = buf.slice();
-        buf.limit((int)length);
+        QpidByteBuffer bufSlice = buf.slice();
+        bufSlice.limit((int)length);
 
 
-        FieldTable table2 = new FieldTable(buf);
+        FieldTable table2 = new FieldTable(bufSlice);
 
         Assert.assertEquals((Boolean) true, table2.getBoolean("bool"));
         Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte"));
@@ -615,6 +615,10 @@ public class FieldTableTest extends QpidTestCase
         Assert.assertEquals(Short.valueOf(Short.MAX_VALUE), 
table2.getShort("short"));
         Assert.assertEquals("hello", table2.getString("string"));
         Assert.assertNull(table2.getString("null-string"));
+        buf.dispose();
+        bufSlice.dispose();
+        table.dispose();
+        table2.dispose();
     }
 
     public void testEncodingSize()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
index 19c16e4..978008e 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
@@ -19,10 +19,8 @@
 package org.apache.qpid.server.security;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
@@ -31,19 +29,18 @@ import java.io.ObjectInputStream;
 import java.security.cert.Certificate;
 import java.security.cert.CertificateEncodingException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.TrustStore;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -96,19 +93,16 @@ public class TrustStoreMessageSourceTest extends 
QpidTestCase
     {
         final int bodySize = (int) message.getSize();
         byte[] msgContent = new byte[bodySize];
-        final Collection<QpidByteBuffer> allData = 
message.getStoredMessage().getContent(0, bodySize);
-        int total = 0;
-        for(QpidByteBuffer b : allData)
+        final List<String> certificates;
+        final ByteArrayInputStream bytesIn;
+        try (QpidByteBuffer allData = message.getStoredMessage().getContent(0, 
bodySize))
         {
-            int len = b.remaining();
-            b.get(msgContent, total, len);
-            b.dispose();
-            total += len;
+            assertEquals("Unexpected message size was retrieved", bodySize, 
allData.remaining());
+            allData.get(msgContent);
         }
-        assertEquals("Unexpected message size was retrieved", bodySize, total);
 
-        List<String> certificates = new ArrayList<>();
-        ByteArrayInputStream bytesIn = new ByteArrayInputStream(msgContent);
+        certificates = new ArrayList<>();
+        bytesIn = new ByteArrayInputStream(msgContent);
         try (ObjectInputStream is = new ObjectInputStream(bytesIn))
         {
             ArrayList<byte[]> encodedCertificates = (ArrayList<byte[]>) 
is.readObject();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
 
b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index f2750f4..7f7f16a 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -366,18 +366,6 @@ public abstract class MessageStoreTestCase extends 
QpidTestCase
         assertTrue("Message with id " + messageId3 + " is not found", 
enqueuedIds.contains(messageId3));
     }
 
-    public void testStoreIgnoresTransientMessage() throws Exception
-    {
-        long messageId = 1;
-        int contentSize = 0;
-        final StoredMessage<TestMessageMetaData> message = 
_store.addMessage(new TestMessageMetaData(messageId, contentSize, 
false)).allContentAdded();
-
-        MessageHandler handler = mock(MessageHandler.class);
-        _storeReader.visitMessages(handler);
-
-        verify(handler, times(0)).handle(argThat(new 
MessageMetaDataMatcher(messageId)));
-    }
-
     public void testAddAndRemoveMessageWithoutContent() throws Exception
     {
         long messageId = 1;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
 
b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
index ee67dd2..1abf0c0 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java
@@ -20,19 +20,16 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.List;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 
 public class TestMessageMetaDataFactory implements 
MessageMetaDataType.Factory<TestMessageMetaData>
 {
     @Override
-    public TestMessageMetaData createMetaData(List<QpidByteBuffer> bufs)
+    public TestMessageMetaData createMetaData(QpidByteBuffer buf)
     {
-        long id = QpidByteBufferUtils.getLong(bufs);
-        int size = QpidByteBufferUtils.getInt(bufs);
+        long id = buf.getLong();
+        int size = buf.getInt();
 
         return new TestMessageMetaData(id, size);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
 
b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 50d9428..231dd35 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
@@ -42,9 +39,9 @@ public class TestMessageMetaDataType implements 
MessageMetaDataType<TestMessageM
     }
 
     @Override
-    public TestMessageMetaData createMetaData(List<QpidByteBuffer> bufs)
+    public TestMessageMetaData createMetaData(QpidByteBuffer buf)
     {
-        return TestMessageMetaData.FACTORY.createMetaData(bufs);
+        return TestMessageMetaData.FACTORY.createMetaData(buf);
     }
 
     @Override
@@ -108,7 +105,13 @@ public class TestMessageMetaDataType implements 
MessageMetaDataType<TestMessageM
         }
 
         @Override
-        public Collection<QpidByteBuffer> getContent(int offset, int length)
+        public QpidByteBuffer getContent()
+        {
+            return null;
+        }
+
+        @Override
+        public QpidByteBuffer getContent(int offset, int length)
         {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java 
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index da61a64..3ccc94b 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.txn;
 
-import java.util.Collection;
-
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
@@ -127,7 +125,13 @@ class MockServerMessage implements ServerMessage
     }
 
     @Override
-    public Collection<QpidByteBuffer> getContent(int offset, int length)
+    public QpidByteBuffer getContent()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public QpidByteBuffer getContent(int offset, int length)
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
index d5e184d..9af3300 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
@@ -58,5 +58,9 @@ public class VirtualHostPropertiesNodeTest extends 
QpidTestCase
         MessageInstanceConsumer consumer = 
_virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, 
getTestName(), options, 0);
         final MessageContainer messageContainer = consumer.pullMessage();
         assertNotNull("Could not pull message from VirtualHostPropertyNode", 
messageContainer);
+        if (messageContainer.getMessageReference() != null)
+        {
+            messageContainer.getMessageReference().release();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index e342db7..ce5551f 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -21,10 +21,8 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,16 +37,9 @@ import 
org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
@@ -59,8 +50,13 @@ import 
org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
 import org.apache.qpid.server.protocol.v0_10.transport.Method;
 import org.apache.qpid.server.protocol.v0_10.transport.Option;
-import org.apache.qpid.server.util.ByteBufferUtils;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.GZIPUtils;
+import org.apache.qpid.server.util.StateChangeListener;
 
 public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0_10>
 {
@@ -254,50 +250,41 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
         boolean msgCompressed = messageProps != null && 
GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
 
 
-        Collection<QpidByteBuffer> bodyBuffers = msg.getBody();
+        QpidByteBuffer bodyBuffer = msg.getBody();
 
         boolean compressionSupported = 
_session.getConnection().getConnectionDelegate().isCompressionSupported();
 
-        if(msgCompressed && !compressionSupported && bodyBuffers != null)
+        if(msgCompressed && !compressionSupported && bodyBuffer != null)
         {
-            Collection<QpidByteBuffer> uncompressedBuffers = 
inflateIfPossible(bodyBuffers);
+            QpidByteBuffer uncompressedBuffer = inflateIfPossible(bodyBuffer);
             messageProps.setContentEncoding(null);
-            for (QpidByteBuffer buf : bodyBuffers)
-            {
-                buf.dispose();
-            }
-            bodyBuffers = uncompressedBuffers;
+            bodyBuffer.dispose();
+            bodyBuffer = uncompressedBuffer;
         }
         else if(!msgCompressed
                 && compressionSupported
                 && (messageProps == null || messageProps.getContentEncoding() 
== null)
-                && bodyBuffers != null
-                && ByteBufferUtils.remaining(bodyBuffers) > 
_session.getConnection().getMessageCompressionThreshold())
+                && bodyBuffer != null
+                && bodyBuffer.remaining() > 
_session.getConnection().getMessageCompressionThreshold())
         {
-            Collection<QpidByteBuffer> compressedBuffers = 
deflateIfPossible(bodyBuffers);
+            QpidByteBuffer compressedBuffers = deflateIfPossible(bodyBuffer);
             if(messageProps == null)
             {
                 messageProps = new MessageProperties();
             }
             messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
-            for (QpidByteBuffer buf : bodyBuffers)
-            {
-                buf.dispose();
-            }
-            bodyBuffers = compressedBuffers;
+            bodyBuffer.dispose();
+            bodyBuffer = compressedBuffers;
         }
 
         Header header = new Header(deliveryProps, messageProps, 
msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
 
-        xfr = batch ? new 
MessageTransfer(_name,_acceptMode,_acquireMode,header, bodyBuffers, BATCHED)
-                    : new 
MessageTransfer(_name,_acceptMode,_acquireMode,header, bodyBuffers);
-        if (bodyBuffers != null)
+        xfr = batch ? new MessageTransfer(_name, _acceptMode, _acquireMode, 
header, bodyBuffer, BATCHED)
+                    : new MessageTransfer(_name, _acceptMode, _acquireMode, 
header, bodyBuffer);
+        if (bodyBuffer != null)
         {
-            for (QpidByteBuffer buf : bodyBuffers)
-            {
-                buf.dispose();
-            }
-            bodyBuffers = null;
+            bodyBuffer.dispose();
+            bodyBuffer = null;
         }
         if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != 
MessageAcquireMode.PRE_ACQUIRED)
         {
@@ -612,11 +599,11 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
     }
 
 
-    private Collection<QpidByteBuffer> deflateIfPossible(final 
Collection<QpidByteBuffer> buffers)
+    private QpidByteBuffer deflateIfPossible(final QpidByteBuffer buffer)
     {
         try
         {
-            return QpidByteBuffer.deflate(buffers);
+            return QpidByteBuffer.deflate(buffer);
         }
         catch (IOException e)
         {
@@ -625,11 +612,11 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
         }
     }
 
-    private Collection<QpidByteBuffer> inflateIfPossible(final 
Collection<QpidByteBuffer> buffers)
+    private QpidByteBuffer inflateIfPossible(final QpidByteBuffer buffer)
     {
         try
         {
-            return QpidByteBuffer.inflate(buffers);
+            return QpidByteBuffer.inflate(buffer);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 4ffbd3f..114b0f2 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -22,8 +22,6 @@ package org.apache.qpid.server.protocol.v0_10;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.UUID;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -107,9 +105,9 @@ public class MessageConverter_Internal_to_v0_10 implements 
MessageConverter<Inte
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent(final int 
offset, final int length)
+                    public QpidByteBuffer getContent(final int offset, final 
int length)
                     {
-                        return 
Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
+                        return QpidByteBuffer.wrap(messageContent, offset, 
length);
                     }
 
                     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index dfdbc9a..8c9db87 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -22,8 +22,6 @@ package org.apache.qpid.server.protocol.v0_10;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import java.util.Collection;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -83,7 +81,7 @@ public class MessageConverter_v0_10 implements 
MessageConverter<ServerMessage, M
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent(final int 
offset, final int length)
+                    public QpidByteBuffer getContent(final int offset, final 
int length)
                     {
                         return serverMsg.getContent(offset, length);
                     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index 0610d09..8121dd6 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -67,13 +67,9 @@ public class MessageConverter_v0_10_to_Internal implements 
MessageConverter<Mess
     {
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
-        int total = 0;
-        for(QpidByteBuffer b : serverMessage.getContent(0, (int) 
serverMessage.getSize()))
+        try (QpidByteBuffer content = serverMessage.getContent())
         {
-            int len = b.remaining();
-            b.get(data, total, len);
-            b.dispose();
-            total += len;
+            content.get(data);
         }
 
         String encoding = serverMessage.getMessageHeader().getEncoding();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
index f74e291..de9afcc 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java
@@ -27,14 +27,14 @@ import java.util.List;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.MessageHandle;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+import org.apache.qpid.server.store.MessageHandle;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 @PluggableService
 public class MessageFormat_0_10 implements 
MessageFormat<MessageTransferMessage>
@@ -63,7 +63,7 @@ public class MessageFormat_0_10 implements 
MessageFormat<MessageTransferMessage>
     // format: <int header count> <headers> <body>
 
     @Override
-    public List<QpidByteBuffer> convertToMessageFormat(final 
MessageTransferMessage message)
+    public QpidByteBuffer convertToMessageFormat(final MessageTransferMessage 
message)
     {
         ServerEncoder encoder = new ServerEncoder(4096, true);
         Struct[] structs = message.getHeader().getStructs();
@@ -72,22 +72,21 @@ public class MessageFormat_0_10 implements 
MessageFormat<MessageTransferMessage>
         {
             encoder.writeStruct32(struct);
         }
-        final QpidByteBuffer headerBuf = encoder.getBuffer();
-        List<QpidByteBuffer> bufs = new ArrayList<>();
-        bufs.add(headerBuf);
-        bufs.addAll(message.getContent(0, (int) message.getSize()));
-
-        return bufs;
+        try (QpidByteBuffer headerBuf = encoder.getBuffer();
+             QpidByteBuffer content = message.getContent())
+        {
+            return QpidByteBuffer.concatenate(headerBuf, content);
+        }
     }
 
     @Override
-    public MessageTransferMessage createMessage(final List<QpidByteBuffer> buf,
+    public MessageTransferMessage createMessage(final QpidByteBuffer payload,
                                                 final MessageStore store,
                                                 final Object 
connectionReference)
     {
         try
         {
-            ServerDecoder serverDecoder = new ServerDecoder(buf);
+            ServerDecoder serverDecoder = new ServerDecoder(payload);
             int headerCount = serverDecoder.readInt32();
             DeliveryProperties deliveryProperties = null;
             MessageProperties messageProperties = null;
@@ -113,20 +112,11 @@ public class MessageFormat_0_10 implements 
MessageFormat<MessageTransferMessage>
                 }
             }
             Header header = new Header(deliveryProperties, messageProperties, 
nonStandard);
-            int bodySize = 0;
-            for(QpidByteBuffer content : buf)
-            {
-                bodySize += content.remaining();
-            }
-            MessageMetaData_0_10 metaData = new MessageMetaData_0_10(header, 
bodySize, System.currentTimeMillis());
+            MessageMetaData_0_10 metaData = new MessageMetaData_0_10(header,
+                                                                     
payload.remaining(),
+                                                                     
System.currentTimeMillis());
             final MessageHandle<MessageMetaData_0_10> handle = 
store.addMessage(metaData);
-            for (QpidByteBuffer content : buf)
-            {
-                if (content.hasRemaining())
-                {
-                    handle.addContent(content);
-                }
-            }
+            handle.addContent(payload);
             final StoredMessage<MessageMetaData_0_10> storedMessage = 
handle.allContentAdded();
             return new MessageTransferMessage(storedMessage, 
connectionReference);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
index 2dcb5f5..bc3cbed 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.util.List;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -42,9 +40,9 @@ public class MessageMetaDataType_0_10 implements 
MessageMetaDataType<MessageMeta
     }
 
     @Override
-    public MessageMetaData_0_10 createMetaData(List<QpidByteBuffer> bufs)
+    public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf)
     {
-        return MessageMetaData_0_10.FACTORY.createMetaData(bufs);
+        return MessageMetaData_0_10.FACTORY.createMetaData(buf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 2861cc2..d51b158 100755
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -263,7 +263,7 @@ public class MessageMetaData_0_10 implements 
StorableMessageMetaData
     private static class MetaDataFactory implements 
MessageMetaDataType.Factory<MessageMetaData_0_10>
     {
         @Override
-        public MessageMetaData_0_10 createMetaData(List<QpidByteBuffer> buf)
+        public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf)
         {
             ServerDecoder decoder = new ServerDecoder(buf);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index e6b2c8d..ba35c01 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -20,14 +20,12 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.util.Collection;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.protocol.v0_10.transport.Header;
 
 
 public class MessageTransferMessage extends 
AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
@@ -102,8 +100,8 @@ public class MessageTransferMessage extends 
AbstractServerMessageImpl<MessageTra
         return getMetaData().getHeader();
     }
 
-    public Collection<QpidByteBuffer> getBody()
+    public QpidByteBuffer getBody()
     {
-        return  getContent(0, (int) getSize());
+        return  getContent();
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
index f3dde0d..abe0842 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
@@ -26,9 +26,7 @@ import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -39,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Method;
@@ -46,7 +45,6 @@ import 
org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
 import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
 import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
 import org.apache.qpid.server.protocol.v0_10.transport.Struct;
-import org.apache.qpid.server.protocol.v0_10.transport.Frame;
 
 public class ServerAssembler
 {
@@ -77,35 +75,54 @@ public class ServerAssembler
         {
             PeekingIterator<ServerFrame> itr = 
Iterators.peekingIterator(frames.iterator());
 
-            while(itr.hasNext())
+            boolean cleanExit = false;
+            try
             {
-                final ServerFrame frame = itr.next();
-                final int frameChannel = frame.getChannel();
-
-                ServerSession channel = _connection.getSession(frameChannel);
-                if (channel != null)
+                while(itr.hasNext())
                 {
-                    final AccessControlContext context = 
channel.getAccessControllerContext();
-                    AccessController.doPrivileged((PrivilegedAction<Void>) () 
->
+                    final ServerFrame frame = itr.next();
+                    final int frameChannel = frame.getChannel();
+
+                    ServerSession channel = 
_connection.getSession(frameChannel);
+                    if (channel != null)
                     {
-                        ServerFrame channelFrame = frame;
-                        boolean nextIsSameChannel;
-                        do
+                        final AccessControlContext context = 
channel.getAccessControllerContext();
+                        AccessController.doPrivileged((PrivilegedAction<Void>) 
() ->
                         {
-                            received(channelFrame);
-                            nextIsSameChannel = itr.hasNext() && frameChannel 
== itr.peek().getChannel();
-                            if (nextIsSameChannel)
+                            ServerFrame channelFrame = frame;
+                            boolean nextIsSameChannel;
+                            do
                             {
-                                channelFrame = itr.next();
+                                received(channelFrame);
+                                nextIsSameChannel = itr.hasNext() && 
frameChannel == itr.peek().getChannel();
+                                if (nextIsSameChannel)
+                                {
+                                    channelFrame = itr.next();
+                                }
                             }
-                        }
-                        while (nextIsSameChannel);
-                        return null;
-                    }, context);
+                            while (nextIsSameChannel);
+                            return null;
+                        }, context);
+                    }
+                    else
+                    {
+                        received(frame);
+                    }
                 }
-                else
+                cleanExit = true;
+            }
+            finally
+            {
+                if (!cleanExit)
                 {
-                    received(frame);
+                    while (itr.hasNext())
+                    {
+                        final QpidByteBuffer body = itr.next().getBody();
+                        if (body != null)
+                        {
+                            body.dispose();
+                        }
+                    }
                 }
             }
         }
@@ -183,11 +200,9 @@ public class ServerAssembler
 
     public void frame(ServerFrame frame)
     {
-        List<QpidByteBuffer> frameBuffers;
         if (frame.isFirstFrame() && frame.isLastFrame())
         {
-            frameBuffers = Collections.singletonList(frame.getBody());
-            assemble(frame, frameBuffers);
+            assemble(frame, frame.getBody());
         }
         else
         {
@@ -207,98 +222,103 @@ public class ServerAssembler
             if (frame.isLastFrame())
             {
                 clearSegment(frame);
-                frameBuffers = new ArrayList<>(frames.size());
+                List<QpidByteBuffer> frameBuffers = new 
ArrayList<>(frames.size());
                 for (ServerFrame f : frames)
                 {
-
                     frameBuffers.add(f.getBody());
                 }
-                assemble(frame, frameBuffers);
+                QpidByteBuffer combined = 
QpidByteBuffer.concatenate(frameBuffers);
+                for (QpidByteBuffer buffer : frameBuffers)
+                {
+                    buffer.dispose();
+                }
+                assemble(frame, combined);
             }
         }
 
     }
 
-    private void assemble(ServerFrame frame, List<QpidByteBuffer> frameBuffers)
+    private void assemble(ServerFrame frame, QpidByteBuffer frameBuffer)
     {
-        ServerDecoder dec = new ServerDecoder(frameBuffers);
+        try
+        {
+            ServerDecoder dec = new ServerDecoder(frameBuffer);
 
-        int channel = frame.getChannel();
-        Method command;
+            int channel = frame.getChannel();
+            Method command;
 
-        switch (frame.getType())
-        {
-            case CONTROL:
-                int controlType = dec.readUint16();
-                Method control = Method.create(controlType);
-                control.read(dec);
-                emit(channel, control);
-                break;
-            case COMMAND:
-                int commandType = dec.readUint16();
-                // read in the session header, right now we don't use it
-                int hdr = dec.readUint16();
-                command = Method.create(commandType);
-                command.setSync((0x0001 & hdr) != 0);
-                command.read(dec);
-                if (command.hasPayload() && !frame.isLastSegment())
-                {
-                    setIncompleteCommand(channel, command);
-                }
-                else
-                {
-                    emit(channel, command);
-                }
-                break;
-            case HEADER:
-                command = getIncompleteCommand(channel);
-                List<Struct> structs = null;
-                DeliveryProperties deliveryProps = null;
-                MessageProperties messageProps = null;
-
-                while (dec.hasRemaining())
-                {
-                    Struct struct = dec.readStruct32();
-                    if(struct instanceof  DeliveryProperties && deliveryProps 
== null)
+            switch (frame.getType())
+            {
+                case CONTROL:
+                    int controlType = dec.readUint16();
+                    Method control = Method.create(controlType);
+                    control.read(dec);
+                    emit(channel, control);
+                    break;
+                case COMMAND:
+                    int commandType = dec.readUint16();
+                    // read in the session header, right now we don't use it
+                    int hdr = dec.readUint16();
+                    command = Method.create(commandType);
+                    command.setSync((0x0001 & hdr) != 0);
+                    command.read(dec);
+                    if (command.hasPayload() && !frame.isLastSegment())
                     {
-                        deliveryProps = (DeliveryProperties) struct;
+                        setIncompleteCommand(channel, command);
                     }
-                    else if(struct instanceof MessageProperties && 
messageProps == null)
+                    else
                     {
-                        messageProps = (MessageProperties) struct;
+                        emit(channel, command);
                     }
-                    else
+                    break;
+                case HEADER:
+                    command = getIncompleteCommand(channel);
+                    List<Struct> structs = null;
+                    DeliveryProperties deliveryProps = null;
+                    MessageProperties messageProps = null;
+
+                    while (dec.hasRemaining())
                     {
-                        if(structs == null)
+                        Struct struct = dec.readStruct32();
+                        if (struct instanceof DeliveryProperties && 
deliveryProps == null)
+                        {
+                            deliveryProps = (DeliveryProperties) struct;
+                        }
+                        else if (struct instanceof MessageProperties && 
messageProps == null)
+                        {
+                            messageProps = (MessageProperties) struct;
+                        }
+                        else
                         {
-                            structs = new ArrayList<>(2);
+                            if (structs == null)
+                            {
+                                structs = new ArrayList<>(2);
+                            }
+                            structs.add(struct);
                         }
-                        structs.add(struct);
                     }
+                    command.setHeader(new Header(deliveryProps, messageProps, 
structs));
 
-                }
-                command.setHeader(new 
Header(deliveryProps,messageProps,structs));
-
-                if (frame.isLastSegment())
-                {
+                    if (frame.isLastSegment())
+                    {
+                        setIncompleteCommand(channel, null);
+                        emit(channel, command);
+                    }
+                    break;
+                case BODY:
+                    command = getIncompleteCommand(channel);
+                    command.setBody(frameBuffer);
                     setIncompleteCommand(channel, null);
                     emit(channel, command);
-                }
-                break;
-            case BODY:
-                command = getIncompleteCommand(channel);
-                command.setBody(frameBuffers);
-                setIncompleteCommand(channel, null);
-                emit(channel, command);
-
-                break;
-            default:
-                throw new IllegalStateException("unknown frame type: " + 
frame.getType());
-        }
 
-        for(QpidByteBuffer buf : frameBuffers)
+                    break;
+                default:
+                    throw new IllegalStateException("unknown frame type: " + 
frame.getType());
+            }
+        }
+        finally
         {
-            buf.dispose();
+            frameBuffer.dispose();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 11e2ab5..8032b35 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -61,13 +61,15 @@ import 
org.apache.qpid.server.protocol.v0_10.transport.ConnectionCloseOk;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionException;
 import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
 import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
 import org.apache.qpid.server.protocol.v0_10.transport.Method;
 import org.apache.qpid.server.protocol.v0_10.transport.Option;
 import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionDetachCode;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionDetached;
 import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.transport.*;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.transport.network.NetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -276,6 +278,13 @@ public class ServerConnection extends ConnectionInvoker
             }
             event.delegate(this, delegate);
         }
+        else
+        {
+            if (event instanceof MessageTransfer)
+            {
+                ((MessageTransfer) event).dispose();
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
index 04819f0..5d26d59 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
@@ -20,137 +20,58 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.util.List;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v0_10.transport.AbstractDecoder;
 
 final class ServerDecoder extends AbstractDecoder
 {
-    private final List<QpidByteBuffer> _underlying;
-    private int _bufferIndex;
+    private final QpidByteBuffer _underlying;
 
-    ServerDecoder(List<QpidByteBuffer> in)
+    ServerDecoder(QpidByteBuffer in)
     {
         _underlying = in;
-        _bufferIndex = 0;
-    }
-
-    private void advanceIfNecessary()
-    {
-        while(!getCurrentBuffer().hasRemaining() && _bufferIndex != 
_underlying.size()-1)
-        {
-            _bufferIndex++;
-        }
-    }
-
-    private QpidByteBuffer getBuffer(int size)
-    {
-        advanceIfNecessary();
-        final QpidByteBuffer currentBuffer = getCurrentBuffer();
-        if(currentBuffer.remaining()>= size)
-        {
-            return currentBuffer;
-        }
-        else
-        {
-            return readAsQpidByteBuffer(size);
-        }
-    }
-
-    private QpidByteBuffer readAsQpidByteBuffer(int len)
-    {
-        QpidByteBuffer currentBuffer = getCurrentBuffer();
-        if(currentBuffer.remaining()>=len)
-        {
-            QpidByteBuffer buf = currentBuffer.slice();
-            buf.limit(len);
-            currentBuffer.position(currentBuffer.position()+len);
-            return buf;
-        }
-        else
-        {
-            QpidByteBuffer dest = QpidByteBuffer.allocate(len);
-            while(dest.hasRemaining() && available()>0)
-            {
-                advanceIfNecessary();
-                currentBuffer = getCurrentBuffer();
-                final int remaining = dest.remaining();
-                if(currentBuffer.remaining()>= remaining)
-                {
-                    QpidByteBuffer buf = currentBuffer.slice();
-                    buf.limit(remaining);
-                    currentBuffer.position(currentBuffer.position()+remaining);
-                    dest.put(buf);
-                    buf.dispose();
-                }
-                else
-                {
-                    dest.put(currentBuffer);
-                }
-            }
-
-            dest.flip();
-            return dest;
-        }
-    }
-
-    private int available()
-    {
-        int remaining = 0;
-        for(int i = _bufferIndex; i < _underlying.size(); i++)
-        {
-            remaining += _underlying.get(i).remaining();
-        }
-        return remaining;
-    }
-
-
-    private QpidByteBuffer getCurrentBuffer()
-    {
-        return _underlying.get(_bufferIndex);
     }
 
     @Override
     protected byte doGet()
     {
-        return getBuffer(1).get();
+        return _underlying.get();
     }
 
     @Override
     protected void doGet(byte[] bytes)
     {
-        getBuffer(bytes.length).get(bytes);
+        _underlying.get(bytes);
     }
 
     @Override
     public boolean hasRemaining()
     {
-        return available() != 0;
+        return _underlying.remaining() != 0;
     }
 
     @Override
     public short readUint8()
     {
-        return (short) (0xFF & getBuffer(1).get());
+        return _underlying.getUnsignedByte();
     }
 
     @Override
     public int readUint16()
     {
-        return 0xFFFF & getBuffer(2).getShort();
+        return _underlying.getUnsignedShort();
     }
 
     @Override
     public long readUint32()
     {
-        return 0xFFFFFFFFL & getBuffer(4).getInt();
+        return _underlying.getUnsignedInt();
     }
 
     @Override
     public long readUint64()
     {
-        return getBuffer(8).getLong();
+        return _underlying.getLong();
     }
 
        @Override
@@ -172,44 +93,44 @@ final class ServerDecoder extends AbstractDecoder
        @Override
     public double readDouble()
        {
-               return getBuffer(8).getDouble();
+        return _underlying.getDouble();
        }
 
        @Override
     public float readFloat()
        {
-               return getBuffer(4).getFloat();
+        return _underlying.getFloat();
        }
 
        @Override
     public short readInt16()
        {
-               return getBuffer(2).getShort();
+        return _underlying.getShort();
        }
 
        @Override
     public int readInt32()
        {
-               return getBuffer(4).getInt();
+        return _underlying.getInt();
        }
 
        @Override
     public byte readInt8()
        {
-               return getBuffer(1).get();
+        return _underlying.get();
        }
 
        @Override
     public byte[] readRemainingBytes()
        {
-      byte[] result = new byte[available()];
-      get(result);
-      return result;           
-       }
+        byte[] result = new byte[_underlying.remaining()];
+        get(result);
+        return result;
+    }
 
        @Override
     public long readInt64()
        {
-               return getBuffer(8).getLong();
+        return _underlying.getLong();
        }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to