ARTEMIS-204 Improvements on OpenWire https://issues.apache.org/jira/browse/ARTEMIS-204
by consequence this will also fix any possible issues with AMQP Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1dae9974 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1dae9974 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1dae9974 Branch: refs/heads/master Commit: 1dae99746b4532313bbfa66e530751e8d79d2d5b Parents: a4498d4 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Thu Aug 13 19:20:20 2015 -0400 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Aug 13 20:39:01 2015 -0400 ---------------------------------------------------------------------- .../artemis/api/core/ActiveMQBuffer.java | 7 +- .../core/buffers/impl/ChannelBufferWrapper.java | 26 ++- .../apache/activemq/artemis/utils/ByteUtil.java | 12 + .../CompressedLargeMessageControllerImpl.java | 30 ++- .../client/impl/LargeMessageControllerImpl.java | 23 +- .../artemis/reader/BytesMessageUtil.java | 106 ++++----- .../activemq/artemis/reader/MapMessageUtil.java | 17 +- .../activemq/artemis/reader/MessageUtil.java | 7 +- .../artemis/reader/StreamMessageUtil.java | 40 ++-- .../artemis/reader/TextMessageUtil.java | 7 +- .../jms/client/ActiveMQBytesMessage.java | 52 ++--- .../artemis/jms/client/ActiveMQMapMessage.java | 4 +- .../jms/client/ActiveMQStreamMessage.java | 22 +- .../artemis/jms/client/ActiveMQTextMessage.java | 4 +- .../converter/jms/ServerJMSBytesMessage.java | 53 ++--- .../converter/jms/ServerJMSMapMessage.java | 4 +- .../proton/converter/jms/ServerJMSMessage.java | 19 ++ .../converter/jms/ServerJMSStreamMessage.java | 84 ++++--- .../converter/jms/ServerJMSTextMessage.java | 6 +- .../core/protocol/proton/TestConversions.java | 20 +- .../protocol/openwire/DataInputWrapper.java | 228 ------------------- .../protocol/openwire/OpenWireConnection.java | 163 ++++++------- .../openwire/OpenWireMessageConverter.java | 23 +- .../openwire/OpenWireProtocolManager.java | 44 ++-- .../core/server/impl/ServerConsumerImpl.java | 6 - .../openwire/SimpleOpenWireTest.java | 136 ++++++++--- .../openwire/VerySimpleOenwireTest.java | 118 ++++++++++ .../netty/ActiveMQFrameDecoder2Test.java | 8 +- 28 files changed, 646 insertions(+), 623 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java index 12dd09f..da78fb7 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.api.core; +import java.io.DataInput; import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf; @@ -29,7 +30,7 @@ import io.netty.buffer.ByteBuf; * * @see ActiveMQBuffers */ -public interface ActiveMQBuffer { +public interface ActiveMQBuffer extends DataInput { /** * Returns the underlying Netty's ByteBuf @@ -642,7 +643,7 @@ public interface ActiveMQBuffer { * @return an unsigned byte at the current {@code readerIndex} * @throws IndexOutOfBoundsException if {@code this.readableBytes} is less than {@code 1} */ - short readUnsignedByte(); + int readUnsignedByte(); /** * Gets a 16-bit short integer at the current {@code readerIndex} @@ -874,7 +875,7 @@ public interface ActiveMQBuffer { * @param length The number of bytes to skip * @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.readableBytes} */ - void skipBytes(int length); + int skipBytes(int length); /** * Sets the specified byte at the current {@code writerIndex} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java index 51fea91..1c830c6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java @@ -16,12 +16,14 @@ */ package org.apache.activemq.artemis.core.buffers.impl; +import java.io.IOException; import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.UTF8Util; @@ -350,7 +352,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { return new ChannelBufferWrapper(buffer.readSlice(length), releasable); } - public short readUnsignedByte() { + public int readUnsignedByte() { return buffer.readUnsignedByte(); } @@ -426,8 +428,9 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { buffer.setShort(index, value); } - public void skipBytes(final int length) { + public int skipBytes(final int length) { buffer.skipBytes(length); + return length; } public ActiveMQBuffer slice() { @@ -510,4 +513,23 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { buffer.writeShort(value); } + /** from {@link java.io.DataInput} interface */ + @Override + public void readFully(byte[] b) throws IOException { + readBytes(b); + } + + /** from {@link java.io.DataInput} interface */ + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + readBytes(b, off, len); + } + + /** from {@link java.io.DataInput} interface */ + @Override + public String readLine() throws IOException { + return ByteUtil.readLine(this); + } + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index 6e2ca99..b7ff841 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; public class ByteUtil { @@ -67,4 +68,15 @@ public class ByteUtil { return buffer.array(); } + + public static String readLine(ActiveMQBuffer buffer) { + StringBuilder sb = new StringBuilder(""); + char c = buffer.readChar(); + while (c != '\n') { + sb.append(c); + c = buffer.readChar(); + } + return sb.toString(); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java index 1e93ced..ae711bf 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.client.impl; import java.io.DataInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -27,8 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; +import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.InflaterReader; import org.apache.activemq.artemis.utils.InflaterWriter; import org.apache.activemq.artemis.utils.UTF8Util; @@ -302,9 +303,9 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } - public short readUnsignedByte() { + public int readUnsignedByte() { try { - return (short) getStream().readUnsignedByte(); + return getStream().readUnsignedByte(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -391,18 +392,39 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll dst.put(bytesToGet); } - public void skipBytes(final int length) { + public int skipBytes(final int length) { try { for (int i = 0; i < length; i++) { getStream().read(); } + return length; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } + + /** from {@link java.io.DataInput} interface */ + @Override + public void readFully(byte[] b) throws IOException { + readBytes(b); + } + + /** from {@link java.io.DataInput} interface */ + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + readBytes(b, off, len); + } + + /** from {@link java.io.DataInput} interface */ + @Override + public String readLine() throws IOException { + return getStream().readLine(); + } + + public void writeByte(final byte value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index 7f44cff..ad79e82 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.UTF8Util; @@ -666,7 +667,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } - public short readUnsignedByte() { + public int readUnsignedByte() { return (short) (readByte() & 0xFF); } @@ -758,11 +759,12 @@ public class LargeMessageControllerImpl implements LargeMessageController { readerIndex += length; } - public void skipBytes(final int length) { + public int skipBytes(final int length) { long newReaderIndex = readerIndex + length; checkForPacket(newReaderIndex); readerIndex = newReaderIndex; + return length; } public void writeByte(final byte value) { @@ -1176,7 +1178,24 @@ public class LargeMessageControllerImpl implements LargeMessageController { } } } + } + + /** from {@link java.io.DataInput} interface */ + @Override + public void readFully(byte[] b) throws IOException { + readBytes(b); + } + /** from {@link java.io.DataInput} interface */ + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + readBytes(b, off, len); + } + + /** from {@link java.io.DataInput} interface */ + @Override + public String readLine() throws IOException { + return ByteUtil.readLine(this); } public ByteBuf byteBuf() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java index 806a321..a8dce4a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/BytesMessageUtil.java @@ -16,115 +16,115 @@ */ package org.apache.activemq.artemis.reader; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; public class BytesMessageUtil extends MessageUtil { - public static boolean bytesReadBoolean(Message message) { - return getBodyBuffer(message).readBoolean(); + public static boolean bytesReadBoolean(ActiveMQBuffer message) { + return message.readBoolean(); } - public static byte bytesReadByte(Message message) { - return getBodyBuffer(message).readByte(); + public static byte bytesReadByte(ActiveMQBuffer message) { + return message.readByte(); } - public static int bytesReadUnsignedByte(Message message) { - return getBodyBuffer(message).readUnsignedByte(); + public static int bytesReadUnsignedByte(ActiveMQBuffer message) { + return message.readUnsignedByte(); } - public static short bytesReadShort(Message message) { - return getBodyBuffer(message).readShort(); + public static short bytesReadShort(ActiveMQBuffer message) { + return message.readShort(); } - public static int bytesReadUnsignedShort(Message message) { - return getBodyBuffer(message).readUnsignedShort(); + public static int bytesReadUnsignedShort(ActiveMQBuffer message) { + return message.readUnsignedShort(); } - public static char bytesReadChar(Message message) { - return (char) getBodyBuffer(message).readShort(); + public static char bytesReadChar(ActiveMQBuffer message) { + return (char) message.readShort(); } - public static int bytesReadInt(Message message) { - return getBodyBuffer(message).readInt(); + public static int bytesReadInt(ActiveMQBuffer message) { + return message.readInt(); } - public static long bytesReadLong(Message message) { - return getBodyBuffer(message).readLong(); + public static long bytesReadLong(ActiveMQBuffer message) { + return message.readLong(); } - public static float bytesReadFloat(Message message) { - return Float.intBitsToFloat(getBodyBuffer(message).readInt()); + public static float bytesReadFloat(ActiveMQBuffer message) { + return Float.intBitsToFloat(message.readInt()); } - public static double bytesReadDouble(Message message) { - return Double.longBitsToDouble(getBodyBuffer(message).readLong()); + public static double bytesReadDouble(ActiveMQBuffer message) { + return Double.longBitsToDouble(message.readLong()); } - public static String bytesReadUTF(Message message) { - return getBodyBuffer(message).readUTF(); + public static String bytesReadUTF(ActiveMQBuffer message) { + return message.readUTF(); } - public static int bytesReadBytes(Message message, final byte[] value) { + public static int bytesReadBytes(ActiveMQBuffer message, final byte[] value) { return bytesReadBytes(message, value, value.length); } - public static int bytesReadBytes(Message message, final byte[] value, final int length) { - if (!getBodyBuffer(message).readable()) { + public static int bytesReadBytes(ActiveMQBuffer message, final byte[] value, final int length) { + if (!message.readable()) { return -1; } - int read = Math.min(length, getBodyBuffer(message).readableBytes()); + int read = Math.min(length, message.readableBytes()); if (read != 0) { - getBodyBuffer(message).readBytes(value, 0, read); + message.readBytes(value, 0, read); } return read; } - public static void bytesWriteBoolean(Message message, boolean value) { - getBodyBuffer(message).writeBoolean(value); + public static void bytesWriteBoolean(ActiveMQBuffer message, boolean value) { + message.writeBoolean(value); } - public static void bytesWriteByte(Message message, byte value) { - getBodyBuffer(message).writeByte(value); + public static void bytesWriteByte(ActiveMQBuffer message, byte value) { + message.writeByte(value); } - public static void bytesWriteShort(Message message, short value) { - getBodyBuffer(message).writeShort(value); + public static void bytesWriteShort(ActiveMQBuffer message, short value) { + message.writeShort(value); } - public static void bytesWriteChar(Message message, char value) { - getBodyBuffer(message).writeShort((short) value); + public static void bytesWriteChar(ActiveMQBuffer message, char value) { + message.writeShort((short) value); } - public static void bytesWriteInt(Message message, int value) { - getBodyBuffer(message).writeInt(value); + public static void bytesWriteInt(ActiveMQBuffer message, int value) { + message.writeInt(value); } - public static void bytesWriteLong(Message message, long value) { - getBodyBuffer(message).writeLong(value); + public static void bytesWriteLong(ActiveMQBuffer message, long value) { + message.writeLong(value); } - public static void bytesWriteFloat(Message message, float value) { - getBodyBuffer(message).writeInt(Float.floatToIntBits(value)); + public static void bytesWriteFloat(ActiveMQBuffer message, float value) { + message.writeInt(Float.floatToIntBits(value)); } - public static void bytesWriteDouble(Message message, double value) { - getBodyBuffer(message).writeLong(Double.doubleToLongBits(value)); + public static void bytesWriteDouble(ActiveMQBuffer message, double value) { + message.writeLong(Double.doubleToLongBits(value)); } - public static void bytesWriteUTF(Message message, String value) { - getBodyBuffer(message).writeUTF(value); + public static void bytesWriteUTF(ActiveMQBuffer message, String value) { + message.writeUTF(value); } - public static void bytesWriteBytes(Message message, byte[] value) { - getBodyBuffer(message).writeBytes(value); + public static void bytesWriteBytes(ActiveMQBuffer message, byte[] value) { + message.writeBytes(value); } - public static void bytesWriteBytes(Message message, final byte[] value, final int offset, final int length) { - getBodyBuffer(message).writeBytes(value, offset, length); + public static void bytesWriteBytes(ActiveMQBuffer message, final byte[] value, final int offset, final int length) { + message.writeBytes(value, offset, length); } /** @@ -134,7 +134,7 @@ public class BytesMessageUtil extends MessageUtil { * @param value * @return */ - public static boolean bytesWriteObject(Message message, Object value) { + public static boolean bytesWriteObject(ActiveMQBuffer message, Object value) { if (value == null) { throw new NullPointerException("Attempt to write a null value"); } @@ -175,8 +175,8 @@ public class BytesMessageUtil extends MessageUtil { return true; } - public static void bytesMessageReset(Message message) { - getBodyBuffer(message).resetReaderIndex(); + public static void bytesMessageReset(ActiveMQBuffer message) { + message.resetReaderIndex(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java index 9ae4798..65aeccb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MapMessageUtil.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.reader; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.utils.TypedProperties; public class MapMessageUtil extends MessageUtil { @@ -25,16 +24,15 @@ public class MapMessageUtil extends MessageUtil { /** * Utility method to set the map on a message body */ - public static void writeBodyMap(Message message, TypedProperties properties) { - ActiveMQBuffer buff = getBodyBuffer(message); - buff.resetWriterIndex(); - properties.encode(buff); + public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) { + message.resetWriterIndex(); + properties.encode(message); } /** * Utility method to set the map on a message body */ - public static TypedProperties readBodyMap(Message message) { + public static TypedProperties readBodyMap(ActiveMQBuffer message) { TypedProperties map = new TypedProperties(); readBodyMap(message, map); return map; @@ -43,10 +41,9 @@ public class MapMessageUtil extends MessageUtil { /** * Utility method to set the map on a message body */ - public static void readBodyMap(Message message, TypedProperties map) { - ActiveMQBuffer buff = getBodyBuffer(message); - buff.resetReaderIndex(); - map.decode(buff); + public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) { + message.resetReaderIndex(); + map.decode(message); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java index 9f1a598..b56abc4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java @@ -21,7 +21,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; @@ -52,9 +51,9 @@ public class MessageUtil { public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID"); - public static ActiveMQBuffer getBodyBuffer(Message message) { - return message.getBodyBuffer(); - } +// public static ActiveMQBuffer getBodyBuffer(Message message) { +// return message.getBodyBuffer(); +// } public static byte[] getJMSCorrelationIDAsBytes(Message message) { Object obj = message.getObjectProperty(CORRELATIONID_HEADER_NAME); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java index d59662f..dbba989 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/StreamMessageUtil.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.reader; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.utils.DataConstants; @@ -27,11 +26,10 @@ public class StreamMessageUtil extends MessageUtil { * Method to read boolean values out of the Stream protocol existent on JMS Stream Messages * Throws IllegalStateException if the type was invalid * - * @param message + * @param buff * @return */ - public static boolean streamReadBoolean(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static boolean streamReadBoolean(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { @@ -46,8 +44,7 @@ public class StreamMessageUtil extends MessageUtil { } - public static byte streamReadByte(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static byte streamReadByte(ActiveMQBuffer buff) { int index = buff.readerIndex(); try { byte type = buff.readByte(); @@ -68,8 +65,7 @@ public class StreamMessageUtil extends MessageUtil { } - public static short streamReadShort(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static short streamReadShort(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { case DataConstants.BYTE: @@ -84,8 +80,7 @@ public class StreamMessageUtil extends MessageUtil { } } - public static char streamReadChar(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static char streamReadChar(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { case DataConstants.CHAR: @@ -104,8 +99,7 @@ public class StreamMessageUtil extends MessageUtil { } - public static int streamReadInteger(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static int streamReadInteger(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { case DataConstants.BYTE: @@ -122,8 +116,7 @@ public class StreamMessageUtil extends MessageUtil { } } - public static long streamReadLong(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static long streamReadLong(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { case DataConstants.BYTE: @@ -142,8 +135,7 @@ public class StreamMessageUtil extends MessageUtil { } } - public static float streamReadFloat(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static float streamReadFloat(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { case DataConstants.FLOAT: @@ -156,8 +148,7 @@ public class StreamMessageUtil extends MessageUtil { } } - public static double streamReadDouble(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static double streamReadDouble(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { case DataConstants.FLOAT: @@ -172,8 +163,7 @@ public class StreamMessageUtil extends MessageUtil { } } - public static String streamReadString(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static String streamReadString(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { case DataConstants.BOOLEAN: @@ -204,12 +194,10 @@ public class StreamMessageUtil extends MessageUtil { * It will return remainingBytes, bytesRead * * @param remainingBytes remaining Bytes from previous read. Send it to 0 if it was the first call for the message - * @param message + * @param buff * @return a pair of remaining bytes and bytes read */ - public static Pair<Integer, Integer> streamReadBytes(Message message, int remainingBytes, byte[] value) { - ActiveMQBuffer buff = getBodyBuffer(message); - + public static Pair<Integer, Integer> streamReadBytes(ActiveMQBuffer buff, int remainingBytes, byte[] value) { if (remainingBytes == -1) { return new Pair<>(0, -1); } @@ -230,9 +218,7 @@ public class StreamMessageUtil extends MessageUtil { } - public static Object streamReadObject(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); - + public static Object streamReadObject(ActiveMQBuffer buff) { byte type = buff.readByte(); switch (type) { case DataConstants.BOOLEAN: http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java index c7515fc..c9ece4d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/TextMessageUtil.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.reader; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; public class TextMessageUtil extends MessageUtil { @@ -25,8 +24,7 @@ public class TextMessageUtil extends MessageUtil { /** * Utility method to set the Text message on a message body */ - public static void writeBodyText(Message message, SimpleString text) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static void writeBodyText(ActiveMQBuffer buff, SimpleString text) { buff.clear(); buff.writeNullableSimpleString(text); } @@ -34,8 +32,7 @@ public class TextMessageUtil extends MessageUtil { /** * Utility method to set the Text message on a message body */ - public static SimpleString readBodyText(Message message) { - ActiveMQBuffer buff = getBodyBuffer(message); + public static SimpleString readBodyText(ActiveMQBuffer buff) { buff.resetReaderIndex(); return buff.readNullableSimpleString(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java index 72770a4..09d6b18 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java @@ -102,7 +102,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public boolean readBoolean() throws JMSException { checkRead(); try { - return bytesReadBoolean(message); + return bytesReadBoolean(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -112,7 +112,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public byte readByte() throws JMSException { checkRead(); try { - return bytesReadByte(message); + return bytesReadByte(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -122,7 +122,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public int readUnsignedByte() throws JMSException { checkRead(); try { - return bytesReadUnsignedByte(message); + return bytesReadUnsignedByte(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -132,7 +132,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public short readShort() throws JMSException { checkRead(); try { - return bytesReadShort(message); + return bytesReadShort(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -142,7 +142,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public int readUnsignedShort() throws JMSException { checkRead(); try { - return bytesReadUnsignedShort(message); + return bytesReadUnsignedShort(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -152,7 +152,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public char readChar() throws JMSException { checkRead(); try { - return bytesReadChar(message); + return bytesReadChar(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -162,7 +162,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public int readInt() throws JMSException { checkRead(); try { - return bytesReadInt(message); + return bytesReadInt(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -172,7 +172,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public long readLong() throws JMSException { checkRead(); try { - return bytesReadLong(message); + return bytesReadLong(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -182,7 +182,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public float readFloat() throws JMSException { checkRead(); try { - return bytesReadFloat(message); + return bytesReadFloat(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -192,7 +192,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public double readDouble() throws JMSException { checkRead(); try { - return bytesReadDouble(message); + return bytesReadDouble(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -202,7 +202,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public String readUTF() throws JMSException { checkRead(); try { - return bytesReadUTF(message); + return bytesReadUTF(message.getBodyBuffer()); } catch (IndexOutOfBoundsException e) { throw new MessageEOFException(""); @@ -217,59 +217,59 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public int readBytes(final byte[] value) throws JMSException { checkRead(); - return bytesReadBytes(message, value); + return bytesReadBytes(message.getBodyBuffer(), value); } public int readBytes(final byte[] value, final int length) throws JMSException { checkRead(); - return bytesReadBytes(message, value, length); + return bytesReadBytes(message.getBodyBuffer(), value, length); } public void writeBoolean(final boolean value) throws JMSException { checkWrite(); - bytesWriteBoolean(message, value); + bytesWriteBoolean(message.getBodyBuffer(), value); } public void writeByte(final byte value) throws JMSException { checkWrite(); - bytesWriteByte(message, value); + bytesWriteByte(message.getBodyBuffer(), value); } public void writeShort(final short value) throws JMSException { checkWrite(); - bytesWriteShort(message, value); + bytesWriteShort(message.getBodyBuffer(), value); } public void writeChar(final char value) throws JMSException { checkWrite(); - bytesWriteChar(message, value); + bytesWriteChar(message.getBodyBuffer(), value); } public void writeInt(final int value) throws JMSException { checkWrite(); - bytesWriteInt(message, value); + bytesWriteInt(message.getBodyBuffer(), value); } public void writeLong(final long value) throws JMSException { checkWrite(); - bytesWriteLong(message, value); + bytesWriteLong(message.getBodyBuffer(), value); } public void writeFloat(final float value) throws JMSException { checkWrite(); - bytesWriteFloat(message, value); + bytesWriteFloat(message.getBodyBuffer(), value); } public void writeDouble(final double value) throws JMSException { checkWrite(); - bytesWriteDouble(message, value); + bytesWriteDouble(message.getBodyBuffer(), value); } public void writeUTF(final String value) throws JMSException { checkWrite(); try { - bytesWriteUTF(message, value); + bytesWriteUTF(message.getBodyBuffer(), value); } catch (Exception e) { JMSException je = new JMSException("Failed to write UTF"); @@ -282,17 +282,17 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag public void writeBytes(final byte[] value) throws JMSException { checkWrite(); - bytesWriteBytes(message, value); + bytesWriteBytes(message.getBodyBuffer(), value); } public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException { checkWrite(); - bytesWriteBytes(message, value, offset, length); + bytesWriteBytes(message.getBodyBuffer(), value, offset, length); } public void writeObject(final Object value) throws JMSException { checkWrite(); - if (!bytesWriteObject(message, value)) { + if (!bytesWriteObject(message.getBodyBuffer(), value)) { throw new MessageFormatException("Invalid object for properties"); } } @@ -304,7 +304,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag bodyLength = message.getBodySize(); } - bytesMessageReset(message); + bytesMessageReset(message.getBodyBuffer()); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java index 35ee0e4..747316d 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java @@ -317,7 +317,7 @@ public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMess @Override public void doBeforeSend() throws Exception { if (invalid) { - writeBodyMap(message, map); + writeBodyMap(message.getBodyBuffer(), map); invalid = false; } @@ -328,7 +328,7 @@ public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMess public void doBeforeReceive() throws ActiveMQException { super.doBeforeReceive(); - readBodyMap(message, map); + readBodyMap(message.getBodyBuffer(), map); } // Package protected --------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java index 4a8d5a8..315721f 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java @@ -89,7 +89,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public boolean readBoolean() throws JMSException { checkRead(); try { - return streamReadBoolean(message); + return streamReadBoolean(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -103,7 +103,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre checkRead(); try { - return streamReadByte(message); + return streamReadByte(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -116,7 +116,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public short readShort() throws JMSException { checkRead(); try { - return streamReadShort(message); + return streamReadShort(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -129,7 +129,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public char readChar() throws JMSException { checkRead(); try { - return streamReadChar(message); + return streamReadChar(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -142,7 +142,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public int readInt() throws JMSException { checkRead(); try { - return streamReadInteger(message); + return streamReadInteger(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -155,7 +155,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public long readLong() throws JMSException { checkRead(); try { - return streamReadLong(message); + return streamReadLong(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -168,7 +168,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public float readFloat() throws JMSException { checkRead(); try { - return streamReadFloat(message); + return streamReadFloat(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -181,7 +181,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public double readDouble() throws JMSException { checkRead(); try { - return streamReadDouble(message); + return streamReadDouble(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -194,7 +194,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public String readString() throws JMSException { checkRead(); try { - return streamReadString(message); + return streamReadString(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -212,7 +212,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public int readBytes(final byte[] value) throws JMSException { checkRead(); try { - Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value); + Pair<Integer, Integer> pairRead = streamReadBytes(message.getBodyBuffer(), len, value); len = pairRead.getA(); return pairRead.getB(); @@ -228,7 +228,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre public Object readObject() throws JMSException { checkRead(); try { - return streamReadObject(message); + return streamReadObject(message.getBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java index b9fba2d..bd0a615 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTextMessage.java @@ -84,7 +84,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage this.text = null; } - writeBodyText(message, this.text); + writeBodyText(message.getBodyBuffer(), this.text); } public String getText() { @@ -109,7 +109,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage public void doBeforeReceive() throws ActiveMQException { super.doBeforeReceive(); - text = readBodyText(message); + text = readBodyText(message.getBodyBuffer()); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java index 76e2515..cc436f4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java @@ -60,128 +60,128 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess @Override public boolean readBoolean() throws JMSException { - return bytesReadBoolean(message); + return bytesReadBoolean(getReadBodyBuffer()); } @Override public byte readByte() throws JMSException { - return bytesReadByte(message); + return bytesReadByte(getReadBodyBuffer()); } @Override public int readUnsignedByte() throws JMSException { - return bytesReadUnsignedByte(message); + return bytesReadUnsignedByte(getReadBodyBuffer()); } @Override public short readShort() throws JMSException { - return bytesReadShort(message); + return bytesReadShort(getReadBodyBuffer()); } @Override public int readUnsignedShort() throws JMSException { - return bytesReadUnsignedShort(message); + return bytesReadUnsignedShort(getReadBodyBuffer()); } @Override public char readChar() throws JMSException { - return bytesReadChar(message); + return bytesReadChar(getReadBodyBuffer()); } @Override public int readInt() throws JMSException { - return bytesReadInt(message); + return bytesReadInt(getReadBodyBuffer()); } @Override public long readLong() throws JMSException { - return bytesReadLong(message); + return bytesReadLong(getReadBodyBuffer()); } @Override public float readFloat() throws JMSException { - return bytesReadFloat(message); + return bytesReadFloat(getReadBodyBuffer()); } @Override public double readDouble() throws JMSException { - return bytesReadDouble(message); + return bytesReadDouble(getReadBodyBuffer()); } @Override public String readUTF() throws JMSException { - return bytesReadUTF(message); + return bytesReadUTF(getReadBodyBuffer()); } @Override public int readBytes(byte[] value) throws JMSException { - return bytesReadBytes(message, value); + return bytesReadBytes(getReadBodyBuffer(), value); } @Override public int readBytes(byte[] value, int length) throws JMSException { - return bytesReadBytes(message, value, length); + return bytesReadBytes(getReadBodyBuffer(), value, length); } @Override public void writeBoolean(boolean value) throws JMSException { - bytesWriteBoolean(message, value); + bytesWriteBoolean(getWriteBodyBuffer(), value); } @Override public void writeByte(byte value) throws JMSException { - bytesWriteByte(message, value); + bytesWriteByte(getWriteBodyBuffer(), value); } @Override public void writeShort(short value) throws JMSException { - bytesWriteShort(message, value); + bytesWriteShort(getWriteBodyBuffer(), value); } @Override public void writeChar(char value) throws JMSException { - bytesWriteChar(message, value); + bytesWriteChar(getWriteBodyBuffer(), value); } @Override public void writeInt(int value) throws JMSException { - bytesWriteInt(message, value); + bytesWriteInt(getWriteBodyBuffer(), value); } @Override public void writeLong(long value) throws JMSException { - bytesWriteLong(message, value); + bytesWriteLong(getWriteBodyBuffer(), value); } @Override public void writeFloat(float value) throws JMSException { - bytesWriteFloat(message, value); + bytesWriteFloat(getWriteBodyBuffer(), value); } @Override public void writeDouble(double value) throws JMSException { - bytesWriteDouble(message, value); + bytesWriteDouble(getWriteBodyBuffer(), value); } @Override public void writeUTF(String value) throws JMSException { - bytesWriteUTF(message, value); + bytesWriteUTF(getWriteBodyBuffer(), value); } @Override public void writeBytes(byte[] value) throws JMSException { - bytesWriteBytes(message, value); + bytesWriteBytes(getWriteBodyBuffer(), value); } @Override public void writeBytes(byte[] value, int offset, int length) throws JMSException { - bytesWriteBytes(message, value, offset, length); + bytesWriteBytes(getWriteBodyBuffer(), value, offset, length); } @Override public void writeObject(Object value) throws JMSException { - if (!bytesWriteObject(message, value)) { + if (!bytesWriteObject(getWriteBodyBuffer(), value)) { throw new JMSException("Can't make conversion of " + value + " to any known type"); } } @@ -199,7 +199,8 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess @Override public void reset() throws JMSException { - bytesMessageReset(message); + bytesMessageReset(getReadBodyBuffer()); + bytesMessageReset(getWriteBodyBuffer()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java index bbece71..e41a1a3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java @@ -247,12 +247,12 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe public void encode() throws Exception { super.encode(); - writeBodyMap(message, map); + writeBodyMap(getWriteBodyBuffer(), map); } public void decode() throws Exception { super.decode(); - readBodyMap(message, map); + readBodyMap(getReadBodyBuffer(), map); } // Package protected --------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java index 315dd12..afacd21 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java @@ -23,6 +23,7 @@ import javax.jms.Message; import java.util.Collections; import java.util.Enumeration; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.MessageInternal; @@ -45,6 +46,24 @@ public class ServerJMSMessage implements Message { this.deliveryCount = deliveryCount; } + private ActiveMQBuffer readBodyBuffer; + + /** When reading we use a protected copy so multi-threads can work fine */ + protected ActiveMQBuffer getReadBodyBuffer() { + if (readBodyBuffer == null) { + // to avoid clashes between multiple threads + readBodyBuffer = message.getBodyBufferCopy(); + } + return readBodyBuffer; + } + + /** When writing on the conversion we use the buffer directly */ + protected ActiveMQBuffer getWriteBodyBuffer() { + readBodyBuffer = null; // it invalidates this buffer if anything is written + return message.getBodyBuffer(); + } + + @Override public final String getJMSMessageID() throws JMSException { return null; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java index 1afc8eb..9b70f57 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java @@ -21,13 +21,11 @@ import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; import javax.jms.StreamMessage; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.utils.DataConstants; -import static org.apache.activemq.artemis.reader.MessageUtil.getBodyBuffer; import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean; import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadByte; import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBytes; @@ -48,14 +46,14 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) { super(message, deliveryCount); - } // StreamMessage implementation ---------------------------------- public boolean readBoolean() throws JMSException { + try { - return streamReadBoolean(message); + return streamReadBoolean(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -67,7 +65,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public byte readByte() throws JMSException { try { - return streamReadByte(message); + return streamReadByte(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -80,7 +78,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public short readShort() throws JMSException { try { - return streamReadShort(message); + return streamReadShort(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -93,7 +91,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public char readChar() throws JMSException { try { - return streamReadChar(message); + return streamReadChar(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -106,7 +104,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public int readInt() throws JMSException { try { - return streamReadInteger(message); + return streamReadInteger(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -119,7 +117,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public long readLong() throws JMSException { try { - return streamReadLong(message); + return streamReadLong(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -132,7 +130,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public float readFloat() throws JMSException { try { - return streamReadFloat(message); + return streamReadFloat(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -145,7 +143,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public double readDouble() throws JMSException { try { - return streamReadDouble(message); + return streamReadDouble(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -158,7 +156,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public String readString() throws JMSException { try { - return streamReadString(message); + return streamReadString(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -176,7 +174,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public int readBytes(final byte[] value) throws JMSException { try { - Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value); + Pair<Integer, Integer> pairRead = streamReadBytes(getReadBodyBuffer(), len, value); len = pairRead.getA(); return pairRead.getB(); @@ -191,11 +189,11 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public Object readObject() throws JMSException { - if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition()) { + if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) { throw new MessageEOFException(""); } try { - return streamReadObject(message); + return streamReadObject(getReadBodyBuffer()); } catch (IllegalStateException e) { throw new MessageFormatException(e.getMessage()); @@ -207,70 +205,70 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public void writeBoolean(final boolean value) throws JMSException { - getBuffer().writeByte(DataConstants.BOOLEAN); - getBuffer().writeBoolean(value); + getWriteBodyBuffer().writeByte(DataConstants.BOOLEAN); + getWriteBodyBuffer().writeBoolean(value); } public void writeByte(final byte value) throws JMSException { - getBuffer().writeByte(DataConstants.BYTE); - getBuffer().writeByte(value); + getWriteBodyBuffer().writeByte(DataConstants.BYTE); + getWriteBodyBuffer().writeByte(value); } public void writeShort(final short value) throws JMSException { - getBuffer().writeByte(DataConstants.SHORT); - getBuffer().writeShort(value); + getWriteBodyBuffer().writeByte(DataConstants.SHORT); + getWriteBodyBuffer().writeShort(value); } public void writeChar(final char value) throws JMSException { - getBuffer().writeByte(DataConstants.CHAR); - getBuffer().writeShort((short) value); + getWriteBodyBuffer().writeByte(DataConstants.CHAR); + getWriteBodyBuffer().writeShort((short) value); } public void writeInt(final int value) throws JMSException { - getBuffer().writeByte(DataConstants.INT); - getBuffer().writeInt(value); + getWriteBodyBuffer().writeByte(DataConstants.INT); + getWriteBodyBuffer().writeInt(value); } public void writeLong(final long value) throws JMSException { - getBuffer().writeByte(DataConstants.LONG); - getBuffer().writeLong(value); + getWriteBodyBuffer().writeByte(DataConstants.LONG); + getWriteBodyBuffer().writeLong(value); } public void writeFloat(final float value) throws JMSException { - getBuffer().writeByte(DataConstants.FLOAT); - getBuffer().writeInt(Float.floatToIntBits(value)); + getWriteBodyBuffer().writeByte(DataConstants.FLOAT); + getWriteBodyBuffer().writeInt(Float.floatToIntBits(value)); } public void writeDouble(final double value) throws JMSException { - getBuffer().writeByte(DataConstants.DOUBLE); - getBuffer().writeLong(Double.doubleToLongBits(value)); + getWriteBodyBuffer().writeByte(DataConstants.DOUBLE); + getWriteBodyBuffer().writeLong(Double.doubleToLongBits(value)); } public void writeString(final String value) throws JMSException { - getBuffer().writeByte(DataConstants.STRING); - getBuffer().writeNullableString(value); + getWriteBodyBuffer().writeByte(DataConstants.STRING); + getWriteBodyBuffer().writeNullableString(value); } public void writeBytes(final byte[] value) throws JMSException { - getBuffer().writeByte(DataConstants.BYTES); - getBuffer().writeInt(value.length); - getBuffer().writeBytes(value); + getWriteBodyBuffer().writeByte(DataConstants.BYTES); + getWriteBodyBuffer().writeInt(value.length); + getWriteBodyBuffer().writeBytes(value); } public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException { - getBuffer().writeByte(DataConstants.BYTES); - getBuffer().writeInt(length); - getBuffer().writeBytes(value, offset, length); + getWriteBodyBuffer().writeByte(DataConstants.BYTES); + getWriteBodyBuffer().writeInt(length); + getWriteBodyBuffer().writeBytes(value, offset, length); } public void writeObject(final Object value) throws JMSException { @@ -313,7 +311,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St } public void reset() throws JMSException { - getBuffer().resetReaderIndex(); + getWriteBodyBuffer().resetReaderIndex(); } // ActiveMQRAMessage overrides ---------------------------------------- @@ -322,11 +320,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St public void clearBody() throws JMSException { super.clearBody(); - getBuffer().clear(); - } - - private ActiveMQBuffer getBuffer() { - return message.getBodyBuffer(); + getWriteBodyBuffer().clear(); } public void decode() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java index 95e24b5..3191067 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSTextMessage.java @@ -63,7 +63,7 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag this.text = null; } - writeBodyText(message, this.text); + writeBodyText(getWriteBodyBuffer(), this.text); } public String getText() { @@ -84,12 +84,12 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag public void encode() throws Exception { super.encode(); - writeBodyText(message, text); + writeBodyText(getWriteBodyBuffer(), text); } public void decode() throws Exception { super.decode(); - text = readBodyText(message); + text = readBodyText(getReadBodyBuffer()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java index 0b5cb51..b563e61 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.proton; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; @@ -473,7 +474,7 @@ public class TestConversions extends Assert { } @Override - public short readUnsignedByte() { + public int readUnsignedByte() { return 0; } @@ -588,8 +589,8 @@ public class TestConversions extends Assert { } @Override - public void skipBytes(int length) { - + public int skipBytes(int length) { + return length; } @Override @@ -683,6 +684,19 @@ public class TestConversions extends Assert { } @Override + public void readFully(byte[] b) throws IOException { + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + } + + @Override + public String readLine() throws IOException { + return null; + } + + @Override public ActiveMQBuffer copy() { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java deleted file mode 100644 index 32d37d4..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/DataInputWrapper.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.utils.UTF8Util; -import org.apache.activemq.artemis.utils.UTF8Util.StringUtilBuffer; - -public class DataInputWrapper implements DataInput { - - private static final int DEFAULT_CAPACITY = 1024 * 1024; - private static final NotEnoughBytesException exception = new NotEnoughBytesException(); - private ByteBuffer internalBuffer; - - public DataInputWrapper() { - this(DEFAULT_CAPACITY); - } - - public DataInputWrapper(int capacity) { - this.internalBuffer = ByteBuffer.allocateDirect(capacity); - this.internalBuffer.mark(); - this.internalBuffer.limit(0); - } - - public void receiveData(byte[] data) { - int newSize = data.length; - int freeSpace = internalBuffer.capacity() - internalBuffer.limit(); - if (freeSpace < newSize) { - internalBuffer.reset(); - internalBuffer.compact(); - if (internalBuffer.remaining() < newSize) { - //need to enlarge - } - //make sure mark is at zero and position is at effective limit - int pos = internalBuffer.position(); - internalBuffer.position(0); - internalBuffer.mark(); - internalBuffer.position(pos); - } - else { - internalBuffer.position(internalBuffer.limit()); - internalBuffer.limit(internalBuffer.capacity()); - } - internalBuffer.put(data); - internalBuffer.limit(internalBuffer.position()); - internalBuffer.reset(); - } - - public void receiveData(ActiveMQBuffer buffer) { - int newSize = buffer.readableBytes(); - byte[] newData = new byte[newSize]; - buffer.readBytes(newData); - this.receiveData(newData); - } - - //invoke after each successful unmarshall - public void mark() { - this.internalBuffer.mark(); - } - - @Override - public void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - private void checkSize(int n) throws NotEnoughBytesException { - if (internalBuffer.remaining() < n) { - throw exception; - } - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - checkSize(len); - internalBuffer.get(b, off, len); - } - - @Override - public int skipBytes(int n) throws IOException { - checkSize(n); - int pos = internalBuffer.position(); - internalBuffer.position(pos + n); - return n; - } - - @Override - public boolean readBoolean() throws IOException { - checkSize(1); - byte b = internalBuffer.get(); - return b != 0; - } - - @Override - public byte readByte() throws IOException { - checkSize(1); - return this.internalBuffer.get(); - } - - @Override - public int readUnsignedByte() throws IOException { - checkSize(1); - return 0xFF & this.internalBuffer.get(); - } - - @Override - public short readShort() throws IOException { - checkSize(2); - return this.internalBuffer.getShort(); - } - - @Override - public int readUnsignedShort() throws IOException { - checkSize(2); - return 0xFFFF & this.internalBuffer.getShort(); - } - - @Override - public char readChar() throws IOException { - checkSize(2); - return this.internalBuffer.getChar(); - } - - @Override - public int readInt() throws IOException { - checkSize(4); - return this.internalBuffer.getInt(); - } - - @Override - public long readLong() throws IOException { - checkSize(8); - return this.internalBuffer.getLong(); - } - - @Override - public float readFloat() throws IOException { - checkSize(4); - return this.internalBuffer.getFloat(); - } - - @Override - public double readDouble() throws IOException { - checkSize(8); - return this.internalBuffer.getDouble(); - } - - @Override - public String readLine() throws IOException { - StringBuilder sb = new StringBuilder(""); - char c = this.readChar(); - while (c != '\n') { - sb.append(c); - c = this.readChar(); - } - return sb.toString(); - } - - @Override - public String readUTF() throws IOException { - StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer(); - - final int size = this.readUnsignedShort(); - - if (size > buffer.byteBuffer.length) { - buffer.resizeByteBuffer(size); - } - - if (size > buffer.charBuffer.length) { - buffer.resizeCharBuffer(size); - } - - int count = 0; - int byte1, byte2, byte3; - int charCount = 0; - - this.readFully(buffer.byteBuffer, 0, size); - - while (count < size) { - byte1 = buffer.byteBuffer[count++]; - - if (byte1 > 0 && byte1 <= 0x7F) { - buffer.charBuffer[charCount++] = (char) byte1; - } - else { - int c = byte1 & 0xff; - switch (c >> 4) { - case 0xc: - case 0xd: - byte2 = buffer.byteBuffer[count++]; - buffer.charBuffer[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F); - break; - case 0xe: - byte2 = buffer.byteBuffer[count++]; - byte3 = buffer.byteBuffer[count++]; - buffer.charBuffer[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0); - break; - default: - throw new InternalError("unhandled utf8 byte " + c); - } - } - } - - return new String(buffer.charBuffer, 0, charCount); - } - - public boolean readable() { - return this.internalBuffer.hasRemaining(); - } - -}