QPID-7832: [Java Broker] Introduce QpidByteBuffer interface and split implementation into separate classes
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e64e2826 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e64e2826 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e64e2826 Branch: refs/heads/master Commit: e64e282688da8963133dc09c630482c686d8be7a Parents: cd12e2d Author: Alex Rudyy <oru...@apache.org> Authored: Tue Oct 24 14:37:51 2017 +0100 Committer: Alex Rudyy <oru...@apache.org> Committed: Tue Oct 24 17:06:38 2017 +0100 ---------------------------------------------------------------------- .../server/bytebuffer/MultiQpidByteBuffer.java | 1020 +++++++++++ .../server/bytebuffer/PooledByteBufferRef.java | 2 - .../qpid/server/bytebuffer/QpidByteBuffer.java | 1646 ++---------------- .../bytebuffer/QpidByteBufferFactory.java | 524 ++++++ .../bytebuffer/QpidByteBufferInputStream.java | 99 ++ .../bytebuffer/QpidByteBufferOutputStream.java | 2 +- .../server/bytebuffer/SingleQpidByteBuffer.java | 598 +++++++ .../qpid/server/store/StoredMemoryMessage.java | 5 - .../NonBlockingConnectionTLSDelegate.java | 2 +- .../server/bytebuffer/QpidByteBufferTest.java | 84 +- .../server/protocol/v0_10/ServerEncoder.java | 2 +- 11 files changed, 2454 insertions(+), 1530 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java new file mode 100644 index 0000000..1209abf --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/MultiQpidByteBuffer.java @@ -0,0 +1,1020 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.bytebuffer; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.InvalidMarkException; +import java.nio.channels.ScatteringByteChannel; +import java.util.ArrayList; +import java.util.List; + +import com.google.common.primitives.Chars; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; + +class MultiQpidByteBuffer implements QpidByteBuffer +{ + private final SingleQpidByteBuffer[] _fragments; + private volatile int _resetFragmentIndex = -1; + + private MultiQpidByteBuffer(final SingleQpidByteBuffer... fragments) + { + if (fragments == null) + { + throw new IllegalArgumentException(); + } + _fragments = fragments; + } + + MultiQpidByteBuffer(final List<SingleQpidByteBuffer> fragments) + { + if (fragments == null) + { + throw new IllegalArgumentException(); + } + _fragments = fragments.toArray(new SingleQpidByteBuffer[fragments.size()]); + } + + ////////////////// + // Absolute puts + ////////////////// + + @Override + public QpidByteBuffer put(final int index, final byte b) + { + return put(index, new byte[]{b}); + } + + @Override + public QpidByteBuffer putShort(final int index, final short value) + { + byte[] valueArray = Shorts.toByteArray(value); + return put(index, valueArray); + } + + @Override + public QpidByteBuffer putChar(final int index, final char value) + { + byte[] valueArray = Chars.toByteArray(value); + return put(index, valueArray); + } + + @Override + public QpidByteBuffer putInt(final int index, final int value) + { + byte[] valueArray = Ints.toByteArray(value); + return put(index, valueArray); + } + + @Override + public QpidByteBuffer putLong(final int index, final long value) + { + byte[] valueArray = Longs.toByteArray(value); + return put(index, valueArray); + } + + @Override + public QpidByteBuffer putFloat(final int index, final float value) + { + int intValue = Float.floatToRawIntBits(value); + return putInt(index, intValue); + } + + @Override + public QpidByteBuffer putDouble(final int index, final double value) + { + long longValue = Double.doubleToRawLongBits(value); + return putLong(index, longValue); + } + + private QpidByteBuffer put(final int index, final byte[] src) + { + final int valueWidth = src.length; + if (index < 0 || index + valueWidth > limit()) + { + throw new IndexOutOfBoundsException(String.format("index %d is out of bounds [%d, %d)", index, 0, limit())); + } + + int written = 0; + int bytesToSkip = index; + for (int i = 0; i < _fragments.length && written != valueWidth; i++) + { + final SingleQpidByteBuffer buffer = _fragments[i]; + final int limit = buffer.limit(); + boolean isLastFragmentToConsider = valueWidth + bytesToSkip - written <= limit; + if (!isLastFragmentToConsider && limit != buffer.capacity()) + { + throw new IllegalStateException(String.format("Unexpected limit %d on fragment %d", limit, i)); + } + + if (bytesToSkip >= limit) + { + bytesToSkip -= limit; + } + else + { + final int bytesToCopy = Math.min(limit - bytesToSkip, valueWidth - written); + final int originalPosition = buffer.position(); + buffer.position(bytesToSkip); + buffer.put(src, written, bytesToCopy); + buffer.position(originalPosition); + written += bytesToCopy; + bytesToSkip = 0; + } + } + if (valueWidth != written) + { + throw new BufferOverflowException(); + } + return this; + } + + //////////////// + // Relative Puts + //////////////// + + @Override + public final QpidByteBuffer put(final byte b) + { + return put(new byte[]{b}); + } + + @Override + public final QpidByteBuffer putUnsignedByte(final short s) + { + put((byte) s); + return this; + } + + @Override + public final QpidByteBuffer putShort(final short value) + { + byte[] valueArray = Shorts.toByteArray(value); + return put(valueArray); + } + + @Override + public final QpidByteBuffer putUnsignedShort(final int i) + { + putShort((short) i); + return this; + } + + @Override + public final QpidByteBuffer putChar(final char value) + { + byte[] valueArray = Chars.toByteArray(value); + return put(valueArray); + } + + @Override + public final QpidByteBuffer putInt(final int value) + { + byte[] valueArray = Ints.toByteArray(value); + return put(valueArray); + } + + @Override + public final QpidByteBuffer putUnsignedInt(final long value) + { + putInt((int) value); + return this; + } + + @Override + public final QpidByteBuffer putLong(final long value) + { + byte[] valueArray = Longs.toByteArray(value); + return put(valueArray); + } + + @Override + public final QpidByteBuffer putFloat(final float value) + { + int intValue = Float.floatToRawIntBits(value); + return putInt(intValue); + } + + @Override + public final QpidByteBuffer putDouble(final double value) + { + long longValue = Double.doubleToRawLongBits(value); + return putLong(longValue); + } + + @Override + public final QpidByteBuffer put(byte[] src) + { + return put(src, 0, src.length); + } + + @Override + public final QpidByteBuffer put(final byte[] src, final int offset, final int length) + { + if (!hasRemaining(length)) + { + throw new BufferOverflowException(); + } + + int written = 0; + for (int i = 0; i < _fragments.length && written != length; i++) + { + final SingleQpidByteBuffer buffer = _fragments[i]; + int bytesToWrite = Math.min(buffer.remaining(), length - written); + buffer.put(src, offset + written, bytesToWrite); + written += bytesToWrite; + } + if (written != length) + { + throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, length)); + } + return this; + } + + @Override + public final QpidByteBuffer put(final ByteBuffer src) + { + final int valueWidth = src.remaining(); + if (!hasRemaining(valueWidth)) + { + throw new BufferOverflowException(); + } + + int written = 0; + for (int i = 0; i < _fragments.length && written != valueWidth; i++) + { + final SingleQpidByteBuffer dstFragment = _fragments[i]; + if (dstFragment.hasRemaining()) + { + final int srcFragmentRemaining = src.remaining(); + final int dstFragmentRemaining = dstFragment.remaining(); + if (dstFragmentRemaining >= srcFragmentRemaining) + { + dstFragment.put(src); + written += srcFragmentRemaining; + } + else + { + int srcOriginalLimit = src.limit(); + src.limit(src.position() + dstFragmentRemaining); + dstFragment.put(src); + src.limit(srcOriginalLimit); + written += dstFragmentRemaining; + } + } + } + if (written != valueWidth) + { + throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", written, valueWidth)); + } + return this; + } + + @Override + public final QpidByteBuffer put(final QpidByteBuffer qpidByteBuffer) + { + final int valueWidth = qpidByteBuffer.remaining(); + if (!hasRemaining(valueWidth)) + { + throw new BufferOverflowException(); + } + + int written = 0; + final SingleQpidByteBuffer[] fragments; + if (qpidByteBuffer instanceof SingleQpidByteBuffer) + { + final SingleQpidByteBuffer srcFragment = (SingleQpidByteBuffer) qpidByteBuffer; + for (int i = 0; i < _fragments.length && written != valueWidth; i++) + { + final SingleQpidByteBuffer dstFragment = _fragments[i]; + if (dstFragment.hasRemaining()) + { + final int dstFragmentRemaining = dstFragment.remaining(); + if (dstFragmentRemaining >= valueWidth) + { + dstFragment.put(srcFragment); + written += valueWidth; + } + else + { + int srcOriginalLimit = srcFragment.limit(); + srcFragment.limit(srcFragment.position() + dstFragmentRemaining); + dstFragment.put(srcFragment); + srcFragment.limit(srcOriginalLimit); + written += dstFragmentRemaining; + } + } + } + } + else if (qpidByteBuffer instanceof MultiQpidByteBuffer) + { + fragments = ((MultiQpidByteBuffer) qpidByteBuffer)._fragments; + int i = 0; + for (int i1 = 0; i1 < fragments.length; i1++) + { + final SingleQpidByteBuffer srcFragment = fragments[i1]; + for (; i < _fragments.length; i++) + { + final SingleQpidByteBuffer dstFragment = _fragments[i]; + if (dstFragment.hasRemaining()) + { + final int srcFragmentRemaining = srcFragment.remaining(); + final int dstFragmentRemaining = dstFragment.remaining(); + if (dstFragmentRemaining >= srcFragmentRemaining) + { + dstFragment.put(srcFragment); + written += srcFragmentRemaining; + break; + } + else + { + int srcOriginalLimit = srcFragment.limit(); + srcFragment.limit(srcFragment.position() + dstFragmentRemaining); + dstFragment.put(srcFragment); + srcFragment.limit(srcOriginalLimit); + written += dstFragmentRemaining; + } + } + } + } + } + else + { + throw new IllegalStateException("unknown QBB implementation"); + } + + if (written != valueWidth) + { + throw new IllegalStateException(String.format("Unexpectedly only wrote %d of %d bytes.", + written, + valueWidth)); + } + return this; + } + + /////////////////// + // Absolute Gets + /////////////////// + + @Override + public byte get(final int index) + { + final byte[] byteArray = getByteArray(index, 1); + return byteArray[0]; + } + + @Override + public short getShort(final int index) + { + final byte[] byteArray = getByteArray(index, 2); + return Shorts.fromByteArray(byteArray); + } + + @Override + public final int getUnsignedShort(int index) + { + return ((int) getShort(index)) & 0xFFFF; + } + + @Override + public char getChar(final int index) + { + final byte[] byteArray = getByteArray(index, 2); + return Chars.fromByteArray(byteArray); + } + + @Override + public int getInt(final int index) + { + final byte[] byteArray = getByteArray(index, 4); + return Ints.fromByteArray(byteArray); + } + + @Override + public long getLong(final int index) + { + final byte[] byteArray = getByteArray(index, 8); + return Longs.fromByteArray(byteArray); + } + + @Override + public float getFloat(final int index) + { + final int intValue = getInt(index); + return Float.intBitsToFloat(intValue); + } + + @Override + public double getDouble(final int index) + { + final long longValue = getLong(index); + return Double.longBitsToDouble(longValue); + } + + private byte[] getByteArray(final int index, final int length) + { + if (index < 0 || index + length > limit()) + { + throw new IndexOutOfBoundsException(String.format("%d bytes at index %d do not fit into bounds [%d, %d)", length, index, 0, limit())); + } + + byte[] value = new byte[length]; + + int consumed = 0; + int bytesToSkip = index; + for (int i = 0; i < _fragments.length && consumed != length; i++) + { + final SingleQpidByteBuffer buffer = _fragments[i]; + final int limit = buffer.limit(); + boolean isLastFragmentToConsider = length + bytesToSkip - consumed <= limit; + if (!isLastFragmentToConsider && limit != buffer.capacity()) + { + throw new IllegalStateException(String.format("Unexpectedly limit %d on fragment %d.", limit, i)); + } + + if (bytesToSkip >= limit) + { + bytesToSkip -= limit; + } + else + { + final int bytesToCopy = Math.min(limit - bytesToSkip, length - consumed); + final int originalPosition = buffer.position(); + buffer.position(bytesToSkip); + buffer.get(value, consumed, bytesToCopy); + buffer.position(originalPosition); + consumed += bytesToCopy; + bytesToSkip = 0; + } + } + if (consumed != length) + { + throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length)); + } + return value; + } + + ////////////////// + // Relative Gets + ////////////////// + + @Override + public final byte get() + { + byte[] value = new byte[1]; + get(value, 0, 1); + return value[0]; + } + + @Override + public final short getUnsignedByte() + { + return (short) (get() & 0xFF); + } + + @Override + public final short getShort() + { + byte[] value = new byte[2]; + get(value, 0, value.length); + return Shorts.fromByteArray(value); + } + + @Override + public final int getUnsignedShort() + { + return ((int) getShort()) & 0xFFFF; + } + + @Override + public final char getChar() + { + byte[] value = new byte[2]; + get(value, 0, value.length); + return Chars.fromByteArray(value); + } + + @Override + public final int getInt() + { + byte[] value = new byte[4]; + get(value, 0, value.length); + return Ints.fromByteArray(value); + } + + @Override + public final long getUnsignedInt() + { + return ((long) getInt()) & 0xFFFFFFFFL; + } + + @Override + public final long getLong() + { + byte[] value = new byte[8]; + get(value, 0, value.length); + return Longs.fromByteArray(value); + } + + @Override + public final float getFloat() + { + final int intValue = getInt(); + return Float.intBitsToFloat(intValue); + } + + @Override + public final double getDouble() + { + final long longValue = getLong(); + return Double.longBitsToDouble(longValue); + } + + @Override + public final QpidByteBuffer get(final byte[] dst) + { + return get(dst, 0, dst.length); + } + + @Override + public final QpidByteBuffer get(final byte[] dst, final int offset, final int length) + { + if (!hasRemaining(length)) + { + throw new BufferUnderflowException(); + } + + int consumed = 0; + for (int i = 0; i < _fragments.length && consumed != length; i++) + { + final SingleQpidByteBuffer buffer = _fragments[i]; + int bytesToCopy = Math.min(buffer.remaining(), length - consumed); + buffer.get(dst, offset + consumed, bytesToCopy); + consumed += bytesToCopy; + } + if (consumed != length) + { + throw new IllegalStateException(String.format("Unexpectedly only consumed %d of %d bytes.", consumed, length)); + } + return this; + } + + /////////////// + // Other stuff + //////////////// + + @Override + public final void copyTo(final byte[] dst) + { + final int remaining = remaining(); + if (remaining < dst.length) + { + throw new BufferUnderflowException(); + } + if (remaining > dst.length) + { + throw new BufferOverflowException(); + } + int offset = 0; + for (SingleQpidByteBuffer fragment : _fragments) + { + final int length = Math.min(fragment.remaining(), dst.length - offset); + fragment.getUnderlyingBuffer().duplicate().get(dst, offset, length); + offset += length; + } + } + + @Override + public final void copyTo(final ByteBuffer dst) + { + if (dst.remaining() < remaining()) + { + throw new BufferOverflowException(); + } + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + dst.put(fragment.getUnderlyingBuffer().duplicate()); + } + } + + @Override + public final void putCopyOf(final QpidByteBuffer qpidByteBuffer) + { + int sourceRemaining = qpidByteBuffer.remaining(); + if (!hasRemaining(sourceRemaining)) + { + throw new BufferOverflowException(); + } + if (qpidByteBuffer instanceof MultiQpidByteBuffer) + { + MultiQpidByteBuffer source = (MultiQpidByteBuffer) qpidByteBuffer; + for (int i = 0, fragmentsSize = source._fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer srcFragment = source._fragments[i]; + put(srcFragment.getUnderlyingBuffer().duplicate()); + } + } + else if (qpidByteBuffer instanceof SingleQpidByteBuffer) + { + SingleQpidByteBuffer source = (SingleQpidByteBuffer) qpidByteBuffer; + put(source.getUnderlyingBuffer().duplicate()); + } + else + { + throw new IllegalStateException("unknown QBB implementation"); + } + } + + @Override + public final boolean isDirect() + { + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + if (!fragment.isDirect()) + { + return false; + } + } + return true; + } + + @Override + public final void close() + { + dispose(); + } + + @Override + public final void dispose() + { + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + fragment.dispose(); + } + } + + @Override + public final InputStream asInputStream() + { + return new QpidByteBufferInputStream(this); + } + + @Override + public final long read(ScatteringByteChannel channel) throws IOException + { + ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length]; + for (int i = 0; i < byteBuffers.length; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + byteBuffers[i] = fragment.getUnderlyingBuffer(); + } + return channel.read(byteBuffers); + } + + @Override + public String toString() + { + return "QpidByteBuffer{" + _fragments.length + " fragments}"; + } + + @Override + public QpidByteBuffer reset() + { + if (_resetFragmentIndex < 0) + { + throw new InvalidMarkException(); + } + final SingleQpidByteBuffer fragment = _fragments[_resetFragmentIndex]; + fragment.reset(); + for (int i = _resetFragmentIndex + 1, size = _fragments.length; i < size; ++i) + { + _fragments[i].position(0); + } + return this; + } + + @Override + public QpidByteBuffer rewind() + { + _resetFragmentIndex = -1; + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + fragment.rewind(); + } + return this; + } + + @Override + public final boolean hasArray() + { + return false; + } + + @Override + public byte[] array() + { + throw new UnsupportedOperationException("This QpidByteBuffer is not backed by an array."); + } + + @Override + public QpidByteBuffer clear() + { + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + _fragments[i].clear(); + } + return this; + } + + @Override + public QpidByteBuffer compact() + { + int position = position(); + int limit = limit(); + if (position != 0) + { + int dstPos = 0; + for (int srcPos = position; srcPos < limit; ++srcPos, ++dstPos) + { + put(dstPos, get(srcPos)); + } + position(dstPos); + limit(capacity()); + } + _resetFragmentIndex = -1; + return this; + } + + @Override + public int position() + { + int totalPosition = 0; + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + totalPosition += fragment.position(); + if (fragment.position() != fragment.limit()) + { + break; + } + } + return totalPosition; + } + + @Override + public QpidByteBuffer position(int newPosition) + { + if (newPosition < 0 || newPosition > limit()) + { + throw new IllegalArgumentException(String.format("new position %d is out of bounds [%d, %d)", newPosition, 0, limit())); + } + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + final int fragmentLimit = fragment.limit(); + if (newPosition <= fragmentLimit) + { + fragment.position(newPosition); + newPosition = 0; + } + else + { + if (fragmentLimit != fragment.capacity()) + { + throw new IllegalStateException(String.format("QBB Fragment %d has limit %d != capacity %d", + i, + fragmentLimit, + fragment.capacity())); + } + fragment.position(fragmentLimit); + newPosition -= fragmentLimit; + } + } + return this; + } + + @Override + public int limit() + { + int totalLimit = 0; + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + final int fragmentLimit = fragment.limit(); + totalLimit += fragmentLimit; + if (fragmentLimit != fragment.capacity()) + { + break; + } + } + + return totalLimit; + } + + @Override + public QpidByteBuffer limit(int newLimit) + { + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + final int fragmentCapacity = fragment.capacity(); + final int fragmentLimit = Math.min(newLimit, fragmentCapacity); + fragment.limit(fragmentLimit); + newLimit -= fragmentLimit; + } + return this; + } + + @Override + public final QpidByteBuffer mark() + { + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + if (fragment.position() != fragment.limit()) + { + fragment.mark(); + _resetFragmentIndex = i; + return this; + } + } + _resetFragmentIndex = _fragments.length - 1; + _fragments[_resetFragmentIndex].mark(); + return this; + } + + @Override + public final int remaining() + { + int remaining = 0; + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + remaining += fragment.remaining(); + } + return remaining; + } + + @Override + public final boolean hasRemaining() + { + return hasRemaining(1); + } + + @Override + public final boolean hasRemaining(int atLeast) + { + if (atLeast == 0) + { + return true; + } + int remaining = 0; + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + remaining += fragment.remaining(); + if (remaining >= atLeast) + { + return true; + } + } + return false; + } + + @Override + public QpidByteBuffer flip() + { + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + fragment.flip(); + } + return this; + } + + @Override + public int capacity() + { + int totalCapacity = 0; + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + totalCapacity += _fragments[i].capacity(); + } + return totalCapacity; + } + + @Override + public QpidByteBuffer duplicate() + { + final SingleQpidByteBuffer[] fragments = new SingleQpidByteBuffer[_fragments.length]; + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + fragments[i] =_fragments[i].duplicate(); + } + MultiQpidByteBuffer duplicate = new MultiQpidByteBuffer(fragments); + duplicate._resetFragmentIndex = _resetFragmentIndex; + return duplicate; + } + + @Override + public QpidByteBuffer slice() + { + return view(0, remaining()); + } + + @Override + public QpidByteBuffer view(int offset, int length) + { + if (offset + length > remaining()) + { + throw new IllegalArgumentException(String.format("offset: %d, length: %d, remaining: %d", offset, length, remaining())); + } + + final List<SingleQpidByteBuffer> fragments = new ArrayList<>(_fragments.length); + + boolean firstFragmentToBeConsidered = true; + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize && length > 0; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + if (fragment.hasRemaining()) + { + if (!firstFragmentToBeConsidered && fragment.position() != 0) + { + throw new IllegalStateException(String.format("Unexpectedly position %d on fragment %d.", fragment.position(), i)); + } + firstFragmentToBeConsidered = false; + final int fragmentRemaining = fragment.remaining(); + if (fragmentRemaining > offset) + { + final int fragmentViewLength = Math.min(fragmentRemaining - offset, length); + fragments.add(fragment.view(offset, fragmentViewLength)); + length -= fragmentViewLength; + offset = 0; + } + else + { + offset -= fragmentRemaining; + } + } + } + + return QpidByteBufferFactory.createQpidByteBuffer(fragments); + } + + @Override + public boolean isSparse() + { + for (int i = 0, fragmentsSize = _fragments.length; i < fragmentsSize; i++) + { + final SingleQpidByteBuffer fragment = _fragments[i]; + if (fragment.isSparse()) + { + return true; + } + } + return false; + } + + SingleQpidByteBuffer[] getFragments() + { + return _fragments; + } + + ByteBuffer[] getUnderlyingBuffers() + { + ByteBuffer[] byteBuffers = new ByteBuffer[_fragments.length]; + for (int i = 0; i < _fragments.length; i++) + { + byteBuffers[i] = _fragments[i].getUnderlyingBuffer(); + } + return byteBuffers; + + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e64e2826/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java index 0126e20..abd5b78 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java +++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java @@ -21,8 +21,6 @@ package org.apache.qpid.server.bytebuffer; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org