http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java index d98c4f1..3653c05 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java @@ -29,23 +29,19 @@ import java.lang.reflect.Method; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import java.nio.CharBuffer; +import java.nio.InvalidMarkException; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; import com.google.common.io.ByteStreams; import org.junit.Assert; import org.mockito.internal.util.Primitives; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.server.util.ByteBufferUtils; public class QpidByteBufferTest extends QpidTestCase { + private static final int BUFFER_FRAGMENT_SIZE = 5; private static final int BUFFER_SIZE = 10; private static final int POOL_SIZE = 20; private static final double SPARSITY_FRACTION = 0.5; @@ -59,20 +55,29 @@ public class QpidByteBufferTest extends QpidTestCase { super.setUp(); QpidByteBuffer.deinitialisePool(); - QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, SPARSITY_FRACTION); + QpidByteBuffer.initialisePool(BUFFER_FRAGMENT_SIZE, POOL_SIZE, SPARSITY_FRACTION); _parent = QpidByteBuffer.allocateDirect(BUFFER_SIZE); } @Override public void tearDown() throws Exception { - super.tearDown(); - _parent.dispose(); - if (_slicedBuffer != null) + try { - _slicedBuffer.dispose(); + super.tearDown(); + } + finally + { + if (_parent != null) + { + _parent.dispose(); + } + if (_slicedBuffer != null) + { + _slicedBuffer.dispose(); + } + QpidByteBuffer.deinitialisePool(); } - QpidByteBuffer.deinitialisePool(); } public void testPutGetByIndex() throws Exception @@ -113,6 +118,25 @@ public class QpidByteBufferTest extends QpidTestCase assertEquals("Unexpected position after reset", 0, _slicedBuffer.position()); } + public void testMarkResetAcrossFragmentBoundary() throws Exception + { + for (int i = 0; i < BUFFER_SIZE; ++i) + { + _parent.put((byte) i); + } + _parent.flip(); + _parent.mark(); + for (int i = 0; i < BUFFER_FRAGMENT_SIZE + 2; ++i) + { + assertEquals("Unexpected value", i, _parent.get()); + } + _parent.reset(); + for (int i = 0; i < BUFFER_SIZE; ++i) + { + assertEquals("Unexpected value", i, _parent.get()); + } + } + public void testPosition() throws Exception { _slicedBuffer = createSlice(); @@ -135,155 +159,102 @@ public class QpidByteBufferTest extends QpidTestCase } } - public void testBulkPutGet() throws Exception + public void testSettingPositionBackwardsResetsMark() { - _slicedBuffer = createSlice(); - - final byte[] source = getTestBytes(_slicedBuffer.remaining()); - - QpidByteBuffer rv = _slicedBuffer.put(source, 0, source.length); - assertEquals("Unexpected builder return value", _slicedBuffer, rv); - - _slicedBuffer.flip(); - byte[] target = new byte[_slicedBuffer.remaining()]; - rv = _slicedBuffer.get(target, 0, target.length); - assertEquals("Unexpected builder return value", _slicedBuffer, rv); - - Assert.assertArrayEquals("Unexpected bulk put/get result", source, target); - - - _slicedBuffer.clear(); - _slicedBuffer.position(1); - + _parent.position(8); + _parent.mark(); + _parent.position(7); try { - _slicedBuffer.put(source, 0, source.length); - fail("Exception not thrown"); + _parent.reset(); + fail("Expected exception not thrown"); } - catch (BufferOverflowException e) + catch (InvalidMarkException e) { // pass } + } - assertEquals("Position should be unchanged after failed put", 1, _slicedBuffer.position()); + public void testSettingPositionForwardDoeNotResetMark() + { + final int originalPosition = 3; + _parent.position(originalPosition); + _parent.mark(); + _parent.position(9); + + _parent.reset(); + + assertEquals("Unexpected position", originalPosition, _parent.position()); + } + + public void testRewind() throws Exception + { + final int expectedLimit = 7; + _parent.position(1); + _parent.limit(expectedLimit); + _parent.mark(); + _parent.position(3); + + _parent.rewind(); + assertEquals("Unexpected position", 0, _parent.position()); + assertEquals("Unexpected limit", expectedLimit, _parent.limit()); try { - _slicedBuffer.get(target, 0, target.length); - fail("Exception not thrown"); + _parent.reset(); + fail("Expected exception not thrown"); } - catch (BufferUnderflowException e) + catch (InvalidMarkException e) { // pass } - - assertEquals("Position should be unchanged after failed get", 1, _slicedBuffer.position()); - - } - public void testByteBufferPutGet() + public void testBulkPutGet() throws Exception { _slicedBuffer = createSlice(); - final byte[] source = getTestBytes(_slicedBuffer.remaining()); - ByteBuffer sourceByteBuffer = ByteBuffer.wrap(source); + final byte[] source = getTestBytes(_slicedBuffer.remaining()); - QpidByteBuffer rv = _slicedBuffer.put(sourceByteBuffer); + QpidByteBuffer rv = _slicedBuffer.put(source, 0, source.length); assertEquals("Unexpected builder return value", _slicedBuffer, rv); - assertEquals("Unexpected position", _slicedBuffer.capacity(), _slicedBuffer.position()); - assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining()); - - assertEquals("Unexpected remaining in source ByteBuffer", 0, sourceByteBuffer.remaining()); - _slicedBuffer.flip(); + byte[] target = new byte[_slicedBuffer.remaining()]; + rv = _slicedBuffer.get(target, 0, target.length); + assertEquals("Unexpected builder return value", _slicedBuffer, rv); - ByteBuffer destinationByteBuffer = ByteBuffer.allocate(source.length); - _slicedBuffer.get(destinationByteBuffer); - - - assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining()); - - assertEquals("Unexpected remaining in destination ByteBuffer", 0, destinationByteBuffer.remaining()); - assertEquals("Unexpected position in destination ByteBuffer", source.length, destinationByteBuffer.position()); + Assert.assertArrayEquals("Unexpected bulk put/get result", source, target); - Assert.assertArrayEquals("Unexpected ByteBuffer put/get result", source, destinationByteBuffer.array()); _slicedBuffer.clear(); _slicedBuffer.position(1); - sourceByteBuffer.clear(); try { - _slicedBuffer.put(sourceByteBuffer); - fail("Exception should be thrown"); + _slicedBuffer.put(source, 0, source.length); + fail("Exception not thrown"); } - catch(BufferOverflowException e) + catch (BufferOverflowException e) { // pass } - assertEquals("Position should not be changed after failed put", 1, _slicedBuffer.position()); - assertEquals("Source position should not changed after failed put", source.length, sourceByteBuffer.remaining()); - - _slicedBuffer.clear(); - destinationByteBuffer.position(1); + assertEquals("Position should be unchanged after failed put", 1, _slicedBuffer.position()); try { - _slicedBuffer.get(destinationByteBuffer); - fail("Exception should be thrown"); + _slicedBuffer.get(target, 0, target.length); + fail("Exception not thrown"); } - catch(BufferUnderflowException e ) + catch (BufferUnderflowException e) { // pass } - } - - public void testQpidByteBufferPutGet() - { - _slicedBuffer = createSlice(); - final byte[] source = getTestBytes(_slicedBuffer.remaining()); - - QpidByteBuffer sourceQpidByteBuffer = QpidByteBuffer.wrap(source); - QpidByteBuffer rv = _slicedBuffer.put(sourceQpidByteBuffer); - assertEquals("Unexpected builder return value", _slicedBuffer, rv); - - assertEquals("Unexpected position", _slicedBuffer.capacity(), _slicedBuffer.position()); - assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining()); - - assertEquals("Unexpected remaining in source QpidByteBuffer", 0, sourceQpidByteBuffer.remaining()); - - _slicedBuffer.flip(); - - ByteBuffer destinationByteBuffer = ByteBuffer.allocate(source.length); - _slicedBuffer.get(destinationByteBuffer); - - assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining()); - - assertEquals("Unexpected remaining in destination ByteBuffer", 0, destinationByteBuffer.remaining()); - assertEquals("Unexpected position in destination ByteBuffer", source.length, destinationByteBuffer.position()); - - Assert.assertArrayEquals("Unexpected ByteBuffer put/get result", source, destinationByteBuffer.array()); - - _slicedBuffer.clear(); - _slicedBuffer.position(1); + assertEquals("Position should be unchanged after failed get", 1, _slicedBuffer.position()); - sourceQpidByteBuffer.clear(); - try - { - _slicedBuffer.put(sourceQpidByteBuffer); - fail("Exception should be thrown"); - } - catch(BufferOverflowException e) - { - // pass - } - assertEquals("Position should not be changed after failed put", 1, _slicedBuffer.position()); - assertEquals("Source position should not changed after failed put", source.length, sourceQpidByteBuffer.remaining()); } public void testDuplicate() @@ -293,8 +264,7 @@ public class QpidByteBufferTest extends QpidTestCase int originalLimit = _slicedBuffer.limit(); _slicedBuffer.limit(originalLimit - 1); - QpidByteBuffer duplicate = _slicedBuffer.duplicate(); - try + try (QpidByteBuffer duplicate = _slicedBuffer.duplicate()) { assertEquals("Unexpected position", _slicedBuffer.position(), duplicate.position() ); assertEquals("Unexpected limit", _slicedBuffer.limit(), duplicate.limit() ); @@ -306,10 +276,6 @@ public class QpidByteBufferTest extends QpidTestCase assertEquals("Unexpected position in the original", 1, _slicedBuffer.position()); assertEquals("Unexpected limit in the original", originalLimit -1, _slicedBuffer.limit()); } - finally - { - duplicate.dispose(); - } } public void testCopyToByteBuffer() @@ -400,8 +366,8 @@ public class QpidByteBufferTest extends QpidTestCase _slicedBuffer.limit(_slicedBuffer.limit() - 1); int remaining = _slicedBuffer.remaining(); - QpidByteBuffer newSlice = _slicedBuffer.slice(); - try + + try (QpidByteBuffer newSlice = _slicedBuffer.slice()) { assertEquals("Unexpected position in original", 1, _slicedBuffer.position()); assertEquals("Unexpected limit in original", source.length - 1, _slicedBuffer.limit()); @@ -416,10 +382,6 @@ public class QpidByteBufferTest extends QpidTestCase System.arraycopy(source, 1, expected, 0, expected.length); Assert.assertArrayEquals("Unexpected slice result", expected, destination); } - finally - { - newSlice.dispose(); - } } public void testViewOfSlice() @@ -431,8 +393,7 @@ public class QpidByteBufferTest extends QpidTestCase _slicedBuffer.position(1); _slicedBuffer.limit(_slicedBuffer.limit() - 1); - QpidByteBuffer view = _slicedBuffer.view(0, _slicedBuffer.remaining()); - try + try (QpidByteBuffer view = _slicedBuffer.view(0, _slicedBuffer.remaining())) { assertEquals("Unexpected position in original", 1, _slicedBuffer.position()); assertEquals("Unexpected limit in original", source.length - 1, _slicedBuffer.limit()); @@ -448,13 +409,8 @@ public class QpidByteBufferTest extends QpidTestCase System.arraycopy(source, 1, expected, 0, expected.length); Assert.assertArrayEquals("Unexpected view result", expected, destination); } - finally - { - view.dispose(); - } - view = _slicedBuffer.view(1, _slicedBuffer.remaining() - 2); - try + try (QpidByteBuffer view = _slicedBuffer.view(1, _slicedBuffer.remaining() - 2)) { assertEquals("Unexpected position in original", 1, _slicedBuffer.position()); assertEquals("Unexpected limit in original", source.length - 1, _slicedBuffer.limit()); @@ -470,10 +426,6 @@ public class QpidByteBufferTest extends QpidTestCase System.arraycopy(source, 2, expected, 0, expected.length); Assert.assertArrayEquals("Unexpected view result", expected, destination); } - finally - { - view.dispose(); - } } public void testAsInputStream() throws Exception @@ -496,40 +448,6 @@ public class QpidByteBufferTest extends QpidTestCase Assert.assertArrayEquals("Unexpected view result", expected, destination.toByteArray()); } - public void testAsByteBuffer() throws Exception - { - _slicedBuffer = createSlice(); - - _slicedBuffer.position(1); - _slicedBuffer.limit(_slicedBuffer.limit() - 1); - - _slicedBuffer.mark(); - int remaining = _slicedBuffer.remaining(); - byte[] source = getTestBytes(remaining); - _slicedBuffer.put(source); - _slicedBuffer.reset(); - - ByteBuffer buffer = _slicedBuffer.asByteBuffer(); - assertEquals("Unexpected remaining", remaining, buffer.remaining()); - - byte[] target = new byte[remaining]; - buffer.get(target); - Assert.assertArrayEquals("Unexpected asByteBuffer result", source, target); - } - - public void testDecode() - { - _slicedBuffer = createSlice(); - final String input = "ABC"; - _slicedBuffer.put(input.getBytes()); - _slicedBuffer.flip(); - - final CharBuffer charBuffer = _slicedBuffer.decode(StandardCharsets.US_ASCII); - final char[] destination = new char[charBuffer.remaining()]; - charBuffer.get(destination); - Assert.assertArrayEquals("Unexpected char buffer", input.toCharArray(), destination); - } - private byte[] getTestBytes(final int length) { final byte[] source = new byte[length]; @@ -598,6 +516,8 @@ public class QpidByteBufferTest extends QpidTestCase { // pass } + _slicedBuffer.dispose(); + _slicedBuffer = null; } private void testPutGetByIndex(final Class<?> primitiveTargetClass, Object value) throws Exception @@ -659,6 +579,8 @@ public class QpidByteBufferTest extends QpidTestCase { // pass } + _slicedBuffer.dispose(); + _slicedBuffer = null; } private void invokeMethod(final Method method, final Object... value) @@ -734,41 +656,49 @@ public class QpidByteBufferTest extends QpidTestCase public void testPooledBufferIsZeroedLoan() throws Exception { - QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE); - - buffer.put((byte) 0xFF); - buffer.dispose(); + try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE)) + { + buffer.put((byte) 0xFF); + } - buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE); - buffer.limit(1); - assertEquals("Pooled QpidByteBuffer is not zeroed.", (byte) 0x0, buffer.get()); + try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE)) + { + buffer.limit(1); + assertEquals("Pooled QpidByteBuffer is not zeroed.", (byte) 0x0, buffer.get()); + } } public void testAllocateDirectOfSameSize() throws Exception { int bufferSize = BUFFER_SIZE; - QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize); - assertEquals("Unexpected buffer size", bufferSize, buffer.capacity()); - assertEquals("Unexpected position on newly created buffer", 0, buffer.position()); - assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit()); + try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize)) + { + assertEquals("Unexpected buffer size", bufferSize, buffer.capacity()); + assertEquals("Unexpected position on newly created buffer", 0, buffer.position()); + assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit()); + } } public void testAllocateDirectOfSmallerSize() throws Exception { int bufferSize = BUFFER_SIZE - 1; - QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize); - assertEquals("Unexpected buffer size", bufferSize, buffer.capacity()); - assertEquals("Unexpected position on newly created buffer", 0, buffer.position()); - assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit()); + try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize)) + { + assertEquals("Unexpected buffer size", bufferSize, buffer.capacity()); + assertEquals("Unexpected position on newly created buffer", 0, buffer.position()); + assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit()); + } } public void testAllocateDirectOfLargerSize() throws Exception { int bufferSize = BUFFER_SIZE + 1; - QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize); - assertEquals("Unexpected buffer size", bufferSize, buffer.capacity()); - assertEquals("Unexpected position on newly created buffer", 0, buffer.position()); - assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit()); + try (QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize)) + { + assertEquals("Unexpected buffer size", bufferSize, buffer.capacity()); + assertEquals("Unexpected position on newly created buffer", 0, buffer.position()); + assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit()); + } } public void testAllocateDirectWithNegativeSize() throws Exception @@ -800,37 +730,34 @@ public class QpidByteBufferTest extends QpidTestCase public void testDeflateInflateDirect() throws Exception { byte[] input = "aaabbbcccddddeeeffff".getBytes(); - Collection<QpidByteBuffer> inputBufs = QpidByteBuffer.allocateDirectCollection(input.length); - - int offset = 0; - for (QpidByteBuffer buf : inputBufs) + try (QpidByteBuffer inputBuf = QpidByteBuffer.allocateDirect(input.length)) { - int len = buf.remaining(); - buf.put(input, offset, len); - buf.flip(); - offset += len; - } - assertEquals(input.length, ByteBufferUtils.remaining(inputBufs)); + inputBuf.put(input); + inputBuf.flip(); + + assertEquals(input.length, inputBuf.remaining()); - doDeflateInflate(input, inputBufs, true); + doDeflateInflate(input, inputBuf, true); + } } public void testDeflateInflateHeap() throws Exception { byte[] input = "aaabbbcccddddeeeffff".getBytes(); - Collection<QpidByteBuffer> inputBufs = Collections.singleton(QpidByteBuffer.wrap(input)); - doDeflateInflate(input, inputBufs, false); + try (QpidByteBuffer buffer = QpidByteBuffer.wrap(input)) + { + doDeflateInflate(input, buffer, false); + } } public void testInflatingUncompressedBytes_ThrowsZipException() throws Exception { byte[] input = "not_a_compressed_stream".getBytes(); - QpidByteBuffer original = QpidByteBuffer.wrap(input); - try + try (QpidByteBuffer original = QpidByteBuffer.wrap(input)) { - QpidByteBuffer.inflate(Collections.singleton(original)); + QpidByteBuffer.inflate(original); fail("Exception not thrown"); } catch(java.util.zip.ZipException ze) @@ -841,109 +768,127 @@ public class QpidByteBufferTest extends QpidTestCase public void testSlice() throws Exception { - QpidByteBuffer directBuffer = QpidByteBuffer.allocate(true, 6); - directBuffer.position(2); - directBuffer.limit(5); - QpidByteBuffer directSlice = directBuffer.slice(); - - assertTrue("Direct slice should be direct too", directSlice.isDirect()); - assertEquals("Unexpected capacity", 3, directSlice.capacity()); - assertEquals("Unexpected limit", 3, directSlice.limit()); - assertEquals("Unexpected position", 0, directSlice.position()); - - directBuffer.dispose(); - directSlice.dispose(); + try (QpidByteBuffer directBuffer = QpidByteBuffer.allocate(true, 6)) + { + directBuffer.position(2); + directBuffer.limit(5); + try (QpidByteBuffer directSlice = directBuffer.slice()) + { + assertTrue("Direct slice should be direct too", directSlice.isDirect()); + assertEquals("Unexpected capacity", 3, directSlice.capacity()); + assertEquals("Unexpected limit", 3, directSlice.limit()); + assertEquals("Unexpected position", 0, directSlice.position()); + } + } } public void testView() throws Exception { byte[] content = "ABCDEF".getBytes(); - QpidByteBuffer buffer = QpidByteBuffer.allocate(true, content.length); - buffer.put(content); - buffer.position(2); - buffer.limit(5); - - QpidByteBuffer view = buffer.view(0, buffer.remaining()); - - assertTrue("Unexpected view direct", view.isDirect()); - - assertEquals("Unexpected capacity", 3, view.capacity()); - assertEquals("Unexpected limit", 3, view.limit()); - assertEquals("Unexpected position", 0, view.position()); - - byte[] destination = new byte[view.remaining()]; - view.get(destination); - - Assert.assertArrayEquals("CDE".getBytes(), destination); - - QpidByteBuffer viewWithOffset = buffer.view(1, 1); - destination = new byte[viewWithOffset.remaining()]; - viewWithOffset.get(destination); + try (QpidByteBuffer buffer = QpidByteBuffer.allocate(true, content.length)) + { + buffer.put(content); + buffer.position(2); + buffer.limit(5); - Assert.assertArrayEquals("D".getBytes(), destination); + try (QpidByteBuffer view = buffer.view(0, buffer.remaining())) + { + assertTrue("Unexpected view direct", view.isDirect()); + assertEquals("Unexpected capacity", 3, view.capacity()); + assertEquals("Unexpected limit", 3, view.limit()); + assertEquals("Unexpected position", 0, view.position()); + + final byte[] destination = new byte[view.remaining()]; + view.get(destination); + Assert.assertArrayEquals("CDE".getBytes(), destination); + } - buffer.dispose(); - view.dispose(); - viewWithOffset.dispose(); + try (QpidByteBuffer viewWithOffset = buffer.view(1, 1)) + { + final byte[] destination = new byte[viewWithOffset.remaining()]; + viewWithOffset.get(destination); + Assert.assertArrayEquals("D".getBytes(), destination); + } + } } public void testSparsity() { assertFalse("Unexpected sparsity after creation", _parent.isSparse()); - QpidByteBuffer child = _parent.view(0, 6); + QpidByteBuffer child = _parent.view(0, 8); QpidByteBuffer grandChild = child.view(0, 2); assertFalse("Unexpected sparsity after child creation", _parent.isSparse()); _parent.dispose(); + _parent = null; assertFalse("Unexpected sparsity after parent disposal", child.isSparse()); child.dispose(); assertTrue("Buffer should be sparse", grandChild.isSparse()); + grandChild.dispose(); } public void testAsQpidByteBuffers() throws IOException { byte[] dataForTwoBufs = "01234567890".getBytes(StandardCharsets.US_ASCII); - Collection<QpidByteBuffer> qpidByteBuffers = QpidByteBuffer.asQpidByteBuffers(new ByteArrayInputStream(dataForTwoBufs)); - assertEquals("Unexpected number of bufs", 2, qpidByteBuffers.size()); - Iterator<QpidByteBuffer> itr = qpidByteBuffers.iterator(); - assertEquals("Unexpected remaining in first buf", 10, itr.next().remaining()); - assertEquals("Unexpected remaining in second buf", 1, itr.next().remaining()); + try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.asQpidByteBuffer(new ByteArrayInputStream(dataForTwoBufs))) + { + assertEquals("Unexpected remaining in buf", 11, qpidByteBuffer.remaining()); + } + + try (QpidByteBuffer bufsForEmptyBytes = QpidByteBuffer.asQpidByteBuffer(new ByteArrayInputStream(new byte[]{}))) + { + assertEquals("Unexpected remaining in buf for empty buffer", 0, bufsForEmptyBytes.remaining()); + } + } + + public void testConcatenate() throws Exception + { + try (QpidByteBuffer buffer1 = QpidByteBuffer.allocateDirect(10); + QpidByteBuffer buffer2 = QpidByteBuffer.allocateDirect(10)) + { + final short buffer1Value = (short) (1 << 15); + buffer1.putShort(buffer1Value); + buffer1.flip(); + final char buffer2Value = 'x'; + buffer2.putChar(2, buffer2Value); + buffer2.position(4); + buffer2.flip(); - Collection<QpidByteBuffer> bufsForEmptyBytes = QpidByteBuffer.asQpidByteBuffers(new ByteArrayInputStream(new byte[]{})); - assertEquals("Unexpected number of bufs for empty buffer", 0, bufsForEmptyBytes.size()); + try (QpidByteBuffer concatenate = QpidByteBuffer.concatenate(buffer1, buffer2)) + { + assertEquals("Unexpected capacity", 6, concatenate.capacity()); + assertEquals("Unexpected position", 0, concatenate.position()); + assertEquals("Unexpected limit", 6, concatenate.limit()); + assertEquals("Unexpected value 1", buffer1Value, concatenate.getShort()); + assertEquals("Unexpected value 2", buffer2Value, concatenate.getChar(4)); + } + } } private void doDeflateInflate(byte[] input, - Collection<QpidByteBuffer> inputBufs, + QpidByteBuffer inputBuf, boolean direct) throws IOException { - Collection<QpidByteBuffer> deflatedBufs = QpidByteBuffer.deflate(inputBufs); - assertNotNull(deflatedBufs); + try (QpidByteBuffer deflatedBuf = QpidByteBuffer.deflate(inputBuf)) + { + assertNotNull(deflatedBuf); - Collection<QpidByteBuffer> inflatedBufs = QpidByteBuffer.inflate(deflatedBufs); - assertNotNull(inflatedBufs); - assertTrue("Expected at least on buffer", inflatedBufs.size() >= 1); + try (QpidByteBuffer inflatedBuf = QpidByteBuffer.inflate(deflatedBuf)) + { + assertNotNull(inflatedBuf); - int bufNum = 1; - int inputOffset = 0; - int inflatedBytesTotal = 0; - for(QpidByteBuffer inflatedBuf : inflatedBufs) - { - int inflatedBytesCount = inflatedBuf.remaining(); - inflatedBytesTotal += inflatedBytesCount; + int inflatedBytesCount = inflatedBuf.remaining(); - byte[] inflatedBytes = new byte[inflatedBytesCount]; - inflatedBuf.get(inflatedBytes); - byte[] expectedBytes = Arrays.copyOfRange(input, inputOffset, inputOffset + inflatedBytes.length); - Assert.assertArrayEquals("Inflated buf" + bufNum + " has unexpected content", expectedBytes, inflatedBytes); + byte[] inflatedBytes = new byte[inflatedBytesCount]; + inflatedBuf.get(inflatedBytes); + byte[] expectedBytes = Arrays.copyOfRange(input, 0, inflatedBytes.length); + Assert.assertArrayEquals("Inflated buf has unexpected content", expectedBytes, inflatedBytes); - inputOffset += inflatedBytes.length; - bufNum++; + assertEquals("Unexpected number of inflated bytes", input.length, inflatedBytesCount); + } } - - assertEquals("Unexpected number of inflated bytes", input.length, inflatedBytesTotal); } }
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java index 8a85614..40241c9 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/FieldTableTest.java @@ -598,11 +598,11 @@ public class FieldTableTest extends QpidTestCase buf.flip(); long length = buf.getInt() & 0xFFFFFFFFL; - buf = buf.slice(); - buf.limit((int)length); + QpidByteBuffer bufSlice = buf.slice(); + bufSlice.limit((int)length); - FieldTable table2 = new FieldTable(buf); + FieldTable table2 = new FieldTable(bufSlice); Assert.assertEquals((Boolean) true, table2.getBoolean("bool")); Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte")); @@ -615,6 +615,10 @@ public class FieldTableTest extends QpidTestCase Assert.assertEquals(Short.valueOf(Short.MAX_VALUE), table2.getShort("short")); Assert.assertEquals("hello", table2.getString("string")); Assert.assertNull(table2.getString("null-string")); + buf.dispose(); + bufSlice.dispose(); + table.dispose(); + table2.dispose(); } public void testEncodingSize() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java b/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java index 19c16e4..978008e 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java @@ -19,10 +19,8 @@ package org.apache.qpid.server.security; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; @@ -31,19 +29,18 @@ import java.io.ObjectInputStream; import java.security.cert.Certificate; import java.security.cert.CertificateEncodingException; import java.util.ArrayList; -import java.util.Collection; import java.util.EnumSet; import java.util.List; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.consumer.ConsumerOption; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.message.MessageContainer; import org.apache.qpid.server.message.MessageInstanceConsumer; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.message.MessageContainer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.test.utils.QpidTestCase; @@ -96,19 +93,16 @@ public class TrustStoreMessageSourceTest extends QpidTestCase { final int bodySize = (int) message.getSize(); byte[] msgContent = new byte[bodySize]; - final Collection<QpidByteBuffer> allData = message.getStoredMessage().getContent(0, bodySize); - int total = 0; - for(QpidByteBuffer b : allData) + final List<String> certificates; + final ByteArrayInputStream bytesIn; + try (QpidByteBuffer allData = message.getStoredMessage().getContent(0, bodySize)) { - int len = b.remaining(); - b.get(msgContent, total, len); - b.dispose(); - total += len; + assertEquals("Unexpected message size was retrieved", bodySize, allData.remaining()); + allData.get(msgContent); } - assertEquals("Unexpected message size was retrieved", bodySize, total); - List<String> certificates = new ArrayList<>(); - ByteArrayInputStream bytesIn = new ByteArrayInputStream(msgContent); + certificates = new ArrayList<>(); + bytesIn = new ByteArrayInputStream(msgContent); try (ObjectInputStream is = new ObjectInputStream(bytesIn)) { ArrayList<byte[]> encodedCertificates = (ArrayList<byte[]>) is.readObject(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index f2750f4..7f7f16a 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -366,18 +366,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3)); } - public void testStoreIgnoresTransientMessage() throws Exception - { - long messageId = 1; - int contentSize = 0; - final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)).allContentAdded(); - - MessageHandler handler = mock(MessageHandler.class); - _storeReader.visitMessages(handler); - - verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId))); - } - public void testAddAndRemoveMessageWithoutContent() throws Exception { long messageId = 1; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java index ee67dd2..1abf0c0 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java +++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataFactory.java @@ -20,19 +20,16 @@ */ package org.apache.qpid.server.store; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.plugin.MessageMetaDataType; public class TestMessageMetaDataFactory implements MessageMetaDataType.Factory<TestMessageMetaData> { @Override - public TestMessageMetaData createMetaData(List<QpidByteBuffer> bufs) + public TestMessageMetaData createMetaData(QpidByteBuffer buf) { - long id = QpidByteBufferUtils.getLong(bufs); - int size = QpidByteBufferUtils.getInt(bufs); + long id = buf.getLong(); + int size = buf.getInt(); return new TestMessageMetaData(id, size); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index 50d9428..231dd35 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.server.store; -import java.util.Collection; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; @@ -42,9 +39,9 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM } @Override - public TestMessageMetaData createMetaData(List<QpidByteBuffer> bufs) + public TestMessageMetaData createMetaData(QpidByteBuffer buf) { - return TestMessageMetaData.FACTORY.createMetaData(bufs); + return TestMessageMetaData.FACTORY.createMetaData(buf); } @Override @@ -108,7 +105,13 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM } @Override - public Collection<QpidByteBuffer> getContent(int offset, int length) + public QpidByteBuffer getContent() + { + return null; + } + + @Override + public QpidByteBuffer getContent(int offset, int length) { return null; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index da61a64..3ccc94b 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.txn; -import java.util.Collection; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; @@ -127,7 +125,13 @@ class MockServerMessage implements ServerMessage } @Override - public Collection<QpidByteBuffer> getContent(int offset, int length) + public QpidByteBuffer getContent() + { + throw new UnsupportedOperationException(); + } + + @Override + public QpidByteBuffer getContent(int offset, int length) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java index d5e184d..9af3300 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java @@ -58,5 +58,9 @@ public class VirtualHostPropertiesNodeTest extends QpidTestCase MessageInstanceConsumer consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0); final MessageContainer messageContainer = consumer.pullMessage(); assertNotNull("Could not pull message from VirtualHostPropertyNode", messageContainer); + if (messageContainer.getMessageReference() != null) + { + messageContainer.getMessageReference().release(); + } } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index e342db7..ce5551f 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -21,10 +21,8 @@ package org.apache.qpid.server.protocol.v0_10; import java.io.IOException; -import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,16 +37,9 @@ import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState; import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.message.MessageInstanceConsumer; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; import org.apache.qpid.server.protocol.v0_10.transport.Header; import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode; @@ -59,8 +50,13 @@ import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer; import org.apache.qpid.server.protocol.v0_10.transport.Method; import org.apache.qpid.server.protocol.v0_10.transport.Option; -import org.apache.qpid.server.util.ByteBufferUtils; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.GZIPUtils; +import org.apache.qpid.server.util.StateChangeListener; public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0_10> { @@ -254,50 +250,41 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0 boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding()); - Collection<QpidByteBuffer> bodyBuffers = msg.getBody(); + QpidByteBuffer bodyBuffer = msg.getBody(); boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported(); - if(msgCompressed && !compressionSupported && bodyBuffers != null) + if(msgCompressed && !compressionSupported && bodyBuffer != null) { - Collection<QpidByteBuffer> uncompressedBuffers = inflateIfPossible(bodyBuffers); + QpidByteBuffer uncompressedBuffer = inflateIfPossible(bodyBuffer); messageProps.setContentEncoding(null); - for (QpidByteBuffer buf : bodyBuffers) - { - buf.dispose(); - } - bodyBuffers = uncompressedBuffers; + bodyBuffer.dispose(); + bodyBuffer = uncompressedBuffer; } else if(!msgCompressed && compressionSupported && (messageProps == null || messageProps.getContentEncoding() == null) - && bodyBuffers != null - && ByteBufferUtils.remaining(bodyBuffers) > _session.getConnection().getMessageCompressionThreshold()) + && bodyBuffer != null + && bodyBuffer.remaining() > _session.getConnection().getMessageCompressionThreshold()) { - Collection<QpidByteBuffer> compressedBuffers = deflateIfPossible(bodyBuffers); + QpidByteBuffer compressedBuffers = deflateIfPossible(bodyBuffer); if(messageProps == null) { messageProps = new MessageProperties(); } messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING); - for (QpidByteBuffer buf : bodyBuffers) - { - buf.dispose(); - } - bodyBuffers = compressedBuffers; + bodyBuffer.dispose(); + bodyBuffer = compressedBuffers; } Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, bodyBuffers, BATCHED) - : new MessageTransfer(_name,_acceptMode,_acquireMode,header, bodyBuffers); - if (bodyBuffers != null) + xfr = batch ? new MessageTransfer(_name, _acceptMode, _acquireMode, header, bodyBuffer, BATCHED) + : new MessageTransfer(_name, _acceptMode, _acquireMode, header, bodyBuffer); + if (bodyBuffer != null) { - for (QpidByteBuffer buf : bodyBuffers) - { - buf.dispose(); - } - bodyBuffers = null; + bodyBuffer.dispose(); + bodyBuffer = null; } if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) { @@ -612,11 +599,11 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0 } - private Collection<QpidByteBuffer> deflateIfPossible(final Collection<QpidByteBuffer> buffers) + private QpidByteBuffer deflateIfPossible(final QpidByteBuffer buffer) { try { - return QpidByteBuffer.deflate(buffers); + return QpidByteBuffer.deflate(buffer); } catch (IOException e) { @@ -625,11 +612,11 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0 } } - private Collection<QpidByteBuffer> inflateIfPossible(final Collection<QpidByteBuffer> buffers) + private QpidByteBuffer inflateIfPossible(final QpidByteBuffer buffer) { try { - return QpidByteBuffer.inflate(buffers); + return QpidByteBuffer.inflate(buffer); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index 4ffbd3f..114b0f2 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.protocol.v0_10; import static java.nio.charset.StandardCharsets.UTF_8; -import java.util.Collection; -import java.util.Collections; import java.util.UUID; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; @@ -107,9 +105,9 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte } @Override - public Collection<QpidByteBuffer> getContent(final int offset, final int length) + public QpidByteBuffer getContent(final int offset, final int length) { - return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length)); + return QpidByteBuffer.wrap(messageContent, offset, length); } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index dfdbc9a..8c9db87 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.protocol.v0_10; import static java.nio.charset.StandardCharsets.UTF_8; -import java.util.Collection; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.NamedAddressSpace; @@ -83,7 +81,7 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M } @Override - public Collection<QpidByteBuffer> getContent(final int offset, final int length) + public QpidByteBuffer getContent(final int offset, final int length) { return serverMsg.getContent(offset, length); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java index 0610d09..8121dd6 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java @@ -67,13 +67,9 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess { final String mimeType = serverMessage.getMessageHeader().getMimeType(); byte[] data = new byte[(int) serverMessage.getSize()]; - int total = 0; - for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize())) + try (QpidByteBuffer content = serverMessage.getContent()) { - int len = b.remaining(); - b.get(data, total, len); - b.dispose(); - total += len; + content.get(data); } String encoding = serverMessage.getMessageHeader().getEncoding(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java index f74e291..de9afcc 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageFormat_0_10.java @@ -27,14 +27,14 @@ import java.util.List; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.plugin.MessageFormat; import org.apache.qpid.server.plugin.PluggableService; -import org.apache.qpid.server.store.MessageHandle; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; import org.apache.qpid.server.protocol.v0_10.transport.Header; import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; import org.apache.qpid.server.protocol.v0_10.transport.Struct; +import org.apache.qpid.server.store.MessageHandle; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @PluggableService public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage> @@ -63,7 +63,7 @@ public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage> // format: <int header count> <headers> <body> @Override - public List<QpidByteBuffer> convertToMessageFormat(final MessageTransferMessage message) + public QpidByteBuffer convertToMessageFormat(final MessageTransferMessage message) { ServerEncoder encoder = new ServerEncoder(4096, true); Struct[] structs = message.getHeader().getStructs(); @@ -72,22 +72,21 @@ public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage> { encoder.writeStruct32(struct); } - final QpidByteBuffer headerBuf = encoder.getBuffer(); - List<QpidByteBuffer> bufs = new ArrayList<>(); - bufs.add(headerBuf); - bufs.addAll(message.getContent(0, (int) message.getSize())); - - return bufs; + try (QpidByteBuffer headerBuf = encoder.getBuffer(); + QpidByteBuffer content = message.getContent()) + { + return QpidByteBuffer.concatenate(headerBuf, content); + } } @Override - public MessageTransferMessage createMessage(final List<QpidByteBuffer> buf, + public MessageTransferMessage createMessage(final QpidByteBuffer payload, final MessageStore store, final Object connectionReference) { try { - ServerDecoder serverDecoder = new ServerDecoder(buf); + ServerDecoder serverDecoder = new ServerDecoder(payload); int headerCount = serverDecoder.readInt32(); DeliveryProperties deliveryProperties = null; MessageProperties messageProperties = null; @@ -113,20 +112,11 @@ public class MessageFormat_0_10 implements MessageFormat<MessageTransferMessage> } } Header header = new Header(deliveryProperties, messageProperties, nonStandard); - int bodySize = 0; - for(QpidByteBuffer content : buf) - { - bodySize += content.remaining(); - } - MessageMetaData_0_10 metaData = new MessageMetaData_0_10(header, bodySize, System.currentTimeMillis()); + MessageMetaData_0_10 metaData = new MessageMetaData_0_10(header, + payload.remaining(), + System.currentTimeMillis()); final MessageHandle<MessageMetaData_0_10> handle = store.addMessage(metaData); - for (QpidByteBuffer content : buf) - { - if (content.hasRemaining()) - { - handle.addContent(content); - } - } + handle.addContent(payload); final StoredMessage<MessageMetaData_0_10> storedMessage = handle.allContentAdded(); return new MessageTransferMessage(storedMessage, connectionReference); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java index 2dcb5f5..bc3cbed 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.protocol.v0_10; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; @@ -42,9 +40,9 @@ public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMeta } @Override - public MessageMetaData_0_10 createMetaData(List<QpidByteBuffer> bufs) + public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf) { - return MessageMetaData_0_10.FACTORY.createMetaData(bufs); + return MessageMetaData_0_10.FACTORY.createMetaData(buf); } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java index 2861cc2..d51b158 100755 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java @@ -263,7 +263,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10> { @Override - public MessageMetaData_0_10 createMetaData(List<QpidByteBuffer> buf) + public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf) { ServerDecoder decoder = new ServerDecoder(buf); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java index e6b2c8d..ba35c01 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java @@ -20,14 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_10; -import java.util.Collection; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; +import org.apache.qpid.server.protocol.v0_10.transport.Header; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.protocol.v0_10.transport.Header; public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> @@ -102,8 +100,8 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra return getMetaData().getHeader(); } - public Collection<QpidByteBuffer> getBody() + public QpidByteBuffer getBody() { - return getContent(0, (int) getSize()); + return getContent(); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java index f3dde0d..abe0842 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java @@ -26,9 +26,7 @@ import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; +import org.apache.qpid.server.protocol.v0_10.transport.Frame; import org.apache.qpid.server.protocol.v0_10.transport.Header; import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; import org.apache.qpid.server.protocol.v0_10.transport.Method; @@ -46,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError; import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent; import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader; import org.apache.qpid.server.protocol.v0_10.transport.Struct; -import org.apache.qpid.server.protocol.v0_10.transport.Frame; public class ServerAssembler { @@ -77,35 +75,54 @@ public class ServerAssembler { PeekingIterator<ServerFrame> itr = Iterators.peekingIterator(frames.iterator()); - while(itr.hasNext()) + boolean cleanExit = false; + try { - final ServerFrame frame = itr.next(); - final int frameChannel = frame.getChannel(); - - ServerSession channel = _connection.getSession(frameChannel); - if (channel != null) + while(itr.hasNext()) { - final AccessControlContext context = channel.getAccessControllerContext(); - AccessController.doPrivileged((PrivilegedAction<Void>) () -> + final ServerFrame frame = itr.next(); + final int frameChannel = frame.getChannel(); + + ServerSession channel = _connection.getSession(frameChannel); + if (channel != null) { - ServerFrame channelFrame = frame; - boolean nextIsSameChannel; - do + final AccessControlContext context = channel.getAccessControllerContext(); + AccessController.doPrivileged((PrivilegedAction<Void>) () -> { - received(channelFrame); - nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel(); - if (nextIsSameChannel) + ServerFrame channelFrame = frame; + boolean nextIsSameChannel; + do { - channelFrame = itr.next(); + received(channelFrame); + nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel(); + if (nextIsSameChannel) + { + channelFrame = itr.next(); + } } - } - while (nextIsSameChannel); - return null; - }, context); + while (nextIsSameChannel); + return null; + }, context); + } + else + { + received(frame); + } } - else + cleanExit = true; + } + finally + { + if (!cleanExit) { - received(frame); + while (itr.hasNext()) + { + final QpidByteBuffer body = itr.next().getBody(); + if (body != null) + { + body.dispose(); + } + } } } } @@ -183,11 +200,9 @@ public class ServerAssembler public void frame(ServerFrame frame) { - List<QpidByteBuffer> frameBuffers; if (frame.isFirstFrame() && frame.isLastFrame()) { - frameBuffers = Collections.singletonList(frame.getBody()); - assemble(frame, frameBuffers); + assemble(frame, frame.getBody()); } else { @@ -207,98 +222,103 @@ public class ServerAssembler if (frame.isLastFrame()) { clearSegment(frame); - frameBuffers = new ArrayList<>(frames.size()); + List<QpidByteBuffer> frameBuffers = new ArrayList<>(frames.size()); for (ServerFrame f : frames) { - frameBuffers.add(f.getBody()); } - assemble(frame, frameBuffers); + QpidByteBuffer combined = QpidByteBuffer.concatenate(frameBuffers); + for (QpidByteBuffer buffer : frameBuffers) + { + buffer.dispose(); + } + assemble(frame, combined); } } } - private void assemble(ServerFrame frame, List<QpidByteBuffer> frameBuffers) + private void assemble(ServerFrame frame, QpidByteBuffer frameBuffer) { - ServerDecoder dec = new ServerDecoder(frameBuffers); + try + { + ServerDecoder dec = new ServerDecoder(frameBuffer); - int channel = frame.getChannel(); - Method command; + int channel = frame.getChannel(); + Method command; - switch (frame.getType()) - { - case CONTROL: - int controlType = dec.readUint16(); - Method control = Method.create(controlType); - control.read(dec); - emit(channel, control); - break; - case COMMAND: - int commandType = dec.readUint16(); - // read in the session header, right now we don't use it - int hdr = dec.readUint16(); - command = Method.create(commandType); - command.setSync((0x0001 & hdr) != 0); - command.read(dec); - if (command.hasPayload() && !frame.isLastSegment()) - { - setIncompleteCommand(channel, command); - } - else - { - emit(channel, command); - } - break; - case HEADER: - command = getIncompleteCommand(channel); - List<Struct> structs = null; - DeliveryProperties deliveryProps = null; - MessageProperties messageProps = null; - - while (dec.hasRemaining()) - { - Struct struct = dec.readStruct32(); - if(struct instanceof DeliveryProperties && deliveryProps == null) + switch (frame.getType()) + { + case CONTROL: + int controlType = dec.readUint16(); + Method control = Method.create(controlType); + control.read(dec); + emit(channel, control); + break; + case COMMAND: + int commandType = dec.readUint16(); + // read in the session header, right now we don't use it + int hdr = dec.readUint16(); + command = Method.create(commandType); + command.setSync((0x0001 & hdr) != 0); + command.read(dec); + if (command.hasPayload() && !frame.isLastSegment()) { - deliveryProps = (DeliveryProperties) struct; + setIncompleteCommand(channel, command); } - else if(struct instanceof MessageProperties && messageProps == null) + else { - messageProps = (MessageProperties) struct; + emit(channel, command); } - else + break; + case HEADER: + command = getIncompleteCommand(channel); + List<Struct> structs = null; + DeliveryProperties deliveryProps = null; + MessageProperties messageProps = null; + + while (dec.hasRemaining()) { - if(structs == null) + Struct struct = dec.readStruct32(); + if (struct instanceof DeliveryProperties && deliveryProps == null) + { + deliveryProps = (DeliveryProperties) struct; + } + else if (struct instanceof MessageProperties && messageProps == null) + { + messageProps = (MessageProperties) struct; + } + else { - structs = new ArrayList<>(2); + if (structs == null) + { + structs = new ArrayList<>(2); + } + structs.add(struct); } - structs.add(struct); } + command.setHeader(new Header(deliveryProps, messageProps, structs)); - } - command.setHeader(new Header(deliveryProps,messageProps,structs)); - - if (frame.isLastSegment()) - { + if (frame.isLastSegment()) + { + setIncompleteCommand(channel, null); + emit(channel, command); + } + break; + case BODY: + command = getIncompleteCommand(channel); + command.setBody(frameBuffer); setIncompleteCommand(channel, null); emit(channel, command); - } - break; - case BODY: - command = getIncompleteCommand(channel); - command.setBody(frameBuffers); - setIncompleteCommand(channel, null); - emit(channel, command); - - break; - default: - throw new IllegalStateException("unknown frame type: " + frame.getType()); - } - for(QpidByteBuffer buf : frameBuffers) + break; + default: + throw new IllegalStateException("unknown frame type: " + frame.getType()); + } + } + finally { - buf.dispose(); + frameBuffer.dispose(); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 11e2ab5..8032b35 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -61,13 +61,15 @@ import org.apache.qpid.server.protocol.v0_10.transport.ConnectionCloseOk; import org.apache.qpid.server.protocol.v0_10.transport.ConnectionException; import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode; import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException; +import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer; import org.apache.qpid.server.protocol.v0_10.transport.Method; import org.apache.qpid.server.protocol.v0_10.transport.Option; import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent; import org.apache.qpid.server.protocol.v0_10.transport.SessionDetachCode; import org.apache.qpid.server.protocol.v0_10.transport.SessionDetached; import org.apache.qpid.server.session.AMQPSession; -import org.apache.qpid.server.transport.*; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.transport.ServerNetworkConnection; import org.apache.qpid.server.transport.network.NetworkConnection; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -276,6 +278,13 @@ public class ServerConnection extends ConnectionInvoker } event.delegate(this, delegate); } + else + { + if (event instanceof MessageTransfer) + { + ((MessageTransfer) event).dispose(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java index 04819f0..5d26d59 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java @@ -20,137 +20,58 @@ */ package org.apache.qpid.server.protocol.v0_10; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v0_10.transport.AbstractDecoder; final class ServerDecoder extends AbstractDecoder { - private final List<QpidByteBuffer> _underlying; - private int _bufferIndex; + private final QpidByteBuffer _underlying; - ServerDecoder(List<QpidByteBuffer> in) + ServerDecoder(QpidByteBuffer in) { _underlying = in; - _bufferIndex = 0; - } - - private void advanceIfNecessary() - { - while(!getCurrentBuffer().hasRemaining() && _bufferIndex != _underlying.size()-1) - { - _bufferIndex++; - } - } - - private QpidByteBuffer getBuffer(int size) - { - advanceIfNecessary(); - final QpidByteBuffer currentBuffer = getCurrentBuffer(); - if(currentBuffer.remaining()>= size) - { - return currentBuffer; - } - else - { - return readAsQpidByteBuffer(size); - } - } - - private QpidByteBuffer readAsQpidByteBuffer(int len) - { - QpidByteBuffer currentBuffer = getCurrentBuffer(); - if(currentBuffer.remaining()>=len) - { - QpidByteBuffer buf = currentBuffer.slice(); - buf.limit(len); - currentBuffer.position(currentBuffer.position()+len); - return buf; - } - else - { - QpidByteBuffer dest = QpidByteBuffer.allocate(len); - while(dest.hasRemaining() && available()>0) - { - advanceIfNecessary(); - currentBuffer = getCurrentBuffer(); - final int remaining = dest.remaining(); - if(currentBuffer.remaining()>= remaining) - { - QpidByteBuffer buf = currentBuffer.slice(); - buf.limit(remaining); - currentBuffer.position(currentBuffer.position()+remaining); - dest.put(buf); - buf.dispose(); - } - else - { - dest.put(currentBuffer); - } - } - - dest.flip(); - return dest; - } - } - - private int available() - { - int remaining = 0; - for(int i = _bufferIndex; i < _underlying.size(); i++) - { - remaining += _underlying.get(i).remaining(); - } - return remaining; - } - - - private QpidByteBuffer getCurrentBuffer() - { - return _underlying.get(_bufferIndex); } @Override protected byte doGet() { - return getBuffer(1).get(); + return _underlying.get(); } @Override protected void doGet(byte[] bytes) { - getBuffer(bytes.length).get(bytes); + _underlying.get(bytes); } @Override public boolean hasRemaining() { - return available() != 0; + return _underlying.remaining() != 0; } @Override public short readUint8() { - return (short) (0xFF & getBuffer(1).get()); + return _underlying.getUnsignedByte(); } @Override public int readUint16() { - return 0xFFFF & getBuffer(2).getShort(); + return _underlying.getUnsignedShort(); } @Override public long readUint32() { - return 0xFFFFFFFFL & getBuffer(4).getInt(); + return _underlying.getUnsignedInt(); } @Override public long readUint64() { - return getBuffer(8).getLong(); + return _underlying.getLong(); } @Override @@ -172,44 +93,44 @@ final class ServerDecoder extends AbstractDecoder @Override public double readDouble() { - return getBuffer(8).getDouble(); + return _underlying.getDouble(); } @Override public float readFloat() { - return getBuffer(4).getFloat(); + return _underlying.getFloat(); } @Override public short readInt16() { - return getBuffer(2).getShort(); + return _underlying.getShort(); } @Override public int readInt32() { - return getBuffer(4).getInt(); + return _underlying.getInt(); } @Override public byte readInt8() { - return getBuffer(1).get(); + return _underlying.get(); } @Override public byte[] readRemainingBytes() { - byte[] result = new byte[available()]; - get(result); - return result; - } + byte[] result = new byte[_underlying.remaining()]; + get(result); + return result; + } @Override public long readInt64() { - return getBuffer(8).getLong(); + return _underlying.getLong(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org