Author: rgodfrey
Date: Tue Dec 15 15:45:46 2015
New Revision: 1720183

URL: http://svn.apache.org/viewvc?rev=1720183&view=rev
Log:
QPID-6953 : Remove use of DataOutput for AMQP 0-8/9/9-1 encoding

Removed: qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java qpid/java/trunk/common/src/main/java/org/apache/qpid/util/BytesDataOutput.java Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java 
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java 
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionCloseOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldArray.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java 
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxCommitBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxCommitOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxRollbackBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxRollbackOkBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxSelectBody.java qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original) +++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Dec 15 15:45:46 2015 @@ -1123,14 +1123,49 @@ public abstract class AbstractBDBMessage return data; } + @Override - public synchronized Collection<QpidByteBuffer> getContent() + public synchronized Collection<QpidByteBuffer> getContent(int offset, int length) { Collection<QpidByteBuffer> bufs = getContentAsByteBuffer(); Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size()); + int pos = 0; for (QpidByteBuffer buf : bufs) { - content.add(buf.duplicate()); + if(length > 0) + { + int bufRemaining = buf.remaining(); + if (pos + bufRemaining <= offset) + { + pos += bufRemaining; + } + else if (pos >= offset) + { + buf = buf.duplicate(); + if (bufRemaining <= length) + { + length -= bufRemaining; + } + else + { + buf.limit(length); + length = 0; + } + content.add(buf); + pos += buf.remaining(); + + } + else + { + int offsetInBuf = offset - pos; + int limit = length < bufRemaining - offsetInBuf ? 
length : bufRemaining - offsetInBuf; + final QpidByteBuffer bufView = buf.view(offsetInBuf, limit); + content.add(bufView); + length -= limit; + pos+=limit+offsetInBuf; + } + } + } return content; } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Tue Dec 15 15:45:46 2015 @@ -168,13 +168,13 @@ public abstract class AbstractServerMess } @Override - final public Collection<QpidByteBuffer> getContent() + final public Collection<QpidByteBuffer> getContent(int offset, int length) { StoredMessage<T> storedMessage = getStoredMessage(); boolean wasInMemory = storedMessage.isInMemory(); try { - return storedMessage.getContent(); + return storedMessage.getContent(offset, length); } finally { Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Tue Dec 15 15:45:46 2015 @@ -28,7 +28,7 @@ import org.apache.qpid.bytebuffer.QpidBy public interface MessageContentSource { - Collection<QpidByteBuffer> getContent(); + 
Collection<QpidByteBuffer> getContent(int offset, int length); long getSize(); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Tue Dec 15 15:45:46 2015 @@ -64,7 +64,7 @@ public class InternalMessage extends Abs { super(msg, null); _contentSize = msg.getMetaData().getContentSize(); - Collection<QpidByteBuffer> bufs = msg.getContent(); + Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize); try(ObjectInputStream is = new ObjectInputStream(new ByteBufferInputStream(ByteBufferUtils.combine(bufs)))) { @@ -223,9 +223,9 @@ public class InternalMessage extends Abs } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(final int offset, final int length) { - return Collections.singleton(QpidByteBuffer.wrap(bytes)); + return Collections.singleton(QpidByteBuffer.wrap(bytes, offset, length)); } @Override Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Dec 15 15:45:46 2015 @@ 
-2596,7 +2596,8 @@ public abstract class AbstractQueue<X ex @Override public void write(OutputStream outputStream) throws IOException { - Collection<QpidByteBuffer> content = _messageReference.getMessage().getContent(); + ServerMessage message = _messageReference.getMessage(); + Collection<QpidByteBuffer> content = message.getContent(0, (int) message.getSize()); try { for (QpidByteBuffer b : content) Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Dec 15 15:45:46 2015 @@ -1506,13 +1506,48 @@ public abstract class AbstractJDBCMessag } @Override - public synchronized Collection<QpidByteBuffer> getContent() + public synchronized Collection<QpidByteBuffer> getContent(int offset, int length) { Collection<QpidByteBuffer> bufs = getContentAsByteBuffer(); Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size()); + + int pos = 0; for (QpidByteBuffer buf : bufs) { - content.add(buf.duplicate()); + if (length > 0) + { + int bufRemaining = buf.remaining(); + if (pos + bufRemaining <= offset) + { + pos += bufRemaining; + } + else if (pos >= offset) + { + buf = buf.duplicate(); + if (bufRemaining <= length) + { + length -= bufRemaining; + } + else + { + buf.limit(length); + length = 0; + } + content.add(buf); + pos += buf.remaining(); + + } + else + { + int offsetInBuf = offset - pos; + int limit = length < bufRemaining - offsetInBuf ? 
length : bufRemaining - offsetInBuf; + final QpidByteBuffer bufView = buf.view(offsetInBuf, limit); + content.add(bufView); + length -= limit; + pos+=limit+offsetInBuf; + } + } + } return content; } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Tue Dec 15 15:45:46 2015 @@ -83,14 +83,15 @@ public class StoredMemoryMessage<T exten return this; } + @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(int offset, int length) { if(_content == null) { return null; } - return Collections.singleton(_content.duplicate()); + return Collections.singleton(_content.view(offset, length)); } public T getMetaData() Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Tue Dec 15 15:45:46 2015 @@ -31,7 +31,8 @@ public interface StoredMessage<M extends long getMessageNumber(); - Collection<QpidByteBuffer> getContent(); + Collection<QpidByteBuffer> getContent(int offset, int length); + void remove(); Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Tue Dec 15 15:45:46 2015 @@ -106,7 +106,6 @@ public class NetworkConnectionScheduler { rerun = false; boolean closed = connection.doWork(); - if (!closed && connection.getScheduler() == this) { Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Tue Dec 15 15:45:46 2015 @@ -184,7 +184,14 @@ public class NonBlockingNetworkTransport else { LOGGER.error("No Engine available."); - } + try + { + socketChannel.close(); + } + catch (IOException e) + { + LOGGER.debug("Failed to close socket " + socketChannel, e); + } } } } catch (IOException e) Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Tue Dec 15 15:45:46 2015 @@ -100,7 +100,7 @@ public class TrustStoreMessageSourceTest { final int bodySize = (int) message.getSize(); byte[] msgContent = new byte[bodySize]; - final Collection<QpidByteBuffer> allData = message.getStoredMessage().getContent(); + final Collection<QpidByteBuffer> allData = message.getStoredMessage().getContent(0, bodySize); int total = 0; for(QpidByteBuffer b : allData) { Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java Tue Dec 15 15:45:46 2015 @@ -20,16 +20,11 @@ */ package org.apache.qpid.server.store; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import org.apache.qpid.bytebuffer.QpidByteBuffer; -import org.apache.qpid.framing.EncodingUtils; import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.util.ByteBufferOutputStream; public class TestMessageMetaData implements StorableMessageMetaData { @@ -96,17 
+91,8 @@ public class TestMessageMetaData impleme public int writeToBuffer(QpidByteBuffer dest) { int oldPosition = dest.position(); - try - { - DataOutput dataOutputStream = dest.asDataOutput(); - EncodingUtils.writeLong(dataOutputStream, _messageId); - EncodingUtils.writeInteger(dataOutputStream, _contentSize); - } - catch (IOException e) - { - // This shouldn't happen as we are not actually using anything that can throw an IO Exception - throw new RuntimeException(e); - } + dest.putLong(_messageId); + dest.putInt(_contentSize); return dest.position() - oldPosition; }; Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Tue Dec 15 15:45:46 2015 @@ -99,11 +99,12 @@ public class TestMessageMetaDataType imp } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(int offset, int length) { return null; } + @Override public Object getConnectionReference() { Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original) +++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Tue Dec 15 15:45:46 2015 @@ -109,7 +109,7 @@ class MockServerMessage implements Serve } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(int offset, int length) { throw new UnsupportedOperationException(); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Tue Dec 15 15:45:46 2015 @@ -78,9 +78,9 @@ public class MessageConverter_Internal_t } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(final int offset, final int length) { - return Collections.singleton(QpidByteBuffer.wrap(messageContent)); + return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length)); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Tue Dec 15 15:45:46 2015 @@ -85,9 +85,9 @@ public class MessageConverter_v0_10 impl } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(final int offset, final int length) { - return serverMsg.getContent(); + return serverMsg.getContent(offset, length); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Tue Dec 15 15:45:46 2015 @@ -63,7 +63,7 @@ public class MessageConverter_v0_10_to_I final String mimeType = serverMessage.getMessageHeader().getMimeType(); byte[] data = new byte[(int) serverMessage.getSize()]; int total = 0; - for(QpidByteBuffer b : serverMessage.getContent()) + for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize())) { int len = b.remaining(); b.get(data, total, len); Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Tue Dec 15 15:45:46 2015 @@ -83,6 +83,6 @@ public class MessageTransferMessage exte public Collection<QpidByteBuffer> getBody() { - return getContent(); + return getContent(0, (int) getSize()); } } Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Dec 15 15:45:46 2015 @@ -392,14 +392,7 @@ public class AMQPConnection_0_8 _logger.debug("SEND: " + frame); } - try - { - frame.writePayload(_sender); - } - catch (IOException e) - { - throw new ServerScopedRuntimeException(e); - } + frame.writePayload(_sender); updateLastWriteTime(); Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Tue Dec 15 15:45:46 2015 @@ -91,9 +91,9 @@ public class MessageConverter_Internal_t } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(final int offset, final int length) { - return Collections.singleton(QpidByteBuffer.wrap(messageContent)); + return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length)); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Tue Dec 15 15:45:46 2015 @@ -63,7 +63,7 @@ public class MessageConverter_v0_8_to_In final String mimeType = serverMessage.getMessageHeader().getMimeType(); byte[] data = new byte[(int) serverMessage.getSize()]; int total = 0; - for(QpidByteBuffer b : 
serverMessage.getContent()) + for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize())) { int len = b.remaining(); b.get(data, total, len); Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Tue Dec 15 15:45:46 2015 @@ -110,32 +110,24 @@ public class MessageMetaData implements public int writeToBuffer(final QpidByteBuffer dest) { int oldPosition = dest.position(); - try - { - - dest.putInt(_contentHeaderBody.getSize()); - _contentHeaderBody.writePayload(dest); - EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getExchange()); - EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getRoutingKey()); - byte flags = 0; - if(_messagePublishInfo.isMandatory()) - { - flags |= MANDATORY_FLAG; - } - if(_messagePublishInfo.isImmediate()) - { - flags |= IMMEDIATE_FLAG; - } - dest.put(flags); - dest.putLong(_arrivalTime); + dest.putInt(_contentHeaderBody.getSize()); + _contentHeaderBody.writePayload(dest); + EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getExchange()); + EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getRoutingKey()); + byte flags = 0; + if(_messagePublishInfo.isMandatory()) + { + flags |= MANDATORY_FLAG; } - catch (IOException e) + if(_messagePublishInfo.isImmediate()) { - // This shouldn't happen as we are not actually using anything that can throw an IO Exception - throw 
new ConnectionScopedRuntimeException(e); + flags |= IMMEDIATE_FLAG; } + dest.put(flags); + dest.putLong(_arrivalTime); + return dest.position()-oldPosition; } @@ -143,55 +135,47 @@ public class MessageMetaData implements @Override public Collection<QpidByteBuffer> asByteBuffers() { - try - { - final List<QpidByteBuffer> buffers = new ArrayList<>(); - QpidByteBuffer buf = QpidByteBuffer.allocateDirect(4); - buffers.add(buf); - buf.putInt(0, _contentHeaderBody.getSize()); - _contentHeaderBody.writePayload(new ByteBufferSender() + final List<QpidByteBuffer> buffers = new ArrayList<>(); + QpidByteBuffer buf = QpidByteBuffer.allocateDirect(4); + buffers.add(buf); + buf.putInt(0, _contentHeaderBody.getSize()); + _contentHeaderBody.writePayload(new ByteBufferSender() + { + @Override + public void send(final QpidByteBuffer msg) + { + buffers.add(msg.duplicate()); + } + + @Override + public void flush() + { + + } + + @Override + public void close() { - @Override - public void send(final QpidByteBuffer msg) - { - buffers.add(msg.duplicate()); - } - - @Override - public void flush() - { - - } - - @Override - public void close() - { - - } - }); - buf = QpidByteBuffer.allocateDirect(9+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getExchange())+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getRoutingKey())); - EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getExchange()); - EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getRoutingKey()); - byte flags = 0; - if(_messagePublishInfo.isMandatory()) - { - flags |= MANDATORY_FLAG; - } - if(_messagePublishInfo.isImmediate()) - { - flags |= IMMEDIATE_FLAG; - } - buf.put(flags); - buf.putLong(_arrivalTime); - buf.flip(); - buffers.add(buf); - return buffers; + + } + }); + buf = QpidByteBuffer.allocateDirect(9+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getExchange())+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getRoutingKey())); + 
EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getExchange()); + EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getRoutingKey()); + byte flags = 0; + if(_messagePublishInfo.isMandatory()) + { + flags |= MANDATORY_FLAG; } - catch (IOException e) + if(_messagePublishInfo.isImmediate()) { - // This shouldn't happen as we are not actually using anything that can throw an IO Exception - throw new ConnectionScopedRuntimeException(e); + flags |= IMMEDIATE_FLAG; } + buf.put(flags); + buf.putLong(_arrivalTime); + buf.flip(); + buffers.add(buf); + return buffers; } public int getContentSize() Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Tue Dec 15 15:45:46 2015 @@ -20,11 +20,9 @@ */ package org.apache.qpid.server.protocol.v0_8; -import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +36,6 @@ import org.apache.qpid.framing.AMQMethod import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicGetOkBody; -import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentHeaderBody; import 
org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; @@ -49,7 +45,6 @@ import org.apache.qpid.server.message.Se import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.transport.ByteBufferSender; -import org.apache.qpid.util.ByteBufferUtils; import org.apache.qpid.util.GZIPUtils; public class ProtocolOutputConverterImpl implements ProtocolOutputConverter @@ -100,73 +95,64 @@ public class ProtocolOutputConverterImpl return writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody); } + interface DisposableMessageContentSource extends MessageContentSource + { + void dispose(); + } + private long writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) { int bodySize = (int) message.getSize(); boolean msgCompressed = isCompressed(contentHeaderBody); - Collection<QpidByteBuffer> modifiedContentBuffers = null; + DisposableMessageContentSource modifiedContent = null; boolean compressionSupported = _connection.isCompressionSupported(); - Collection<QpidByteBuffer> contentBuffers = message.getContent(); long length; if(msgCompressed && !compressionSupported - && (contentBuffers != null) - && (modifiedContentBuffers = inflateIfPossible(contentBuffers)) != null) + && (modifiedContent = inflateIfPossible(message)) != null) { BasicContentHeaderProperties modifiedProps = new BasicContentHeaderProperties(contentHeaderBody.getProperties()); modifiedProps.setEncoding((String)null); - length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps); + length = writeMessageDeliveryModified(modifiedContent, channelId, deliverBody, modifiedProps); } else if(!msgCompressed && compressionSupported && contentHeaderBody.getProperties().getEncoding()==null && bodySize > _connection.getMessageCompressionThreshold() - && 
(contentBuffers != null) - && (modifiedContentBuffers = deflateIfPossible(contentBuffers)) != null) + && (modifiedContent = deflateIfPossible(message)) != null) { BasicContentHeaderProperties modifiedProps = new BasicContentHeaderProperties(contentHeaderBody.getProperties()); modifiedProps.setEncoding(GZIP_ENCODING); - length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps); + length = writeMessageDeliveryModified(modifiedContent, channelId, deliverBody, modifiedProps); } else { - writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, contentHeaderBody, bodySize); + writeMessageDeliveryUnchanged(message, channelId, deliverBody, contentHeaderBody, bodySize); length = bodySize; } - if (contentBuffers != null) + if (modifiedContent != null) { - for (QpidByteBuffer buf : contentBuffers) - { - buf.dispose(); - } - } - - if (modifiedContentBuffers != null) - { - for(QpidByteBuffer buf : modifiedContentBuffers) - { - buf.dispose(); - } + modifiedContent.dispose(); } return length; } - private Collection<QpidByteBuffer> deflateIfPossible(final Collection<QpidByteBuffer> buffers) + private DisposableMessageContentSource deflateIfPossible(MessageContentSource source) { try { - return QpidByteBuffer.deflate(buffers); + return new ModifiedContentSource(QpidByteBuffer.deflate(source.getContent(0, (int) source.getSize()))); } catch (IOException e) { @@ -175,11 +161,12 @@ public class ProtocolOutputConverterImpl } } - private Collection<QpidByteBuffer> inflateIfPossible(final Collection<QpidByteBuffer> buffers) + + private DisposableMessageContentSource inflateIfPossible(MessageContentSource source) { try { - return QpidByteBuffer.inflate(buffers); + return new ModifiedContentSource(QpidByteBuffer.inflate(source.getContent(0, (int) source.getSize()))); } catch (IOException e) { @@ -188,18 +175,19 @@ public class ProtocolOutputConverterImpl } } - private int writeMessageDeliveryModified(final Collection<QpidByteBuffer> 
contentBuffers, final int channelId, + + private int writeMessageDeliveryModified(final MessageContentSource content, final int channelId, final AMQBody deliverBody, final BasicContentHeaderProperties modifiedProps) { - final int bodySize = ByteBufferUtils.remaining(contentBuffers); + final int bodySize = (int) content.getSize(); ContentHeaderBody modifiedHeaderBody = new ContentHeaderBody(modifiedProps, bodySize); - writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, modifiedHeaderBody, bodySize); + writeMessageDeliveryUnchanged(content, channelId, deliverBody, modifiedHeaderBody, bodySize); return bodySize; } - private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> contentBuffers, + private void writeMessageDeliveryUnchanged(MessageContentSource content, int channelId, AMQBody deliverBody, ContentHeaderBody contentHeaderBody, int bodySize) { @@ -219,7 +207,7 @@ public class ProtocolOutputConverterImpl int writtenSize = capacity; - AMQBody firstContentBody = new MessageContentSourceBody(contentBuffers, 0, capacity); + AMQBody firstContentBody = new MessageContentSourceBody(content, 0, capacity); CompositeAMQBodyBlock compositeBlock = @@ -229,7 +217,7 @@ public class ProtocolOutputConverterImpl while (writtenSize < bodySize) { capacity = bodySize - writtenSize > maxBodySize ? 
maxBodySize : bodySize - writtenSize; - AMQBody body = new MessageContentSourceBody(contentBuffers, writtenSize, capacity); + AMQBody body = new MessageContentSourceBody(content, writtenSize, capacity); writtenSize += capacity; writeFrame(new AMQFrame(channelId, body)); @@ -246,47 +234,12 @@ public class ProtocolOutputConverterImpl { public static final byte TYPE = 3; private final int _length; - private final Collection<QpidByteBuffer> _contentBuffers; + private final MessageContentSource _content; private final int _offset; - public MessageContentSourceBody(Collection<QpidByteBuffer> bufs, int offset, int length) + public MessageContentSourceBody(MessageContentSource content, int offset, int length) { - int pos = 0; - int added = 0; - - List<QpidByteBuffer> content = new ArrayList<>(bufs.size()); - for(QpidByteBuffer buf : bufs) - { - if(pos < offset) - { - final int remaining = buf.remaining(); - if(pos + remaining > offset) - { - buf = buf.view(offset-pos,length); - - content.add(buf); - added += buf.remaining(); - } - pos += remaining; - - } - else - { - buf = buf.slice(); - if(buf.remaining() > (length-added)) - { - buf.limit(length-added); - } - content.add(buf); - added += buf.remaining(); - } - if(added >= length) - { - break; - } - } - - _contentBuffers = content; + _content = content; _offset = offset; _length = length; } @@ -301,32 +254,11 @@ public class ProtocolOutputConverterImpl return _length; } - public void writePayload(DataOutput buffer) throws IOException - { - for(QpidByteBuffer buf : _contentBuffers) - { - if (buf.hasArray()) - { - buffer.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); - } - else - { - - byte[] data = new byte[_length]; - - buf.get(data); - - buffer.write(data); - } - buf.dispose(); - } - } - @Override - public long writePayload(final ByteBufferSender sender) throws IOException + public long writePayload(final ByteBufferSender sender) { - long size = 0l; - for(QpidByteBuffer buf : _contentBuffers) + 
long size = 0L; + for(QpidByteBuffer buf : _content.getContent(_offset, _length)) { size += buf.remaining(); @@ -373,8 +305,7 @@ public class ProtocolOutputConverterImpl exchangeName = pb.getExchange(); routingKey = pb.getRoutingKey(); - final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered); - return returnBlock; + return new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered); } private class EncodedDeliveryBody implements AMQBody @@ -418,16 +349,7 @@ public class ProtocolOutputConverterImpl return _underlyingBody.getSize(); } - public void writePayload(DataOutput buffer) throws IOException - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - _underlyingBody.writePayload(buffer); - } - - public long writePayload(ByteBufferSender sender) throws IOException + public long writePayload(ByteBufferSender sender) { if(_underlyingBody == null) { @@ -461,14 +383,11 @@ public class ProtocolOutputConverterImpl final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); - BasicGetOkBody getOkBody = - _connection.getMethodRegistry().createBasicGetOkBody(deliveryTag, - isRedelivered, - exchangeName, - routingKey, - queueSize); - - return getOkBody; + return _connection.getMethodRegistry().createBasicGetOkBody(deliveryTag, + isRedelivered, + exchangeName, + routingKey, + queueSize); } private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, @@ -476,14 +395,11 @@ public class ProtocolOutputConverterImpl AMQShortString replyText) { - BasicReturnBody basicReturnBody = - _connection.getMethodRegistry().createBasicReturnBody(replyCode, - replyText, - messagePublishInfo.getExchange(), - messagePublishInfo.getRoutingKey()); - - return basicReturnBody; + return _connection.getMethodRegistry().createBasicReturnBody(replyCode, + replyText, + messagePublishInfo.getExchange(), + 
messagePublishInfo.getRoutingKey()); } public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) @@ -533,13 +449,8 @@ public class ProtocolOutputConverterImpl return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); } - public void writePayload(DataOutput buffer) throws IOException - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); - } - @Override - public long writePayload(final ByteBufferSender sender) throws IOException + public long writePayload(final ByteBufferSender sender) { long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); @@ -586,13 +497,8 @@ public class ProtocolOutputConverterImpl return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; } - public void writePayload(DataOutput buffer) throws IOException - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); - } - @Override - public long writePayload(final ByteBufferSender sender) throws IOException + public long writePayload(final ByteBufferSender sender) { long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); @@ -611,4 +517,81 @@ public class ProtocolOutputConverterImpl } } + private static class ModifiedContentSource implements DisposableMessageContentSource + { + private final Collection<QpidByteBuffer> _buffers; + private final int _size; + + public ModifiedContentSource(final Collection<QpidByteBuffer> buffers) + { + _buffers = buffers; + int size = 0; + for(QpidByteBuffer buf : buffers) + { + size += buf.remaining(); + } + _size = size; + } + + @Override + public void dispose() + { + for(QpidByteBuffer buffer : _buffers) + { + buffer.dispose(); + } + } + + @Override + public Collection<QpidByteBuffer> getContent(final int offset, int length) + { + Collection<QpidByteBuffer> content = 
new ArrayList<>(_buffers.size()); + int pos = 0; + for (QpidByteBuffer buf : _buffers) + { + if (length > 0) + { + int bufRemaining = buf.remaining(); + if (pos + bufRemaining <= offset) + { + pos += bufRemaining; + } + else if (pos >= offset) + { + buf = buf.duplicate(); + if (bufRemaining <= length) + { + length -= bufRemaining; + } + else + { + buf.limit(length); + length = 0; + } + content.add(buf); + pos+=buf.remaining(); + + } + else + { + int offsetInBuf = offset - pos; + int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf; + final QpidByteBuffer bufView = buf.view(offsetInBuf, limit); + content.add(bufView); + length -= limit; + pos+=limit+offsetInBuf; + } + } + + } + return content; + + } + + @Override + public long getSize() + { + return _size; + } + } } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Tue Dec 15 15:45:46 2015 @@ -51,6 +51,7 @@ import org.apache.qpid.amqp_1_0.type.mes import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.codec.BBEncoder; import org.apache.qpid.typedmessage.TypedBytesContentWriter; @@ 
-63,7 +64,7 @@ public class MessageConverter_from_1_0 public static Object convertBodyToObject(final Message_1_0 serverMessage) { byte[] data = new byte[(int) serverMessage.getSize()]; - final Collection<QpidByteBuffer> allData = serverMessage.getStoredMessage().getContent(); + final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize()); int offset = 0; for(QpidByteBuffer buf : allData) { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Tue Dec 15 15:45:46 2015 @@ -209,7 +209,7 @@ public abstract class MessageConverter_t final String mimeType = serverMessage.getMessageHeader().getMimeType(); byte[] data = new byte[(int) serverMessage.getSize()]; int total = 0; - for(QpidByteBuffer b : serverMessage.getContent()) + for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize())) { int len = b.remaining(); b.get(data, total, len); @@ -245,9 +245,9 @@ public abstract class MessageConverter_t } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(int offset, int length) { - return Collections.singleton(allData.duplicate()); + return Collections.singleton(allData.view(offset, length)); } @Override Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Dec 15 15:45:46 2015 @@ -100,7 +100,7 @@ public class Message_1_0 extends Abstrac public Collection<QpidByteBuffer> getFragments() { - return getContent(); + return getContent(0, (int) getSize()); } } Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Tue Dec 15 15:45:46 2015 @@ -89,9 +89,9 @@ public class MessageConverter_1_0_to_v0_ } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(final int offset, final int length) { - return Collections.singleton(QpidByteBuffer.wrap(messageContent)); + return 
Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length)); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Tue Dec 15 15:45:46 2015 @@ -193,9 +193,9 @@ public class MessageConverter_0_10_to_0_ } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(final int offset, final int length) { - return message.getContent(); + return message.getContent(offset, length); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original) +++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Tue Dec 15 15:45:46 2015 @@ -82,9 +82,9 @@ public class MessageConverter_0_8_to_0_1 } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(final int offset, final int length) { - return message_0_8.getContent(); + return message_0_8.getContent(offset, length); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Tue Dec 15 15:45:46 2015 @@ -91,9 +91,9 @@ public class MessageConverter_1_0_to_v0_ } @Override - public Collection<QpidByteBuffer> getContent() + public Collection<QpidByteBuffer> getContent(final int offset, final int length) { - return Collections.singleton(QpidByteBuffer.wrap(messageContent)); + return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length)); } @Override Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java (original) +++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java Tue Dec 15 15:45:46 2015 @@ -218,7 +218,8 @@ public class ReportRunner<T> private static ReportableMessage convertMessage(QueueEntry entry) { final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true); - final Collection<QpidByteBuffer> contentBuffers = entry.getMessage().getContent(); + ServerMessage message = entry.getMessage(); + final Collection<QpidByteBuffer> contentBuffers = message.getContent(0, (int) message.getSize()); final ByteBuffer content = ByteBufferUtils.combine(contentBuffers); for(QpidByteBuffer buf : contentBuffers) { Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java Tue Dec 15 15:45:46 2015 @@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteA import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.slf4j.Logger; @@ 
-47,7 +46,6 @@ import org.apache.qpid.client.state.AMQS import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.codec.ClientDecoder; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQBody; @@ -71,7 +69,6 @@ import org.apache.qpid.transport.Excepti import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; -import org.apache.qpid.util.BytesDataOutput; public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, TransportActivity { @@ -142,8 +139,6 @@ public class AMQProtocolHandler implemen private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; - private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); - private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); private int _queueId = 1; private final Object _queueIdLock = new Object(); @@ -557,10 +552,9 @@ public class AMQProtocolHandler implemen public synchronized void writeFrame(AMQDataBlock frame, boolean flush) { - final ByteBuffer buf = asByteBuffer(frame); _lastWriteTime = System.currentTimeMillis(); - _writtenBytes += buf.remaining(); - _sender.send(QpidByteBuffer.wrap(buf)); + _writtenBytes += frame.getSize(); + frame.writePayload(_sender); if(flush) { _sender.flush(); @@ -581,49 +575,6 @@ public class AMQProtocolHandler implemen } - private ByteBuffer asByteBuffer(AMQDataBlock block) - { - final int size = (int) block.getSize(); - - final byte[] data; - - - if(size > REUSABLE_BYTE_BUFFER_CAPACITY) - { - data= new byte[size]; - } - else - { - - data = _reusableBytes; - } - _reusableDataOutput.setBuffer(data); - - 
try - { - block.writePayload(_reusableDataOutput); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - - final ByteBuffer buf; - - if(size < REUSABLE_BYTE_BUFFER_CAPACITY) - { - buf = _reusableByteBuffer; - buf.position(0); - } - else - { - buf = ByteBuffer.wrap(data); - } - buf.limit(_reusableDataOutput.length()); - - return buf; - } - /** * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Tue Dec 15 15:45:46 2015 @@ -58,7 +58,6 @@ import org.apache.qpid.transport.Message import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; import org.apache.qpid.transport.codec.BBEncoder; -import org.apache.qpid.util.BytesDataOutput; import org.apache.qpid.util.GZIPUtils; import org.apache.qpid.util.Strings; @@ -251,9 +250,6 @@ public class BasicMessageProducer_0_10 e final int headerLength = buf.remaining(); byte[] unencryptedBytes = new byte[headerLength + (data == null ? 
0 : data.remaining())]; - BytesDataOutput output = new BytesDataOutput(unencryptedBytes); - - output.write(buf.array(), buf.arrayOffset()+buf.position(), buf.remaining()); if (data != null) { Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Tue Dec 15 15:45:46 2015 @@ -38,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -59,7 +60,6 @@ import org.apache.qpid.framing.ContentHe import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.util.BytesDataOutput; import org.apache.qpid.util.GZIPUtils; public class BasicMessageProducer_0_8 extends BasicMessageProducer @@ -222,8 +222,8 @@ public class BasicMessageProducer_0_8 ex final int headerLength = contentHeaderProperties.getPropertyListSize() + 2; byte[] unencryptedBytes = new byte[headerLength + size]; - BytesDataOutput output = new BytesDataOutput(unencryptedBytes); - output.writeShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff)); + QpidByteBuffer output = QpidByteBuffer.wrap(unencryptedBytes); + output.putShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff)); 
contentHeaderProperties.writePropertyListPayload(output); if (size != 0) Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Tue Dec 15 15:45:46 2015 @@ -29,6 +29,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and * the content body/ies. @@ -40,6 +43,7 @@ public class UnprocessedMessage_0_8 exte { private long _bytesReceived = 0; + private static final Logger LOGGER = LoggerFactory.getLogger(UnprocessedMessage_0_8.class); private AMQShortString _exchange; private AMQShortString _routingKey; @@ -124,6 +128,7 @@ public class UnprocessedMessage_0_8 exte public boolean isAllBodyDataReceived() { + LOGGER.debug("Received {} of {} bytes for message body", _bytesReceived, getContentHeader().getBodySize()); return _bytesReceived == getContentHeader().getBodySize(); } Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java 
(original) +++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java Tue Dec 15 15:45:46 2015 @@ -38,13 +38,13 @@ import javax.crypto.spec.IvParameterSpec import javax.crypto.spec.SecretKeySpec; import javax.security.auth.x500.X500Principal; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestSSLConstants; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.util.BytesDataOutput; public class Encrypted091MessageFactoryTest extends QpidTestCase { @@ -78,8 +78,8 @@ public class Encrypted091MessageFactoryT final int headerLength = _props.getPropertyListSize() + 2; _unencrypted = new byte[headerLength + _data.length]; - BytesDataOutput output = new BytesDataOutput(_unencrypted); - output.writeShort((short) (_props.getPropertyFlags() & 0xffff)); + QpidByteBuffer output = QpidByteBuffer.wrap(_unencrypted); + output.putShort((short) (_props.getPropertyFlags() & 0xffff)); _props.writePropertyListPayload(output); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1720183&r1=1720182&r2=1720183&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Tue Dec 15 15:45:46 2015 @@ -21,7 +21,6 @@ package org.apache.qpid.bytebuffer; import java.io.BufferedOutputStream; -import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -440,11 +439,6 @@ 
public final class QpidByteBuffer } - public DataOutput asDataOutput() - { - return new BufferDataOutput(); - } - public static QpidByteBuffer allocate(int size) { return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size))); @@ -910,118 +904,4 @@ public final class QpidByteBuffer } } - private final class BufferDataOutput implements DataOutput - { - public void write(int b) - { - _buffer.put((byte) b); - } - - public void write(byte[] b) - { - _buffer.put(b); - } - - - public void write(byte[] b, int off, int len) - { - _buffer.put(b, off, len); - - } - - public void writeBoolean(boolean v) - { - _buffer.put(v ? (byte) 1 : (byte) 0); - } - - public void writeByte(int v) - { - _buffer.put((byte) v); - } - - public void writeShort(int v) - { - _buffer.putShort((short) v); - } - - public void writeChar(int v) - { - _buffer.put((byte) (v >>> 8)); - _buffer.put((byte) v); - } - - public void writeInt(int v) - { - _buffer.putInt(v); - } - - public void writeLong(long v) - { - _buffer.putLong(v); - } - - public void writeFloat(float v) - { - writeInt(Float.floatToIntBits(v)); - } - - public void writeDouble(double v) - { - writeLong(Double.doubleToLongBits(v)); - } - - public void writeBytes(String s) - { - throw new UnsupportedOperationException("writeBytes(String s) not supported"); - } - - public void writeChars(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) - { - int v = s.charAt(i); - _buffer.put((byte) (v >>> 8)); - _buffer.put((byte) v); - } - } - - public void writeUTF(String s) - { - int strlen = s.length(); - - int pos = _buffer.position(); - _buffer.position(pos + 2); - - - for (int i = 0; i < strlen; i++) - { - int c = s.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) - { - c = s.charAt(i); - _buffer.put((byte) c); - - } - else if (c > 0x07FF) - { - _buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F))); - _buffer.put((byte) (0x80 | ((c >> 6) & 0x3F))); - _buffer.put((byte) (0x80 | (c & 0x3F))); - } - else - { - 
_buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F))); - _buffer.put((byte) (0x80 | (c & 0x3F))); - } - } - - int len = _buffer.position() - (pos + 2); - - _buffer.put(pos++, (byte) (len >>> 8)); - _buffer.put(pos, (byte) len); - } - - } - } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org