http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java b/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java new file mode 100644 index 0000000..fecff14 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.msg; + +import com.google.common.collect.Maps; +import com.google.common.io.BaseEncoding; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; + +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSCorrelationID; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSDeliveryMode; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSDeliveryTime; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSDestination; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSExpiration; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSMessageID; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSPriority; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSRedelivered; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSReplyTo; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSTimestamp; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSType; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_EXPIRATION_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_REDELIVERED_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_TIMESTAMP_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Boolean; +import static 
org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Integer; +import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Long; +import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Object; +import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2String; + +public abstract class AbstractJMSMessage implements javax.jms.Message { + + protected Map<JMSHeaderEnum, Object> headers = Maps.newHashMap(); + protected Map<String, Object> properties = Maps.newHashMap(); + + protected boolean writeOnly; + + @Override + public String getJMSMessageID() { + return cast2String(headers.get(JMSMessageID)); + } + + @Override + public void setJMSMessageID(String id) { + setHeader(JMSMessageID, id); + } + + @Override + public long getJMSTimestamp() { + if (headers.containsKey(JMSTimestamp)) { + return cast2Long(headers.get(JMSTimestamp)); + } + return JMS_TIMESTAMP_DEFAULT_VALUE; + } + + @Override + public void setJMSTimestamp(long timestamp) { + setHeader(JMSTimestamp, timestamp); + } + + @Override + public byte[] getJMSCorrelationIDAsBytes() { + String jmsCorrelationID = getJMSCorrelationID(); + if (jmsCorrelationID != null) { + try { + return BaseEncoding.base64().decode(jmsCorrelationID); + } + catch (Exception e) { + return jmsCorrelationID.getBytes(); + } + } + return null; + } + + @Override + public void setJMSCorrelationIDAsBytes(byte[] correlationID) { + String encodedText = BaseEncoding.base64().encode(correlationID); + setJMSCorrelationID(encodedText); + } + + @Override + public String getJMSCorrelationID() { + return cast2String(headers.get(JMSCorrelationID)); + } + + @Override + public void setJMSCorrelationID(String correlationID) { + setHeader(JMSCorrelationID, correlationID); + } + + @Override + public Destination getJMSReplyTo() { + return cast2Object(headers.get(JMSReplyTo), Destination.class); + } + + @Override + public void setJMSReplyTo(Destination replyTo) { + setHeader(JMSReplyTo, replyTo); + } + + @Override + public String toString() 
{ + return ToStringBuilder.reflectionToString(this); + } + + @Override + public Destination getJMSDestination() { + return cast2Object(headers.get(JMSDestination), Destination.class); + } + + @Override + public void setJMSDestination(Destination destination) { + setHeader(JMSDestination, destination); + } + + @SuppressWarnings("unchecked") + public abstract <T> T getBody(Class<T> clazz) throws JMSException; + + public abstract byte[] getBody() throws JMSException; + + @Override + public int getJMSDeliveryMode() { + if (headers.containsKey(JMSDeliveryMode)) { + return cast2Integer(headers.get(JMSDeliveryMode)); + } + return JMS_DELIVERY_MODE_DEFAULT_VALUE; + } + + @Override + public void setJMSDeliveryMode(int deliveryMode) { + setHeader(JMSDeliveryMode, deliveryMode); + } + + @Override + public boolean getJMSRedelivered() { + if (headers.containsKey(JMSRedelivered)) { + return cast2Boolean(headers.get(JMSRedelivered)); + } + return JMS_REDELIVERED_DEFAULT_VALUE; + } + + @Override + public void setJMSRedelivered(boolean redelivered) { + setHeader(JMSRedelivered, redelivered); + } + + @Override + public String getJMSType() { + return cast2String(headers.get(JMSType)); + } + + @Override + public void setJMSType(String type) { + setHeader(JMSType, type); + } + + public Map<JMSHeaderEnum, Object> getHeaders() { + return this.headers; + } + + @Override + public long getJMSExpiration() { + if (headers.containsKey(JMSExpiration)) { + return cast2Long(headers.get(JMSExpiration)); + } + return JMS_EXPIRATION_DEFAULT_VALUE; + } + + @Override + public void setJMSExpiration(long expiration) { + setHeader(JMSExpiration, expiration); + } + + @Override + public int getJMSPriority() { + if (headers.containsKey(JMSPriority)) { + return cast2Integer(headers.get(JMSPriority)); + } + return JMS_PRIORITY_DEFAULT_VALUE; + } + + @Override + public void setJMSPriority(int priority) { + setHeader(JMSPriority, priority); + } + + @Override + public long getJMSDeliveryTime() throws 
JMSException { + if (headers.containsKey(JMSDeliveryTime)) { + return cast2Long(headers.get(JMSDeliveryTime)); + } + return JMS_DELIVERY_TIME_DEFAULT_VALUE; + } + + @Override + public void setJMSDeliveryTime(long deliveryTime) throws JMSException { + setHeader(JMSDeliveryTime, deliveryTime); + } + + private void setHeader(JMSHeaderEnum name, Object value) { + this.headers.put(name, value); + } + + public Map<String, Object> getProperties() { + return this.properties; + } + + public void setProperties(Map<String, Object> properties) { + this.properties = properties; + } + + @Override + public void acknowledge() throws JMSException { + //todo + throw new UnsupportedOperationException("Unsupported!"); + } + + @Override + public void clearProperties() { + this.properties.clear(); + } + + @Override + public void clearBody() { + this.writeOnly = true; + } + + @Override + public boolean propertyExists(String name) { + return properties.containsKey(name); + } + + @Override + public boolean getBooleanProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Boolean.valueOf(value.toString()); + } + return false; + } + + @Override + public byte getByteProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Byte.valueOf(value.toString()); + } + return 0; + } + + @Override + public short getShortProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Short.valueOf(value.toString()); + } + return 0; + } + + @Override + public int getIntProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Integer.valueOf(value.toString()); + } + return 0; + } + + @Override + public long getLongProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return 
Long.valueOf(value.toString()); + } + return 0L; + } + + @Override + public float getFloatProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Float.valueOf(value.toString()); + } + return 0f; + } + + @Override + public double getDoubleProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Double.valueOf(value.toString()); + } + return 0d; + } + + @Override + public String getStringProperty(String name) throws JMSException { + if (propertyExists(name)) { + return getObjectProperty(name).toString(); + } + return null; + } + + @Override + public Object getObjectProperty(String name) throws JMSException { + return this.properties.get(name); + } + + @Override + public Enumeration<?> getPropertyNames() throws JMSException { + return Collections.enumeration(this.properties.keySet()); + } + + @Override + public void setBooleanProperty(String name, boolean value) { + setObjectProperty(name, value); + } + + @Override + public void setByteProperty(String name, byte value) { + setObjectProperty(name, value); + } + + @Override + public void setShortProperty(String name, short value) { + setObjectProperty(name, value); + } + + @Override + public void setIntProperty(String name, int value) { + setObjectProperty(name, value); + } + + @Override + public void setLongProperty(String name, long value) { + setObjectProperty(name, value); + } + + public void setFloatProperty(String name, float value) { + setObjectProperty(name, value); + } + + @Override + public void setDoubleProperty(String name, double value) { + setObjectProperty(name, value); + } + + @Override + public void setStringProperty(String name, String value) { + setObjectProperty(name, value); + } + + @Override + public abstract boolean isBodyAssignableTo(Class c) throws JMSException; + + @Override + public void setObjectProperty(String name, Object value) { + if (value instanceof 
Number || value instanceof String || value instanceof Boolean) { + this.properties.put(name, value); + } + else { + throw new IllegalArgumentException( + "Value should be boolean, byte, short, int, long, float, double, and String."); + } + } + + protected boolean isWriteOnly() { + return writeOnly; + } + + protected void checkIsWriteOnly() throws MessageNotWriteableException { + if (!writeOnly) { + throw new MessageNotWriteableException("Not writable"); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java b/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java new file mode 100644 index 0000000..a409118 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import javax.jms.IllegalStateRuntimeException; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; +import org.apache.rocketmq.jms.support.PrimitiveTypeCast; + +import static java.lang.String.format; + +/** + * RocketMQ ByteMessage. 
+ */ +public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage { + + private byte[] bytesIn; + private DataInputStream dataAsInput; + + private ByteArrayOutputStream bytesOut; + private DataOutputStream dataAsOutput; + + protected boolean readOnly; + + /** + * Message created for reading + * + * @param data + */ + public JMSBytesMessage(byte[] data) { + this.bytesIn = data; + this.dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length)); + this.readOnly = true; + this.writeOnly = false; + } + + /** + * Message created to be sent + */ + public JMSBytesMessage() { + this.bytesOut = new ByteArrayOutputStream(); + this.dataAsOutput = new DataOutputStream(this.bytesOut); + this.readOnly = false; + this.writeOnly = true; + } + + @Override public byte[] getBody(Class clazz) throws JMSException { + byte[] result; + if (isBodyAssignableTo(clazz)) { + if (isWriteOnly()) { + result = bytesOut.toByteArray(); + this.reset(); + return result; + } + else if (isReadOnly()) { + result = Arrays.copyOf(bytesIn, bytesIn.length); + this.reset(); + return result; + } + else { + throw new IllegalStateRuntimeException("Message must be in write only or read only status"); + } + } + + throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); + } + + @Override public byte[] getBody() throws JMSException { + return getBody(byte[].class); + } + + @Override public boolean isBodyAssignableTo(Class c) throws JMSException { + return byte[].class.isAssignableFrom(c); + } + + @Override public long getBodyLength() throws JMSException { + if (isWriteOnly()) { + return bytesOut.size(); + } + else if (isReadOnly()) { + return bytesIn.length; + } + else { + throw new IllegalStateRuntimeException("Message must be in write only or read only status"); + } + } + + public boolean readBoolean() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readBoolean(); + } + catch (IOException e) 
{ + throw handleInputException(e); + } + } + + private void checkIsReadOnly() throws MessageNotReadableException { + if (!isReadOnly()) { + throw new MessageNotReadableException("Not readable"); + } + if (dataAsInput == null) { + throw new MessageNotReadableException("No data to read"); + } + } + + public byte readByte() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readByte(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public int readUnsignedByte() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readUnsignedByte(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public short readShort() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readShort(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public int readUnsignedShort() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readUnsignedShort(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public char readChar() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readChar(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public int readInt() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readInt(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public long readLong() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readLong(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public float readFloat() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readFloat(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public double readDouble() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readDouble(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + 
public String readUTF() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readUTF(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public int readBytes(byte[] value) throws JMSException { + checkIsReadOnly(); + + return readBytes(value, value.length); + } + + public int readBytes(byte[] value, int length) throws JMSException { + checkIsReadOnly(); + + if (length > value.length) { + throw new IndexOutOfBoundsException("length must be smaller than the length of value"); + } + if (dataAsInput == null) { + throw new MessageNotReadableException("Message is not readable! "); + } + try { + int offset = 0; + while (offset < length) { + int read = dataAsInput.read(value, offset, length - offset); + if (read < 0) { + break; + } + offset += read; + } + + if (offset == 0 && length != 0) { + return -1; + } + else { + return offset; + } + } + catch (IOException e) { + throw handleInputException(e); + } + + } + + public void writeBoolean(boolean value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeBoolean(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + private void initializeWriteIfNecessary() { + if (bytesOut == null) { + bytesOut = new ByteArrayOutputStream(); + } + if (dataAsOutput == null) { + dataAsOutput = new DataOutputStream(bytesOut); + } + } + + public void writeByte(byte value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeByte(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeShort(short value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeShort(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeChar(char value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + 
dataAsOutput.writeChar(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeInt(int value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeInt(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeLong(long value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeLong(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeFloat(float value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeFloat(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeDouble(double value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeDouble(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeUTF(String value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeUTF(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeBytes(byte[] value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + if (dataAsOutput == null) { + throw new MessageNotWriteableException("Message is not writable! "); + } + try { + dataAsOutput.write(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeBytes(byte[] value, int offset, int length) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + if (dataAsOutput == null) { + throw new MessageNotWriteableException("Message is not writable! 
"); + } + try { + dataAsOutput.write(value, offset, length); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeObject(Object value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + if (!PrimitiveTypeCast.isPrimitiveType(value)) { + throw new JMSException("Object must be primitive type"); + } + + try { + dataAsOutput.writeBytes(String.valueOf(value)); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void reset() throws JMSException { + try { + if (bytesOut != null) { + bytesOut.reset(); + } + if (this.dataAsInput != null) { + this.dataAsInput.reset(); + } + + this.readOnly = true; + } + catch (IOException e) { + throw new JMSException(e.getMessage()); + } + } + + @Override public void clearBody() { + super.clearBody(); + this.bytesOut = null; + this.dataAsOutput = null; + this.dataAsInput = null; + this.bytesIn = null; + } + + private JMSException handleOutputException(final IOException e) { + JMSException ex = new JMSException(e.getMessage()); + ex.initCause(e); + ex.setLinkedException(e); + return ex; + } + + private JMSException handleInputException(final IOException e) { + JMSException ex; + if (e instanceof EOFException) { + ex = new MessageEOFException(e.getMessage()); + } + else { + ex = new MessageFormatException(e.getMessage()); + } + ex.initCause(e); + ex.setLinkedException(e); + return ex; + } + + protected boolean isReadOnly() { + return readOnly; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java b/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java new file mode 100644 index 0000000..dddfb58 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java @@ -0,0 +1,229 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotWriteableException; +import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.jms.msg.serialize.MapSerialize; + +import static java.lang.String.format; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Boolean; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Byte; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2ByteArray; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Char; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Double; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Float; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Int; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Long; +import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Short; +import static 
org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2String; + +/** + * Message can only be accessed by a thread at a time. + */ +public class JMSMapMessage extends AbstractJMSMessage implements MapMessage { + + private Map<String, Object> map; + + protected boolean readOnly; + + public JMSMapMessage(Map<String, Object> map) { + this.map = map; + } + + public JMSMapMessage() { + this.map = new HashMap(); + } + + @Override public Map<String, Object> getBody(Class clazz) throws JMSException { + if (isBodyAssignableTo(clazz)) { + return this.map; + } + + throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); + } + + @Override public byte[] getBody() throws JMSException { + return MapSerialize.instance().serialize(this.map); + } + + @Override public boolean isBodyAssignableTo(Class c) throws JMSException { + return Map.class.isAssignableFrom(c); + } + + @Override public boolean getBoolean(String name) throws JMSException { + checkName(name); + + return cast2Boolean(map.get(name)); + } + + private void checkName(String name) throws JMSException { + if (StringUtils.isBlank(name)) { + throw new JMSException("Name is required"); + } + } + + @Override public byte getByte(String name) throws JMSException { + checkName(name); + + return cast2Byte(map.get(name)); + } + + @Override public short getShort(String name) throws JMSException { + checkName(name); + + return cast2Short(map.get(name)); + } + + @Override public char getChar(String name) throws JMSException { + checkName(name); + + return cast2Char(map.get(name)); + } + + @Override public int getInt(String name) throws JMSException { + checkName(name); + + return cast2Int(map.get(name)); + } + + @Override public long getLong(String name) throws JMSException { + checkName(name); + + return cast2Long(map.get(name)); + } + + @Override public float getFloat(String name) throws JMSException { + checkName(name); + + return cast2Float(map.get(name)); + } + + @Override public 
double getDouble(String name) throws JMSException { + checkName(name); + + return cast2Double(map.get(name)); + } + + @Override public String getString(String name) throws JMSException { + checkName(name); + + return cast2String(map.get(name)); + } + + @Override public byte[] getBytes(String name) throws JMSException { + checkName(name); + + return cast2ByteArray(map.get(name)); + } + + @Override public Object getObject(String name) throws JMSException { + checkName(name); + + return map.get(name); + } + + @Override public Enumeration getMapNames() throws JMSException { + return Collections.enumeration(map.keySet()); + } + + @Override public void setBoolean(String name, boolean value) throws JMSException { + putProperty(name, value); + } + + private void putProperty(String name, Object obj) throws JMSException { + if (isReadOnly()) { + throw new MessageNotWriteableException("Message is not writable"); + } + + checkName(name); + + map.put(name, obj); + } + + @Override public void setByte(String name, byte value) throws JMSException { + putProperty(name, value); + } + + @Override public void setShort(String name, short value) throws JMSException { + putProperty(name, value); + } + + @Override public void setChar(String name, char value) throws JMSException { + putProperty(name, value); + } + + @Override public void setInt(String name, int value) throws JMSException { + putProperty(name, value); + } + + @Override public void setLong(String name, long value) throws JMSException { + putProperty(name, value); + } + + @Override public void setFloat(String name, float value) throws JMSException { + putProperty(name, value); + } + + @Override public void setDouble(String name, double value) throws JMSException { + putProperty(name, value); + } + + @Override public void setString(String name, String value) throws JMSException { + putProperty(name, value); + } + + @Override public void setBytes(String name, byte[] value) throws JMSException { + putProperty(name, value); + } + 
+ @Override public void setBytes(String name, byte[] value, int offset, int length) throws JMSException { + putProperty(name, value); + } + + @Override public void setObject(String name, Object value) throws JMSException { + putProperty(name, value); + } + + @Override public boolean itemExists(String name) throws JMSException { + checkName(name); + + return map.containsKey(name); + } + + @Override public void clearBody() { + super.clearBody(); + this.map.clear(); + this.readOnly = false; + } + + protected boolean isReadOnly() { + return this.readOnly; + } + + public void setReadOnly(boolean readOnly) { + this.readOnly = readOnly; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java b/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java new file mode 100644 index 0000000..4f29d33 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.msg; + +import java.io.Serializable; +import javax.jms.JMSException; +import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize; + +public class JMSObjectMessage extends AbstractJMSMessage implements javax.jms.ObjectMessage { + + private Serializable body; + + public JMSObjectMessage(Serializable object) { + this.body = object; + } + + public JMSObjectMessage() { + + } + + @Override public Serializable getBody(Class clazz) throws JMSException { + return body; + } + + @Override public byte[] getBody() throws JMSException { + return ObjectSerialize.instance().serialize(body); + } + + @Override public boolean isBodyAssignableTo(Class c) throws JMSException { + return true; + } + + public Serializable getObject() throws JMSException { + return this.body; + } + + public void setObject(Serializable object) throws JMSException { + this.body = object; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java b/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java new file mode 100644 index 0000000..5fd67a3 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import org.apache.rocketmq.jms.msg.serialize.StringSerialize; + +import static java.lang.String.format; + +public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { + + private String text; + + public JMSTextMessage() { + + } + + public JMSTextMessage(String text) { + setText(text); + } + + @Override public String getBody(Class clazz) throws JMSException { + if (isBodyAssignableTo(clazz)) { + return text; + } + + throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); + } + + @Override public byte[] getBody() throws JMSException { + return StringSerialize.instance().serialize(this.text); + } + + @Override public boolean isBodyAssignableTo(Class c) throws JMSException { + return String.class.isAssignableFrom(c); + } + + public void clearBody() { + super.clearBody(); + this.text = null; + } + + public String getText() throws JMSException { + return this.text; + } + + public void setText(String text) { + this.text = text; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java b/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java new file mode 100644 index 
0000000..ca2cbed --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.convert; + +import java.util.Map; +import javax.jms.JMSException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.support.JMSUtils; + +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSExpiration; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMSMessageID; +import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME; +import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.toMsgModelEnum; + +public class JMS2RMQMessageConvert { + + public static final String USER_PROPERTY_PREFIX = "USER:"; + + public static MessageExt convert(AbstractJMSMessage jmsMsg) throws Exception { + MessageExt rmqMsg = new MessageExt(); + + handleHeader(jmsMsg, rmqMsg); + + handleBody(jmsMsg, rmqMsg); + + handleProperties(jmsMsg, rmqMsg); + + return rmqMsg; + } + + private static void handleHeader(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) { + 
rmqMsg.setTopic(JMSUtils.getDestinationName(jmsMsg.getJMSDestination())); + rmqMsg.putUserProperty(JMSMessageID.name(), jmsMsg.getJMSMessageID()); + rmqMsg.setBornTimestamp(jmsMsg.getJMSTimestamp()); + rmqMsg.putUserProperty(JMSExpiration.name(), String.valueOf(jmsMsg.getJMSExpiration())); + rmqMsg.setKeys(jmsMsg.getJMSMessageID()); + } + + private static void handleProperties(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) { + Map<String, Object> userProps = jmsMsg.getProperties(); + for (Map.Entry<String, Object> entry : userProps.entrySet()) { + rmqMsg.putUserProperty(new StringBuffer(USER_PROPERTY_PREFIX).append(entry.getKey()).toString(), entry.getValue().toString()); + } + } + + private static void handleBody(AbstractJMSMessage jmsMsg, MessageExt rmqMsg) throws JMSException { + rmqMsg.putUserProperty(MSG_MODEL_NAME, toMsgModelEnum(jmsMsg).name()); + rmqMsg.setBody(jmsMsg.getBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java b/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java new file mode 100644 index 0000000..4adb692 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.convert; + +import java.util.Map; +import javax.jms.JMSException; +import javax.jms.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.destination.RocketMQTopic; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSMapMessage; +import org.apache.rocketmq.jms.msg.JMSObjectMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; +import org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum; +import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; +import org.apache.rocketmq.jms.msg.serialize.MapSerialize; +import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize; +import org.apache.rocketmq.jms.msg.serialize.StringSerialize; + +import static java.lang.String.format; +import static org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert.USER_PROPERTY_PREFIX; +import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME; + +public class RMQ2JMSMessageConvert { + + public static Message convert(MessageExt rmqMsg) throws JMSException { + if (rmqMsg == null) { + throw new IllegalArgumentException("RocketMQ message could not be null"); + } + if (rmqMsg.getBody() == null) { + throw new IllegalArgumentException("RocketMQ message body could not be null"); + } + + AbstractJMSMessage jmsMsg = newAbstractJMSMessage(rmqMsg.getUserProperty(MSG_MODEL_NAME), rmqMsg.getBody()); + + setHeader(rmqMsg, 
jmsMsg); + + setProperties(rmqMsg, jmsMsg); + + return jmsMsg; + } + + private static AbstractJMSMessage newAbstractJMSMessage(String msgModel, byte[] body) throws JMSException { + AbstractJMSMessage message; + switch (JMSMessageModelEnum.valueOf(msgModel)) { + case BYTE: + return new JMSBytesMessage(body); + case MAP: + message = new JMSMapMessage(MapSerialize.instance().deserialize(body)); + break; + case OBJECT: + message = new JMSObjectMessage(ObjectSerialize.instance().deserialize(body)); + break; + case STRING: + message = new JMSTextMessage(StringSerialize.instance().deserialize(body)); + break; + default: + throw new JMSException(format("The type[%s] is not supported", msgModel)); + } + + return message; + } + + private static void setHeader(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) { + jmsMsg.setJMSMessageID(rmqMsg.getUserProperty(JMSHeaderEnum.JMSMessageID.name())); + jmsMsg.setJMSTimestamp(rmqMsg.getBornTimestamp()); + jmsMsg.setJMSExpiration(Long.valueOf(rmqMsg.getUserProperty(JMSHeaderEnum.JMSExpiration.name()))); + jmsMsg.setJMSRedelivered(rmqMsg.getReconsumeTimes() > 0 ? true : false); + //todo: what about Queue? 
+ jmsMsg.setJMSDestination(new RocketMQTopic(rmqMsg.getTopic())); + } + + private static void setProperties(MessageExt rmqMsg, AbstractJMSMessage jmsMsg) { + jmsMsg.setIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name(), rmqMsg.getReconsumeTimes() + 1); + + Map<String, String> propertiesMap = rmqMsg.getProperties(); + if (propertiesMap != null) { + for (String properName : propertiesMap.keySet()) { + if (properName.startsWith(USER_PROPERTY_PREFIX)) { + String properValue = propertiesMap.get(properName); + jmsMsg.setStringProperty(properName.substring(USER_PROPERTY_PREFIX.length()), properValue); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java b/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java new file mode 100644 index 0000000..cb27675 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSHeaderEnum.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Names of the JMS message header fields, together with the default values used
 * for some of them.
 */
public enum JMSHeaderEnum {

    JMSDestination,
    JMSDeliveryMode,
    JMSMessageID,
    JMSTimestamp,
    JMSCorrelationID,
    JMSReplyTo,
    JMSRedelivered,
    JMSType,
    JMSExpiration,
    JMSPriority,
    JMSDeliveryTime;

    // Defaults taken from the corresponding javax.jms.Message constants.
    public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = Message.DEFAULT_DELIVERY_MODE;
    public static final long JMS_TIME_TO_LIVE_DEFAULT_VALUE = Message.DEFAULT_TIME_TO_LIVE;
    public static final int JMS_PRIORITY_DEFAULT_VALUE = Message.DEFAULT_PRIORITY;
    public static final long JMS_DELIVERY_TIME_DEFAULT_VALUE = Message.DEFAULT_DELIVERY_DELAY;
    // Defaults defined locally.
    public static final boolean JMS_REDELIVERED_DEFAULT_VALUE = false;
    public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0;
    public static final int JMS_EXPIRATION_DEFAULT_VALUE = 0;

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.enums; + +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSMapMessage; +import org.apache.rocketmq.jms.msg.JMSObjectMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; + +public enum JMSMessageModelEnum { + BYTE(JMSBytesMessage.class), + MAP(JMSMapMessage.class), + OBJECT(JMSObjectMessage.class), + STRING(JMSTextMessage.class); + + public static final String MSG_MODEL_NAME = "MsgModel"; + + private Class jmsClass; + + JMSMessageModelEnum(Class jmsClass) { + this.jmsClass = jmsClass; + } + + public static JMSMessageModelEnum toMsgModelEnum(AbstractJMSMessage jmsMsg) { + for (JMSMessageModelEnum e : values()) { + if (e.getJmsClass().isInstance(jmsMsg)) { + return e; + } + } + + throw new IllegalArgumentException(String.format("Not supported class[%s]", jmsMsg.getClass().getSimpleName())); + } + + public Class getJmsClass() { + return jmsClass; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java b/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java new file mode 100644 index 0000000..dd5955b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache 
/**
 * Names of the JMSX message properties known to this provider. The constant
 * names are used verbatim as property names.
 */
public enum JMSPropertiesEnum {
    JMSXUserID,
    JMSXDeliveryCount,
    JMSXGroupID,
    JMSXGroupSeq,
    JMSXRcvTimestamp
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import com.alibaba.fastjson.JSON; +import java.util.HashMap; +import java.util.Map; +import javax.jms.JMSException; + +public class MapSerialize implements Serialize<Map> { + + private static MapSerialize ins = new MapSerialize(); + + public static MapSerialize instance() { + return ins; + } + + @Override public byte[] serialize(Map map) throws JMSException { + return JSON.toJSONBytes(map); + } + + private MapSerialize() { + } + + @Override public Map deserialize(byte[] bytes) throws JMSException { + return JSON.parseObject(bytes, HashMap.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java b/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java new file mode 100644 index 0000000..5e72955 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import javax.jms.JMSException; +import org.apache.commons.lang.exception.ExceptionUtils; + +public class ObjectSerialize implements Serialize<Object> { + + private static ObjectSerialize ins = new ObjectSerialize(); + + public static ObjectSerialize instance() { + return ins; + } + + private ObjectSerialize() { + } + + public byte[] serialize(Object object) throws JMSException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + oos.close(); + baos.close(); + return baos.toByteArray(); + } + catch (IOException e) { + throw new JMSException(ExceptionUtils.getStackTrace(e)); + } + } + + public Serializable deserialize(byte[] bytes) throws JMSException { + try { + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais); + ois.close(); + bais.close(); + return (Serializable) ois.readObject(); + } + catch (IOException e) { + throw new JMSException(e.getMessage()); + } + catch (ClassNotFoundException e) { + throw new JMSException(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java b/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java new file mode 100644 index 0000000..78a499c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Converts values of type {@code T} to and from their byte representation.
 *
 * @param <T> type of the values handled by the implementation
 */
public interface Serialize<T> {

    /**
     * Turns a value into bytes.
     *
     * @param t value to serialize
     * @return the byte representation
     * @throws JMSException if the value cannot be serialized
     */
    byte[] serialize(T t) throws JMSException;

    /**
     * Turns bytes back into a value.
     *
     * @param bytes byte representation
     * @return the deserialized value
     * @throws JMSException if the bytes cannot be deserialized
     */
    T deserialize(byte[] bytes) throws JMSException;
}
+ */ + +package org.apache.rocketmq.jms.msg.serialize; + +import com.google.common.base.Charsets; +import java.nio.charset.Charset; +import javax.jms.JMSException; + +public class StringSerialize implements Serialize<String> { + + private static final String EMPTY_STRING = ""; + private static final byte[] EMPTY_BYTES = new byte[0]; + private static final Charset DEFAULT_CHARSET = Charsets.UTF_8; + private static StringSerialize ins = new StringSerialize(); + + public static StringSerialize instance() { + return ins; + } + + private StringSerialize() { + } + + @Override public byte[] serialize(String s) throws JMSException { + if (null == s) { + return EMPTY_BYTES; + } + try { + return s.getBytes(DEFAULT_CHARSET); + } + catch (Exception e) { + throw new JMSException(e.getMessage()); + } + } + + @Override public String deserialize(byte[] bytes) throws JMSException { + if (null == bytes) { + return EMPTY_STRING; + } + try { + return new String(bytes, DEFAULT_CHARSET); + } + catch (Exception e) { + throw new JMSException(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java b/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java new file mode 100644 index 0000000..bed8165 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.support; + +import java.util.UUID; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.Queue; +import javax.jms.Topic; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.rocketmq.jms.RocketMQConsumer; + +public class JMSUtils { + + public static String getDestinationName(Destination destination) { + try { + String topicName; + if (destination instanceof Topic) { + topicName = ((Topic) destination).getTopicName(); + } + else if (destination instanceof Queue) { + topicName = ((Queue) destination).getQueueName(); + } + else { + throw new JMSException(String.format("Unsupported Destination type:", destination.getClass())); + } + return topicName; + } + catch (JMSException e) { + throw new JMSRuntimeException(e.getMessage()); + } + } + + public static String getConsumerGroup(RocketMQConsumer consumer) { + try { + return getConsumerGroup(consumer.getSubscriptionName(), + consumer.getSession().getConnection().getClientID(), + consumer.isShared() + ); + } + catch (JMSException e) { + throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e)); + } + } + + public static String getConsumerGroup(String subscriptionName, String clientID, boolean shared) { + StringBuffer consumerGroup = new StringBuffer(); + + if (StringUtils.isNotBlank(subscriptionName)) { + consumerGroup.append(subscriptionName); + } + + if (StringUtils.isNotBlank(clientID)) { + if (consumerGroup.length() != 0) { + 
consumerGroup.append("-"); + } + consumerGroup.append(clientID); + } + + if (shared) { + if (consumerGroup.length() != 0) { + consumerGroup.append("-"); + } + consumerGroup.append(uuid()); + } + + if (consumerGroup.length() == 0) { + consumerGroup.append(uuid()); + } + + return consumerGroup.toString(); + } + + public static String uuid() { + return UUID.randomUUID().toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java b/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java new file mode 100644 index 0000000..3ff1d69 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.support; + +/** + * Converter that convert object directly, which means Integer can only be + * converted to Integer,rather than Integer and Long. 
/**
 * Strict object converter: a value is only ever returned as the exact requested type,
 * e.g. an {@code Integer} can be read as {@code Integer} but not as {@code Long}.
 * {@code null} is passed through unchanged by every method.
 */
public class ObjectTypeCast {

    /**
     * @param obj value to cast, may be {@code null}
     * @return {@code obj} as {@code String}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not a {@code String}
     */
    public static String cast2String(Object obj) {
        return cast2Object(obj, String.class);
    }

    /**
     * @param obj value to cast, may be {@code null}
     * @return {@code obj} as {@code Long}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not a {@code Long}
     */
    public static Long cast2Long(Object obj) {
        return cast2Object(obj, Long.class);
    }

    /**
     * @param obj value to cast, may be {@code null}
     * @return {@code obj} as {@code Integer}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not an {@code Integer}
     */
    public static Integer cast2Integer(Object obj) {
        return cast2Object(obj, Integer.class);
    }

    /**
     * @param obj value to cast, may be {@code null}
     * @return {@code obj} as {@code Boolean}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not a {@code Boolean}
     */
    public static Boolean cast2Boolean(Object obj) {
        return cast2Object(obj, Boolean.class);
    }

    /**
     * Casts {@code obj} to {@code target} when it is an instance of that class.
     *
     * @param obj    value to cast, may be {@code null}
     * @param target class the value must be an instance of
     * @param <T>    requested type
     * @return {@code obj} typed as {@code T}, or {@code null} when {@code obj} is {@code null}
     * @throws ClassCastException if {@code obj} is not an instance of {@code target}
     */
    public static <T> T cast2Object(Object obj, Class<T> target) {
        if (obj == null) {
            return null;
        }
        if (!target.isInstance(obj)) {
            throw new ClassCastException("To casted object is " + obj.getClass() + ", not " + target.getSimpleName() + ".class");
        }
        return target.cast(obj);
    }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.support; + +import javax.jms.JMSException; +import javax.jms.MapMessage; + +/** + * Primitive type converter, according to the conversion table in {@link MapMessage}. + */ +public class PrimitiveTypeCast { + + /** + * Indicate if the parameter obj is primitive type. 
+ * + * @param obj that to be check + * @return true if the obj is primitive type, otherwise return false + */ + public static boolean isPrimitiveType(Object obj) { + if (obj == null) { + return false; + } + if (Boolean.class.isInstance(obj) + || Byte.class.isInstance(obj) + || Short.class.isInstance(obj) + || Character.class.isInstance(obj) + || Integer.class.isInstance(obj) + || Long.class.isInstance(obj) + || Float.class.isInstance(obj) + || Double.class.isInstance(obj) + || String.class.isInstance(obj) + || byte[].class.isInstance(obj)) { + return true; + } + + return false; + } + + public static boolean cast2Boolean(Object obj) throws JMSException { + if (obj == null) { + return Boolean.valueOf(null); + } + + if (Boolean.class.isInstance(obj)) { + return (Boolean) obj; + } + if (String.class.isInstance(obj)) { + return Boolean.valueOf((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static byte cast2Byte(Object obj) throws JMSException { + if (obj == null) { + return Byte.valueOf(null); + } + + if (Byte.class.isInstance(obj)) { + return (Byte) obj; + } + if (String.class.isInstance(obj)) { + return Byte.valueOf((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static short cast2Short(Object obj) throws JMSException { + if (obj == null) { + return Short.valueOf(null); + } + + if (Byte.class.isInstance(obj)) { + return ((Byte) obj).shortValue(); + } + if (Short.class.isInstance(obj)) { + return (Short) obj; + } + if (String.class.isInstance(obj)) { + return Short.valueOf((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static char cast2Char(Object obj) throws JMSException { + if (obj == null) { + throw new NullPointerException("Obj is required"); + } + + if (Character.class.isInstance(obj)) { + return (Character) obj; + } + + throw new JMSException("Incorrect type[" + 
obj.getClass() + "] to cast"); + } + + public static int cast2Int(Object obj) throws JMSException { + if (obj == null) { + return Integer.valueOf(null); + } + + if (Byte.class.isInstance(obj)) { + return ((Byte) obj).intValue(); + } + if (Short.class.isInstance(obj)) { + return ((Short) obj).intValue(); + } + if (Integer.class.isInstance(obj)) { + return (Integer) obj; + } + if (String.class.isInstance(obj)) { + return Integer.parseInt((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static long cast2Long(Object obj) throws JMSException { + if (obj == null) { + return Long.valueOf(null); + } + + if (Byte.class.isInstance(obj)) { + return ((Byte) obj).longValue(); + } + if (Short.class.isInstance(obj)) { + return ((Short) obj).longValue(); + } + if (Integer.class.isInstance(obj)) { + return ((Integer) obj).longValue(); + } + if (Long.class.isInstance(obj)) { + return (Long) obj; + } + if (String.class.isInstance(obj)) { + return Long.parseLong((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static float cast2Float(Object obj) throws JMSException { + if (obj == null) { + return Float.valueOf(null); + } + + if (Float.class.isInstance(obj)) { + return (Float) obj; + } + if (String.class.isInstance(obj)) { + return Float.parseFloat((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static double cast2Double(Object obj) throws JMSException { + if (obj == null) { + return Double.valueOf(null); + } + + if (Float.class.isInstance(obj)) { + return ((Float) obj).doubleValue(); + } + if (Double.class.isInstance(obj)) { + return (Double) obj; + } + if (String.class.isInstance(obj)) { + return Double.parseDouble((String) obj); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static String cast2String(Object obj) throws JMSException { + if (obj == 
null) { + return String.valueOf(null); + } + + if (Boolean.class.isInstance(obj) + || Byte.class.isInstance(obj) + || Short.class.isInstance(obj) + || Character.class.isInstance(obj) + || Integer.class.isInstance(obj) + || Long.class.isInstance(obj) + || Float.class.isInstance(obj) + || Double.class.isInstance(obj) + || String.class.isInstance(obj) + ) { + return obj.toString(); + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } + + public static byte[] cast2ByteArray(Object obj) throws JMSException { + if (obj instanceof byte[]) { + return (byte[]) obj; + } + + throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java b/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java new file mode 100644 index 0000000..c67e71c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Holds the version of this provider.
 */
public class ProviderVersion {

    /** The version this build identifies itself as. */
    public static final Version CURRENT_VERSION = Version.V1_1_0;

    /**
     * Known provider versions, each carrying an integer value.
     * NOTE(review): how the integer is used (e.g. ordering or wire format) is not
     * visible here; confirm with the callers of {@link #getValue()}.
     */
    public enum Version {

        V1_1_0(1);

        // final: enum constants are shared across the whole JVM, their state must not change.
        private final int value;

        Version(int value) {
            this.value = value;
        }

        /**
         * @return the integer value assigned to this version
         */
        public int getValue() {
            return value;
        }
    }
}
<!-- Logback configuration: one size-rolled file appender, one console appender. -->
<configuration>
    <!--
      Rolling file appender writing to jms.log under the user's home directory.
      The file is rolled once it reaches 100MB; rolled files are kept under
      "otherdays" with indexes 1 to 10.
      NOTE(review): no logger or root element in this file references this appender,
      so as configured here it does not appear to receive any events. Confirm intent.
    -->
    <appender name="DefaultAppender"
        class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${user.home}/logs/rocketmq/jms.log</file>
        <append>true</append>
        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
            <fileNamePattern>${user.home}/logs/rocketmq/otherdays/jms.%i.log
            </fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>10</maxIndex>
        </rollingPolicy>
        <triggeringPolicy
            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>100MB</maxFileSize>
        </triggeringPolicy>
        <encoder>
            <!-- Timestamp rendered in the GMT+8 zone, then level, thread and message. -->
            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

    <!--
      Console appender; this is the only appender attached to the root logger below.
      NOTE(review): "append" is a file appender setting; verify it has any effect on a
      ConsoleAppender.
    -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <append>true</append>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
            <charset class="java.nio.charset.Charset">UTF-8</charset>
        </encoder>
    </appender>

    <!-- Loggers under the JMS package are set to DEBUG. -->
    <logger name="org.apache.rocketmq.jms">
        <level value="DEBUG"/>
    </logger>

    <!-- The root logger is set to ERROR and writes to the console. -->
    <root>
        <level value="ERROR"/>
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.notNullValue; + +public class RocketMQConnectionFactoryTest { + + @Test + public void testClientId() throws Exception { + final String nameServerAddress = "localhost:6789"; + RocketMQConnectionFactory connectionFactory = new RocketMQConnectionFactory(nameServerAddress); + + assertThat(connectionFactory.getNameServerAddress(), is(nameServerAddress)); + assertThat(connectionFactory.getClientId(), notNullValue()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java b/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java new file mode 100644 index 0000000..0a3b36b --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.destination; + +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class RocketMQQueueTest { + + @Test + public void test() throws Exception { + RocketMQQueue queue = new RocketMQQueue("MyQueue"); + + assertThat(queue.getQueueName(), is("MyQueue")); + assertThat(queue.toString(), is("MyQueue")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java b/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java new file mode 100644 index 0000000..c482e1c --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.destination; + +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class RocketMQTopicTest { + + @Test + public void test() throws Exception { + RocketMQTopic topic = new RocketMQTopic("MyTopic"); + + assertThat(topic.getTopicName(), is("MyTopic")); + assertThat(topic.toString(), is("MyTopic")); + } + +} \ No newline at end of file
