Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Fri Aug 7 00:28:17 2015 @@ -29,6 +29,7 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; @@ -115,7 +116,7 @@ public abstract class MessageStoreQuotaE { StorableMessageMetaData metaData = createMetaData(id, MESSAGE_DATA.length); MessageHandle<?> handle = _store.addMessage(metaData); - handle.addContent(ByteBuffer.wrap(MESSAGE_DATA)); + handle.addContent(QpidByteBuffer.wrap(MESSAGE_DATA)); StoredMessage<? extends StorableMessageMetaData> storedMessage = handle.allContentAdded(); TestMessage message = new TestMessage(id, storedMessage); return message;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Fri Aug 7 00:28:17 2015 @@ -23,6 +23,7 @@ package org.apache.qpid.server.store; import java.nio.ByteBuffer; import java.util.Collection; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -105,7 +106,7 @@ public class TestMessageMetaDataType imp } @Override - public Collection<ByteBuffer> getContent(int offset, int size) + public Collection<QpidByteBuffer> getContent(int offset, int size) { return null; } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Fri Aug 7 00:28:17 2015 @@ -19,6 +19,7 @@ package org.apache.qpid.server.transport; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.network.AggregateTicker; @@ -112,7 +113,7 @@ public class NetworkConnectionSchedulerT Thread.sleep(500l); timidSender.start(); Thread.sleep(1000l); - verify(timidEngine, atLeast(6)).received(any(ByteBuffer.class)); + verify(timidEngine, atLeast(6)).received(any(QpidByteBuffer.class)); _keepRunningThreads = false; transport.close(); scheduler.close(); Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Fri Aug 7 00:28:17 2015 @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.Collection; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -107,7 +108,7 @@ class MockServerMessage implements Serve } - public Collection<ByteBuffer> getContent(int offset, int size) + public Collection<QpidByteBuffer> getContent(int offset, int size) { throw new UnsupportedOperationException(); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Fri Aug 7 00:28:17 2015 @@ -34,6 +34,7 @@ import javax.security.auth.Subject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Broker; @@ -64,7 +65,7 @@ import org.apache.qpid.transport.network public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10> { private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class); - private final InputHandler _inputHandler; + private final ServerInputHandler _inputHandler; private final NetworkConnection _network; @@ -103,7 +104,8 @@ public class AMQPConnection_0_10 extends _connection.setRemoteAddress(network.getRemoteAddress()); _connection.setLocalAddress(network.getLocalAddress()); - _inputHandler = new InputHandler(new ServerAssembler(_connection), true); + _inputHandler = new ServerInputHandler(new ServerAssembler(_connection)); + _connection.addFrameSizeObserver(_inputHandler); _network = network; Subject.doAs(getSubject(), new PrivilegedAction<Object>() @@ -163,7 +165,7 @@ public class AMQPConnection_0_10 extends return new ByteBufferSender() { @Override - public void send(ByteBuffer msg) + public void send(final QpidByteBuffer msg) { updateLastWriteTime(); sender.send(msg); @@ -184,7 +186,7 @@ public class AMQPConnection_0_10 extends }; } - public void received(final ByteBuffer buf) + public void received(final QpidByteBuffer buf) { Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() { @@ -212,10 +214,6 @@ public class AMQPConnection_0_10 extends throw new ConnectionScopedRuntimeException(e); } } - finally - { - buf.position(buf.limit()); - } return null; } }); Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Aug 7 00:28:17 2015 @@ -21,13 +21,18 @@ package org.apache.qpid.server.protocol.v0_10; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.FlowCreditManager; @@ -266,26 +271,26 @@ public class ConsumerTarget_0_10 extends boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding()); - ByteBuffer body = ByteBufferUtils.combine(msg.getBody()); + Collection<QpidByteBuffer> body = msg.getBody(); boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported(); if(msgCompressed && !compressionSupported) { - byte[] uncompressed = GZIPUtils.uncompressBufferToArray(body); + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBufferUtils.combine(body)); if(uncompressed != null) { messageProps.setContentEncoding(null); - body = ByteBuffer.wrap(uncompressed); + body = Collections.singleton(QpidByteBuffer.wrap(uncompressed)); } } else if(!msgCompressed && compressionSupported && (messageProps == null || messageProps.getContentEncoding()==null) && body != null - && body.remaining() > _session.getConnection().getMessageCompressionThreshold()) + && ByteBufferUtils.remaining(body) > _session.getConnection().getMessageCompressionThreshold()) { - byte[] compressed = GZIPUtils.compressBufferToArray(body); + byte[] compressed = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(body)); if(compressed != null) { if(messageProps == null) @@ -293,7 +298,7 @@ public class ConsumerTarget_0_10 extends messageProps = new MessageProperties(); } messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING); - body = ByteBuffer.wrap(compressed); + body = Collections.singleton(QpidByteBuffer.wrap(compressed)); } } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Fri Aug 7 00:28:17 2015 @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; @@ -91,9 +92,9 @@ public class MessageConverter_Internal_t } @Override - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size) { - return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size)); + return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size)); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Fri Aug 7 00:28:17 2015 @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; @@ -91,7 +92,7 @@ public class MessageConverter_v0_10 impl } @Override - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size) { return serverMsg.getContent(offsetInMessage, size); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Fri Aug 7 00:28:17 2015 @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol. import java.nio.ByteBuffer; import java.util.Collection; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.StoredMessage; @@ -77,7 +78,7 @@ public class MessageTransferMessage exte return getMetaData().getHeader(); } - public Collection<ByteBuffer> getBody() + public Collection<QpidByteBuffer> getBody() { return getContent(0, (int)getSize()); } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java Fri Aug 7 00:28:17 2015 @@ -22,32 +22,61 @@ package org.apache.qpid.server.protocol. import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.bytebuffer.QpidByteBuffer; +import org.apache.qpid.codec.*; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.NetworkEventReceiver; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.network.Frame; +import org.apache.qpid.transport.network.NetworkDelegate; import org.apache.qpid.transport.network.NetworkEvent; -public class ServerAssembler extends Assembler +public class ServerAssembler { private static final Logger LOGGER = LoggerFactory.getLogger(ServerAssembler.class); private final ServerConnection _connection; - public ServerAssembler(final ServerConnection connection) + + + // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge + // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1. + private static final int ARRAY_SIZE = 0xFF; + private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; + private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>(); + + private final Map<Integer,List<ServerFrame>> _segments; + private final ServerDecoder _decoder = new ServerDecoder(); + + public ServerAssembler(ServerConnection connection) { - super(connection); _connection = connection; + _segments = new HashMap<>(); } - @Override - public void received(final NetworkEvent event) + public void received(final ServerFrame event) { if (!_connection.isIgnoreFutureInput()) { - super.received(event); + frame(event); } else { @@ -55,11 +84,209 @@ public class ServerAssembler extends Ass } } - @Override protected ByteBuffer allocateByteBuffer(int size) { return ByteBuffer.allocateDirect(size); } + private int segmentKey(ServerFrame frame) + { + return (frame.getTrack() + 1) * frame.getChannel(); + } + + private List<ServerFrame> getSegment(ServerFrame frame) + { + return _segments.get(segmentKey(frame)); + } + + private void setSegment(ServerFrame frame, List<ServerFrame> segment) + { + int key = segmentKey(frame); + if (_segments.containsKey(key)) + { + error(new ProtocolError(Frame.L2, "segment in progress: %s", + frame)); + } + _segments.put(segmentKey(frame), segment); + } + + private void clearSegment(ServerFrame frame) + { + _segments.remove(segmentKey(frame)); + } + + private void emit(int channel, ProtocolEvent event) + { + event.setChannel(channel); + _connection.received(event); + } + + public void exception(Throwable t) + { + _connection.exception(t); + } + + public void closed() + { + _connection.closed(); + } + + public void init(ProtocolHeader header) + { + emit(0, header); + } + + public void error(ProtocolError error) + { + emit(0, error); + } + + public void frame(ServerFrame frame) + { + List<QpidByteBuffer> segment; + if (frame.isFirstFrame() && frame.isLastFrame()) + { + segment = Collections.singletonList(frame.getBody()); + assemble(frame, segment); + } + else + { + List<ServerFrame> frames; + if (frame.isFirstFrame()) + { + frames = new ArrayList<>(); + setSegment(frame, frames); + } + else + { + frames = getSegment(frame); + } + + frames.add(frame); + + if (frame.isLastFrame()) + { + clearSegment(frame); + segment = new ArrayList<>(frames.size()); + for (ServerFrame f : frames) + { + segment.add(f.getBody()); + } + assemble(frame, segment); + } + } + + } + + private void assemble(ServerFrame frame, List<QpidByteBuffer> segment) + { + ServerDecoder dec = _decoder; + dec.init(segment); + + int channel = frame.getChannel(); + Method command; + + switch (frame.getType()) + { + case CONTROL: + int controlType = dec.readUint16(); + Method control = Method.create(controlType); + control.read(dec); + emit(channel, control); + break; + case COMMAND: + int commandType = dec.readUint16(); + // read in the session header, right now we don't use it + int hdr = dec.readUint16(); + command = Method.create(commandType); + command.setSync((0x0001 & hdr) != 0); + command.read(dec); + if (command.hasPayload() && !frame.isLastSegment()) + { + setIncompleteCommand(channel, command); + } + else + { + emit(channel, command); + } + break; + case HEADER: + command = getIncompleteCommand(channel); + List<Struct> structs = null; + DeliveryProperties deliveryProps = null; + MessageProperties messageProps = null; + + while (dec.hasRemaining()) + { + Struct struct = dec.readStruct32(); + if(struct instanceof DeliveryProperties && deliveryProps == null) + { + deliveryProps = (DeliveryProperties) struct; + } + else if(struct instanceof MessageProperties && messageProps == null) + { + messageProps = (MessageProperties) struct; + } + else + { + if(structs == null) + { + structs = new ArrayList<>(2); + } + structs.add(struct); + } + + } + command.setHeader(new Header(deliveryProps,messageProps,structs)); + + if (frame.isLastSegment()) + { + setIncompleteCommand(channel, null); + emit(channel, command); + } + break; + case BODY: + command = getIncompleteCommand(channel); + command.setBody(segment); + setIncompleteCommand(channel, null); + emit(channel, command); + break; + default: + throw new IllegalStateException("unknown frame type: " + frame.getType()); + } + + dec.releaseBuffer(); + } + + private void setIncompleteCommand(int channelId, Method incomplete) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + _incompleteMethodArray[channelId] = incomplete; + } + else + { + if(incomplete != null) + { + _incompleteMethodMap.put(channelId, incomplete); + } + else + { + _incompleteMethodMap.remove(channelId); + } + } + } + + private Method getIncompleteCommand(int channelId) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + return _incompleteMethodArray[channelId]; + } + else + { + return _incompleteMethodMap.get(channelId); + } + } } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Aug 7 00:28:17 2015 @@ -294,7 +294,6 @@ public class ServerConnectionDelegate ex } setConnectionTuneOkChannelMax(sconn, okChannelMax); - conn.setMaxFrameSize(okMaxFrameSize); } Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java (from r1693306, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java&r1=1693306&r2=1694594&rev=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java Fri Aug 7 00:28:17 2015 @@ -18,82 +18,138 @@ * under the License. * */ -package org.apache.qpid.transport.codec; - -import org.apache.qpid.transport.Binary; +package org.apache.qpid.server.protocol.v0_10; import java.nio.ByteBuffer; -import java.nio.ByteOrder; +import java.util.List; -/** - * Byte Buffer Decoder. - * Decoder concrete implementor using a backing byte buffer for decoding data. - * - * @author Rafael H. Schloming - */ -public final class BBDecoder extends AbstractDecoder +import org.apache.qpid.bytebuffer.QpidByteBuffer; +import org.apache.qpid.transport.codec.AbstractDecoder; + +public final class ServerDecoder extends AbstractDecoder { - private ByteBuffer in; + private List<QpidByteBuffer> _underlying; + private int _bufferIndex; - public void init(ByteBuffer in) - { - this.in = in; - this.in.order(ByteOrder.BIG_ENDIAN); - } - public void releaseBuffer() + public void init(List<QpidByteBuffer> in) { - in = null; + _underlying = in; } - protected byte doGet() + private void advanceIfNecessary() { - return in.get(); + while(!getCurrentBuffer().hasRemaining() && _bufferIndex != _underlying.size()-1) + { + _bufferIndex++; + } } - protected void doGet(byte[] bytes) + private QpidByteBuffer getBuffer(int size) { - in.get(bytes); + advanceIfNecessary(); + final QpidByteBuffer currentBuffer = getCurrentBuffer(); + if(currentBuffer.remaining()>= size) + { + return currentBuffer; + } + else + { + return readAsNativeByteBuffer(size); + } } - protected Binary get(int size) + private QpidByteBuffer readAsNativeByteBuffer(int len) { - if (in.hasArray()) + QpidByteBuffer currentBuffer = getCurrentBuffer(); + if(currentBuffer.remaining()>=len) { - byte[] bytes = in.array(); - Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size); - in.position(in.position() + size); - return bin; + QpidByteBuffer buf = currentBuffer.slice(); + buf.limit(len); + currentBuffer.position(currentBuffer.position()+len); + return buf; } else { - return super.get(size); + QpidByteBuffer dest = currentBuffer.isDirect() ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len); + while(dest.hasRemaining() && available()>0) + { + advanceIfNecessary(); + currentBuffer = getCurrentBuffer(); + final int remaining = dest.remaining(); + if(currentBuffer.remaining()>= remaining) + { + QpidByteBuffer buf = currentBuffer.slice(); + buf.limit(remaining); + currentBuffer.position(currentBuffer.position()+remaining); + dest.put(buf); + } + else + { + dest.put(currentBuffer); + } + } + + dest.flip(); + return dest; } } + private int available() + { + int remaining = 0; + for(int i = _bufferIndex; i < _underlying.size(); i++) + { + remaining += _underlying.get(i).remaining(); + } + return remaining; + } + + + private QpidByteBuffer getCurrentBuffer() + { + return _underlying.get(_bufferIndex); + } + + + public void releaseBuffer() + { + _underlying = null; + } + + protected byte doGet() + { + return getBuffer(1).get(); + } + + protected void doGet(byte[] bytes) + { + getBuffer(bytes.length).get(bytes); + } + public boolean hasRemaining() { - return in.hasRemaining(); + return available() != 0; } public short readUint8() { - return (short) (0xFF & in.get()); + return (short) (0xFF & getBuffer(1).get()); } public int readUint16() { - return 0xFFFF & in.getShort(); + return 0xFFFF & getBuffer(2).getShort(); } public long readUint32() { - return 0xFFFFFFFFL & in.getInt(); + return 0xFFFFFFFFL & getBuffer(4).getInt(); } public long readUint64() { - return in.getLong(); + return getBuffer(8).getLong(); } public byte[] readBin128() @@ -112,38 +168,38 @@ public final class BBDecoder extends Abs public double readDouble() { - return in.getDouble(); + return getBuffer(8).getDouble(); } public float readFloat() { - return in.getFloat(); + return getBuffer(4).getFloat(); } public short readInt16() { - return in.getShort(); + return getBuffer(2).getShort(); } public int readInt32() { - return in.getInt(); + return getBuffer(4).getInt(); } public byte readInt8() { - return in.get(); + return getBuffer(1).get(); } public byte[] readReaminingBytes() { - byte[] result = new byte[in.limit() - in.position()]; + byte[] result = new byte[available()]; get(result); return result; } public long readInt64() { - return in.getLong(); + return getBuffer(8).getLong(); } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java Fri Aug 7 00:28:17 2015 @@ -28,10 +28,14 @@ import static org.apache.qpid.transport. import static org.apache.qpid.transport.network.Frame.LAST_SEG; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.FrameSizeObserver; import org.apache.qpid.transport.Header; @@ -91,9 +95,9 @@ public final class ServerDisassembler im } } - private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) + private void frame(byte flags, byte type, byte track, int channel, int size, Collection<QpidByteBuffer> buf) { - ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE); + QpidByteBuffer data = QpidByteBuffer.allocate(HEADER_SIZE); data.put(0, flags); data.put(1, type); @@ -102,21 +106,43 @@ public final class ServerDisassembler im data.putShort(6, (short) channel); - ByteBuffer dup = buf.duplicate(); - dup.limit(dup.position() + size); - buf.position(buf.position() + size); _sender.send(data); - _sender.send(dup); - + if(size > 0) + { + int residual = size; + for(QpidByteBuffer b : buf) + { + final int remaining = b.remaining(); + if(remaining > 0 ) + { + if(remaining >= residual) + { + _sender.send(b.view(0,residual)); + b.position(b.position()+residual); + break; + } + else + { + _sender.send(b.duplicate()); + b.position(b.limit()); + residual-=remaining; + } + } + } + } } - private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf) + private void fragment(byte flags, SegmentType type, ProtocolEvent event, Collection<QpidByteBuffer> buf) { byte typeb = (byte) type.getValue(); byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; - int remaining = buf.remaining(); + int remaining = 0; + for(QpidByteBuffer b : buf) + { + remaining += b.remaining(); + } boolean first = true; while (true) { @@ -145,7 +171,7 @@ public final class ServerDisassembler im public void init(Void v, ProtocolHeader header) { - _sender.send(header.toByteBuffer(true)); + _sender.send(header.toByteBuffer()); _sender.flush(); } @@ -217,17 +243,22 @@ public final class ServerDisassembler im buf.position(0); buf.limit(methodLimit); - fragment(flags, type, method, buf.duplicate()); + fragment(flags, type, method, Collections.singletonList(QpidByteBuffer.wrap(buf.duplicate()))); if (payload) { - ByteBuffer body = method.getBody(); + Collection<QpidByteBuffer> body = method.getBody(); buf.limit(headerLimit); buf.position(methodLimit); - fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate()); + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, Collections.singletonList(QpidByteBuffer.wrap(buf.duplicate()))); if (body != null) { - fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate()); + Collection<QpidByteBuffer> dup = new ArrayList<>(); + for(QpidByteBuffer b : body) + { + dup.add(b.duplicate()); + } + fragment(LAST_SEG, SegmentType.BODY, method, dup); } } Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java (from r1693306, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java&r1=1693306&r2=1694594&rev=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java Fri Aug 7 00:28:17 2015 @@ -18,23 +18,25 @@ * under the License. * */ -package org.apache.qpid.transport.network; +package org.apache.qpid.server.protocol.v0_10; import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.SegmentType; +import org.apache.qpid.transport.network.NetworkDelegate; +import org.apache.qpid.transport.network.NetworkEvent; -/** - * Frame - * - * @author Rafael H. Schloming - */ - -public final class Frame implements NetworkEvent +public final class ServerFrame { + private static final Logger LOGGER = LoggerFactory.getLogger(ServerFrame.class); + public static final int HEADER_SIZE = 12; // XXX: enums? @@ -43,10 +45,6 @@ public final class Frame implements Netw public static final byte L3 = 2; public static final byte L4 = 3; - public static final byte RESERVED = 0x0; - - public static final byte VERSION = 0x0; - public static final byte FIRST_SEG = 0x8; public static final byte LAST_SEG = 0x4; public static final byte FIRST_FRAME = 0x2; @@ -56,10 +54,10 @@ public final class Frame implements Netw final private SegmentType type; final private byte track; final private int channel; - final private ByteBuffer body; + final private QpidByteBuffer body; - public Frame(byte flags, SegmentType type, byte track, int channel, - ByteBuffer body) + public ServerFrame(byte flags, SegmentType type, byte track, int channel, + QpidByteBuffer body) { this.flags = flags; this.type = type; @@ -68,7 +66,7 @@ public final class Frame implements Netw this.body = body; } - public ByteBuffer getBody() + public QpidByteBuffer getBody() { return body.slice(); } @@ -123,11 +121,6 @@ public final class Frame implements Netw return flag(LAST_FRAME); } - public void delegate(NetworkDelegate delegate) - { - delegate.frame(this); - } - public String toString() { StringBuilder str = new StringBuilder(); Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java (from r1693325, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java&r1=1693325&r2=1694594&rev=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java Fri Aug 7 00:28:17 2015 @@ -18,40 +18,26 @@ * under the License. * */ -package org.apache.qpid.transport.network; +package org.apache.qpid.server.protocol.v0_10; -import static org.apache.qpid.transport.network.InputHandler.State.ERROR; -import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY; -import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR; -import static org.apache.qpid.transport.network.InputHandler.State.PROTO_HDR; import static org.apache.qpid.transport.util.Functions.str; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; +import static org.apache.qpid.server.protocol.v0_10.ServerInputHandler.State.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver; import org.apache.qpid.transport.FrameSizeObserver; -import org.apache.qpid.transport.NetworkEventReceiver; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.SegmentType; -/** - * InputHandler - * - * @author Rafael H. Schloming - */ - -public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver +public class ServerInputHandler implements FrameSizeObserver { - private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class); - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer - .allocate(0); + private static final Logger LOGGER = LoggerFactory.getLogger(ServerInputHandler.class); + private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocate(0); private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE; @@ -60,15 +46,11 @@ public class InputHandler implements Exc { PROTO_HDR, FRAME_HDR, - FRAME_BODY, ERROR; } - private final NetworkEventReceiver receiver; + private final ServerAssembler _serverAssembler; - private final boolean _useDirect; - private State state; - private ByteBuffer input = null; - private int needed; + private State _state = PROTO_HDR; private byte flags; private SegmentType type; @@ -76,21 +58,10 @@ public class InputHandler implements Exc private int channel; - public InputHandler(NetworkEventReceiver receiver, final boolean useDirect) + public ServerInputHandler(ServerAssembler serverAssembler) { - this.receiver = receiver; - this.state = PROTO_HDR; - _useDirect = useDirect; - - switch (state) - { - case PROTO_HDR: - needed = 8; - break; - case FRAME_HDR: - needed = Frame.HEADER_SIZE; - break; - } + _serverAssembler = serverAssembler; + _state = PROTO_HDR; } public void setMaxFrameSize(final int maxFrameSize) @@ -100,125 +71,127 @@ public class InputHandler implements Exc private void error(String fmt, Object ... args) { - receiver.received(new ProtocolError(Frame.L1, fmt, args)); + _serverAssembler.error(new ProtocolError(ServerFrame.L1, fmt, args)); } - @Override - public void received(ByteBuffer buf) + public void received(QpidByteBuffer buf) { - int limit = buf.limit(); - int remaining = buf.remaining(); - while (remaining > 0) + int position = buf.position(); + while(buf.hasRemaining() && _state != ERROR) { - if (remaining >= needed) - { - int consumed = needed; - int pos = buf.position(); - if (input == null) - { - buf.limit(pos + needed); - input = buf; - state = next(pos); - buf.limit(limit); - buf.position(pos + consumed); - } - else - { - buf.limit(pos + needed); - input.put(buf); - buf.limit(limit); - input.flip(); - state = next(0); - } + parse(buf); - remaining -= consumed; - input = null; + int newPosition = buf.position(); + if(position == newPosition) + { + break; } else { - if (input == null) - { - input = _useDirect ? ByteBuffer.allocateDirect(needed) : ByteBuffer.allocate(needed); - } - input.put(buf); - needed -= remaining; - remaining = 0; + position = newPosition; } } } - private State next(int pos) + private void parse(QpidByteBuffer buffer) { - input.order(ByteOrder.BIG_ENDIAN); - - switch (state) { - case PROTO_HDR: - if (input.get(pos) != 'A' && - input.get(pos + 1) != 'M' && - input.get(pos + 2) != 'Q' && - input.get(pos + 3) != 'P') - { - error("bad protocol header: %s", str(input)); - return ERROR; - } + buffer.mark(); + switch (_state) { + case PROTO_HDR: + if(buffer.remaining() < 8) + { + break; + } + if (buffer.get() != 'A' || + buffer.get() != 'M' || + buffer.get() != 'Q' || + buffer.get() != 'P') + { + buffer.reset(); + error("bad protocol header: %s", str(buffer)); + _state = ERROR; + } + else + { + byte protoClass = buffer.get(); + byte instance = buffer.get(); + byte major = buffer.get(); + byte minor = buffer.get(); - byte protoClass = input.get(pos + 4); - byte instance = input.get(pos + 5); - byte major = input.get(pos + 6); - byte minor = input.get(pos + 7); - receiver.received(new ProtocolHeader(protoClass, instance, major, minor)); - needed = Frame.HEADER_SIZE; - return FRAME_HDR; - case FRAME_HDR: - flags = input.get(pos); - type = SegmentType.get(input.get(pos + 1)); - int size = (0xFFFF & input.getShort(pos + 2)); - size -= Frame.HEADER_SIZE; - _maxFrameSize = 64 * 1024; - if (size < 0 || size > (_maxFrameSize - 12)) - { - error("bad frame size: %d", size); - return ERROR; - } - byte b = input.get(pos + 5); - if ((b & 0xF0) != 0) { - error("non-zero reserved bits in upper nibble of " + - "frame header byte 5: '%x'", b); - return ERROR; - } else { - track = (byte) (b & 0xF); - } - channel = (0xFFFF & input.getShort(pos + 6)); - if (size == 0) - { - Frame frame = new Frame(flags, type, track, channel, EMPTY_BYTE_BUFFER); - receiver.received(frame); - needed = Frame.HEADER_SIZE; - return FRAME_HDR; - } - else - { - needed = size; - return FRAME_BODY; - } - case FRAME_BODY: - Frame frame = new Frame(flags, type, track, channel, input.slice()); - receiver.received(frame); - needed = Frame.HEADER_SIZE; - return FRAME_HDR; - default: - throw new IllegalStateException(); + _serverAssembler.init(new ProtocolHeader(protoClass, instance, major, minor)); + _state = FRAME_HDR; + } + break; + case FRAME_HDR: + if(buffer.remaining() < ServerFrame.HEADER_SIZE) + { + buffer.reset(); + } + else + { + flags = buffer.get(); + type = SegmentType.get(buffer.get()); + int size = (0xFFFF & buffer.getShort()); + + size -= ServerFrame.HEADER_SIZE; + if (size < 0 || size > (_maxFrameSize - ServerFrame.HEADER_SIZE)) + { + error("bad frame size: %d", size); + _state = ERROR; + } + else + { + buffer.get(); // skip unused byte + byte b = buffer.get(); + if ((b & 0xF0) != 0) + { + error("non-zero reserved bits in upper nibble of " + + "frame header byte 5: '%x'", b); + _state = ERROR; + } + else + { + track = (byte) (b & 0xF); + + channel = (0xFFFF & buffer.getShort()); + buffer.position(buffer.position()+4); + if (size == 0) + { + ServerFrame frame = new ServerFrame(flags, type, track, channel, EMPTY_BYTE_BUFFER); + _serverAssembler.received(frame); + + } + else if (buffer.remaining() < size) + { + buffer.reset(); + } + else + { + final QpidByteBuffer body = buffer.slice(); + body.limit(size); + ServerFrame frame = new ServerFrame(flags, type, track, channel, body); + buffer.position(buffer.position() + size); + + _serverAssembler.received(frame); + } + } + } + } + break; + default: + throw new IllegalStateException(); } + } public void exception(Throwable t) { - receiver.exception(t); + _serverAssembler.exception(t); } public void closed() { - receiver.closed(); + _serverAssembler.closed(); } } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Aug 7 00:28:17 2015 @@ -34,6 +34,7 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; @@ -500,10 +501,13 @@ public class ServerSessionDelegate exten final MessageMetaData_0_10 messageMetaData, final MessageStore store) { final MessageHandle<MessageMetaData_0_10> addedMessage = store.addMessage(messageMetaData); - ByteBuffer body = xfr.getBody(); + Collection<QpidByteBuffer> body = xfr.getBody(); if(body != null) { - addedMessage.addContent(body); + for(QpidByteBuffer b : body) + { + addedMessage.addContent(b); + } } final StoredMessage<MessageMetaData_0_10> storedMessage = addedMessage.allContentAdded(); return storedMessage; Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Aug 7 00:28:17 2015 @@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol. import static org.apache.qpid.transport.util.Functions.hex; -import java.nio.ByteBuffer; import java.security.AccessControlException; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -49,6 +48,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -2511,7 +2511,7 @@ public class AMQChannel } @Override - public void receiveMessageContent(final ByteBuffer data) + public void receiveMessageContent(final QpidByteBuffer data) { if(_logger.isDebugEnabled()) { Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Fri Aug 7 00:28:17 2015 @@ -26,7 +26,6 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.AccessControlException; import java.security.Principal; import java.security.PrivilegedAction; @@ -53,7 +52,9 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.ServerDecoder; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.*; @@ -126,13 +127,13 @@ public class AMQPConnection_0_8 private ConnectionState _state = ConnectionState.INIT; /** - * The channels that the latest call to {@link #received(ByteBuffer)} applied to. + * The channels that the latest call to {@link ProtocolEngine#received(QpidByteBuffer)} applied to. * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()} * on after handling the frames. */ private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>(); - private final AMQDecoder _decoder; + private final ServerDecoder _decoder; private SaslServer _saslServer; @@ -277,7 +278,7 @@ public class AMQPConnection_0_8 return new WriteDeliverMethod(channelId); } - public void received(final ByteBuffer msg) + public void received(final QpidByteBuffer msg) { Subject.doAs(getSubject(), new PrivilegedAction<Void>() { Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Aug 7 00:28:17 2015 @@ -31,6 +31,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; @@ -103,9 +104,9 @@ public class MessageConverter_Internal_t } @Override - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size) { - return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size)); + return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size)); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Fri Aug 7 00:28:17 2015 @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; @@ -125,7 +126,8 @@ public class ProtocolOutputConverterImpl && compressionSupported && contentHeaderBody.getProperties().getEncoding()==null && bodySize > _connection.getMessageCompressionThreshold() - && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(message.getContent(0, bodySize)))) != null) + && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(message.getContent(0, + bodySize)))) != null) { BasicContentHeaderProperties modifiedProps = new BasicContentHeaderProperties(contentHeaderBody.getProperties()); @@ -163,9 +165,9 @@ public class ProtocolOutputConverterImpl } @Override - public Collection<ByteBuffer> getContent(final int offset, final int size) + public Collection<QpidByteBuffer> getContent(final int offset, final int size) { - return Collections.singleton(ByteBuffer.wrap(content, offset, size)); + return Collections.singleton(QpidByteBuffer.wrap(content, offset, size)); } @Override @@ -247,9 +249,9 @@ public class ProtocolOutputConverterImpl public void writePayload(DataOutput buffer) throws IOException { - Collection<ByteBuffer> bufs = _message.getContent(_offset, _length); + Collection<QpidByteBuffer> bufs = _message.getContent(_offset, _length); - for(ByteBuffer buf : bufs) + for(QpidByteBuffer buf : bufs) { if (buf.hasArray()) { @@ -271,9 +273,9 @@ public class ProtocolOutputConverterImpl public long writePayload(final ByteBufferSender sender) throws IOException { - Collection<ByteBuffer> bufs = _message.getContent(_offset, _length); + Collection<QpidByteBuffer> bufs = _message.getContent(_offset, _length); long size = 0l; - for(ByteBuffer buf : bufs) + for(QpidByteBuffer buf : bufs) { size += buf.remaining(); sender.send(buf.duplicate()); Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Aug 7 00:28:17 2015 @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; @@ -48,7 +49,6 @@ import org.apache.qpid.server.message.In import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; @@ -288,10 +288,16 @@ public class InternalTestProtocolSession { _sender = new ByteBufferSender() { - public void send(ByteBuffer msg) + private void send(ByteBuffer msg) { } + @Override + public void send(final QpidByteBuffer msg) + { + + } + public void flush() { } Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Fri Aug 7 00:28:17 2015 @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; @@ -35,7 +36,7 @@ public class MockStoredMessage implement { private long _messageId; private MessageMetaData _metaData; - private final ByteBuffer _content; + private final QpidByteBuffer _content; public MockStoredMessage(long messageId) { @@ -62,7 +63,7 @@ public class MockStoredMessage implement ( chb.getProperties()).setHeaders(headers); } _metaData = new MessageMetaData(info, chb); - _content = ByteBuffer.allocate(_metaData.getContentSize()); + _content = QpidByteBuffer.allocate(_metaData.getContentSize()); } public MessageMetaData getMetaData() @@ -75,7 +76,7 @@ public class MockStoredMessage implement return _messageId; } - public void addContent(ByteBuffer src) + public void addContent(QpidByteBuffer src) { src = src.duplicate(); _content.put(src); @@ -90,24 +91,17 @@ public class MockStoredMessage implement public int getContent(int offset, ByteBuffer dst) { - ByteBuffer src = _content.duplicate(); - src.position(offset); - src = src.slice(); - if(dst.remaining() < src.limit()) - { - src.limit(dst.remaining()); - } - dst.put(src); - return src.limit(); + final int length = Math.min(dst.remaining(), _content.remaining() - offset); + QpidByteBuffer src = _content.view(offset, length); + src.get(dst); + return length; } - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size) { - ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); - buf.position(0); + QpidByteBuffer buf = _content.view(offsetInMessage,size); return Collections.singleton(buf); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java Fri Aug 7 00:28:17 2015 @@ -23,6 +23,8 @@ package org.apache.qpid.amqp_1_0.codec; import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; + public abstract class AbstractDescribedTypeWriter<V> implements ValueWriter<V> { private int _length; @@ -45,7 +47,7 @@ public abstract class AbstractDescribedT private State _state = State.FORMAT_CODE; - public int writeToBuffer(ByteBuffer buffer) + public int writeToBuffer(QpidByteBuffer buffer) { final int length = _length; @@ -110,7 +112,7 @@ public abstract class AbstractDescribedT return _length; } - private void writeFirstPass(ByteBuffer buffer) + private void writeFirstPass(QpidByteBuffer buffer) { int length = 1; @@ -185,4 +187,4 @@ public abstract class AbstractDescribedT { return false; } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java Fri Aug 7 00:28:17 2015 @@ -20,9 +20,9 @@ package org.apache.qpid.amqp_1_0.codec; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.transport.AmqpError; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import java.lang.reflect.Array; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -31,7 +31,7 @@ public abstract class ArrayTypeConstruct - public Object[] construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException + public Object[] construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { int size = read(in); if(in.remaining() < size) @@ -40,7 +40,7 @@ public abstract class ArrayTypeConstruct "Insufficient data to decode array - requires %d octects, only %d remaining.", size, in.remaining()); } - ByteBuffer dup = in.slice(); + QpidByteBuffer dup = in.slice(); dup.limit(size); in.position(in.position()+size); int count = read(dup); @@ -69,13 +69,13 @@ public abstract class ArrayTypeConstruct } - abstract int read(ByteBuffer in) throws AmqpErrorException; + abstract int read(QpidByteBuffer in) throws AmqpErrorException; private static final ArrayTypeConstructor ONE_BYTE_SIZE_ARRAY = new ArrayTypeConstructor() { - @Override int read(final ByteBuffer in) throws AmqpErrorException + @Override int read(final QpidByteBuffer in) throws AmqpErrorException { if(!in.hasRemaining()) { @@ -89,7 +89,7 @@ public abstract class ArrayTypeConstruct private static final ArrayTypeConstructor FOUR_BYTE_SIZE_ARRAY = new ArrayTypeConstructor() { - @Override int read(final ByteBuffer in) throws AmqpErrorException + @Override int read(final QpidByteBuffer in) throws AmqpErrorException { if(in.remaining()<4) { @@ -110,4 +110,4 @@ public abstract class ArrayTypeConstruct return FOUR_BYTE_SIZE_ARRAY; } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java Fri Aug 7 00:28:17 2015 @@ -19,7 +19,7 @@ package org.apache.qpid.amqp_1_0.codec; -import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; public class ArrayWriter implements ValueWriter<Object[]> { @@ -60,7 +60,7 @@ public class ArrayWriter implements Val //registry.register(List.class, FACTORY); } - public int writeToBuffer(final ByteBuffer buffer) + public int writeToBuffer(final QpidByteBuffer buffer) { return 0; //TODO change body of implemented methods use File | Settings | File Templates. } @@ -79,4 +79,4 @@ public class ArrayWriter implements Val { return false; //TODO change body of implemented methods use File | Settings | File Templates. } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java Fri Aug 7 00:28:17 2015 @@ -22,6 +22,7 @@ package org.apache.qpid.amqp_1_0.codec; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import java.nio.ByteBuffer; @@ -42,7 +43,7 @@ public class BinaryTypeConstructor exten } @Override - public Object construct(final ByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException + public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException { int size; @@ -55,7 +56,7 @@ public class BinaryTypeConstructor exten size = in.getInt(); } - ByteBuffer inDup = in.slice(); + QpidByteBuffer inDup = in.slice(); inDup.limit(inDup.position()+size); Binary binary; @@ -77,4 +78,4 @@ public class BinaryTypeConstructor exten } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java Fri Aug 7 00:28:17 2015 @@ -22,14 +22,13 @@ package org.apache.qpid.amqp_1_0.codec; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; - -import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; public class BooleanConstructor { private static final TypeConstructor<Boolean> TRUE_INSTANCE = new TypeConstructor<Boolean>() { - public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException + public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { return Boolean.TRUE; } @@ -37,14 +36,14 @@ public class BooleanConstructor private static final TypeConstructor<Boolean> FALSE_INSTANCE = new TypeConstructor<Boolean>() { - public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException + public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { return Boolean.FALSE; } }; private static final TypeConstructor<Boolean> BYTE_INSTANCE = new TypeConstructor<Boolean>() { - public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException + public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { if(in.hasRemaining()) { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java Fri Aug 7 00:28:17 2015 @@ -21,14 +21,14 @@ package org.apache.qpid.amqp_1_0.codec; -import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; public class BooleanWriter implements ValueWriter<Boolean> { private boolean _complete = true; private boolean _value; - public int writeToBuffer(ByteBuffer buffer) + public int writeToBuffer(QpidByteBuffer buffer) { if(!_complete & buffer.hasRemaining()) { @@ -67,4 +67,4 @@ public class BooleanWriter implements Va { registry.register(Boolean.class, FACTORY); } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java Fri Aug 7 00:28:17 2015 @@ -23,8 +23,7 @@ package org.apache.qpid.amqp_1_0.codec; import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; import org.apache.qpid.amqp_1_0.type.transport.Error; - -import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; public class ByteTypeConstructor implements TypeConstructor { @@ -39,7 +38,7 @@ public class ByteTypeConstructor impleme { } - public Object construct(final ByteBuffer in, ValueHandler handler) throws AmqpErrorException + public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException { if(in.hasRemaining()) { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java Fri Aug 7 00:28:17 2015 @@ -21,14 +21,14 @@ package org.apache.qpid.amqp_1_0.codec; -import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; public class ByteWriter implements ValueWriter<Byte> { private int _written = 2; private byte _value; - public int writeToBuffer(ByteBuffer buffer) + public int writeToBuffer(QpidByteBuffer buffer) { switch(_written) @@ -87,4 +87,4 @@ public class ByteWriter implements Value { registry.register(Byte.class, FACTORY); } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java Fri Aug 7 00:28:17 2015 @@ -22,8 +22,7 @@ package org.apache.qpid.amqp_1_0.codec; import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.transport.*; - -import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; public class CharTypeConstructor implements TypeConstructor { @@ -39,7 +38,7 @@ public class CharTypeConstructor impleme { } - public Object construct(final ByteBuffer in, ValueHandler handler) throws AmqpErrorException + public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException { if(in.remaining()>=4) { @@ -64,4 +63,4 @@ public class CharTypeConstructor impleme } } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java Fri Aug 7 00:28:17 2015 @@ -23,6 +23,7 @@ package org.apache.qpid.amqp_1_0.codec; import org.apache.qpid.amqp_1_0.type.*; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -154,7 +155,7 @@ public class CompoundTypeConstructor ext } @Override - public Object construct(final ByteBuffer in, boolean isCopy, ValueHandler delegate) throws AmqpErrorException + public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler delegate) throws AmqpErrorException { int size; int count; @@ -170,8 +171,7 @@ public class CompoundTypeConstructor ext count = in.getInt(); } - ByteBuffer data; - ByteBuffer inDup = in.slice(); + QpidByteBuffer inDup = in.slice(); inDup.limit(size-getSize()); @@ -189,4 +189,4 @@ public class CompoundTypeConstructor ext } -} \ No newline at end of file +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java Fri Aug 7 00:28:17 2015 @@ -23,6 +23,8 @@ package org.apache.qpid.amqp_1_0.codec; import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; + public abstract class CompoundWriter<V> implements ValueWriter<V> { private int _length; @@ -51,12 +53,12 @@ public abstract class CompoundWriter<V> private State _state = State.FORMAT_CODE; - public int writeToBuffer(ByteBuffer buffer) + public int writeToBuffer(QpidByteBuffer buffer) { return writeToBuffer(buffer, false); } - public int writeToBuffer(ByteBuffer buffer, boolean large) + public int writeToBuffer(QpidByteBuffer buffer, boolean large) { final int length = _length; @@ -209,7 +211,7 @@ public abstract class CompoundWriter<V> return _length; } - private void writeFirstPass(ByteBuffer buffer, int size) + private void writeFirstPass(QpidByteBuffer buffer, int size) { State state = State.FORMAT_CODE; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java Fri Aug 7 00:28:17 2015 @@ -21,9 +21,9 @@ package org.apache.qpid.amqp_1_0.codec; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import java.math.BigDecimal; -import java.nio.ByteBuffer; public abstract class DecimalConstructor implements TypeConstructor<BigDecimal> { @@ -31,7 +31,7 @@ public abstract class DecimalConstructor private static final DecimalConstructor DECIMAL_32 = new DecimalConstructor() { - public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException + public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { @@ -54,7 +54,7 @@ public abstract class DecimalConstructor private static final DecimalConstructor DECIMAL_64 = new DecimalConstructor() { - public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException + public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { long val; @@ -77,7 +77,7 @@ public abstract class DecimalConstructor private static final DecimalConstructor DECIMAL_128 = new DecimalConstructor() { - public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException + public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { long high; long low; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org