http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java new file mode 100644 index 0000000..ea4b49e --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain.message; + +import com.google.common.collect.Maps; +import com.google.common.io.BaseEncoding; +import java.io.Serializable; +import java.util.Enumeration; +import java.util.Map; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.rocketmq.jms.domain.JmsBaseConstant; +import org.apache.rocketmq.jms.util.ExceptionUtil; + +public class JmsBaseMessage implements Message { + /** + * Message properties + */ + protected Map<String, Object> properties = Maps.newHashMap(); + /** + * Message headers + */ + protected Map<String, Object> headers = Maps.newHashMap(); + /** + * Message body + */ + protected Serializable body; + + @Override + public String getJMSMessageID() { + return (String) headers.get(JmsBaseConstant.JMS_MESSAGE_ID); + } + + /** + * Sets the message ID. + * <p/> + * <P>JMS providers set this field when a message is sent. Do not allow User to set the message ID by yourself. + * + * @param id the ID of the message + * @see javax.jms.Message#getJMSMessageID() + */ + + @Override + public void setJMSMessageID(String id) { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public long getJMSTimestamp() { + if (headers.containsKey(JmsBaseConstant.JMS_TIMESTAMP)) { + return (Long) headers.get(JmsBaseConstant.JMS_TIMESTAMP); + } + return 0; + } + + @Override + public void setJMSTimestamp(long timestamp) { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public byte[] getJMSCorrelationIDAsBytes() { + String jmsCorrelationID = getJMSCorrelationID(); + if (jmsCorrelationID != null) { + try { + return BaseEncoding.base64().decode(jmsCorrelationID); + } + catch (Exception e) { + return jmsCorrelationID.getBytes(); + } + } + return null; + } + + @Override + public void setJMSCorrelationIDAsBytes(byte[] correlationID) { + String encodedText = BaseEncoding.base64().encode(correlationID); + setJMSCorrelationID(encodedText); + } + + @Override + public String getJMSCorrelationID() { + if (headers.containsKey(JmsBaseConstant.JMS_CORRELATION_ID)) { + return (String) headers.get(JmsBaseConstant.JMS_CORRELATION_ID); + } + return null; + } + + @Override + public void setJMSCorrelationID(String correlationID) { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public Destination getJMSReplyTo() { + if (headers.containsKey(JmsBaseConstant.JMS_REPLY_TO)) { + return (Destination) headers.get(JmsBaseConstant.JMS_REPLY_TO); + } + return null; + } + + @Override + public void setJMSReplyTo(Destination replyTo) { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + + @Override + public Destination getJMSDestination() { + if (headers.containsKey(JmsBaseConstant.JMS_DESTINATION)) { + return (Destination) headers.get(JmsBaseConstant.JMS_DESTINATION); + } + return null; + } + + @Override + public void setJMSDestination(Destination destination) { + ExceptionUtil.handleUnSupportedException(); + } + + @SuppressWarnings("unchecked") + public <T> T getBody(Class<T> clazz) throws JMSException { + if (clazz.isInstance(body)) { + return (T) body; + } + else { + throw new IllegalArgumentException("The class " + clazz + + " is unknown to this implementation"); + } + } + + @Override + public int getJMSDeliveryMode() { + if (headers.containsKey(JmsBaseConstant.JMS_DELIVERY_MODE)) { + return (Integer) headers.get(JmsBaseConstant.JMS_DELIVERY_MODE); + } + return 0; + } + + /** + * Sets the <CODE>DeliveryMode</CODE> value for this message. + * <p/> + * <P>JMS providers set this field when a message is sent. ROCKETMQ only support DeliveryMode.PERSISTENT mode. So do not + * allow User to set this by yourself, but you can get the default mode by <CODE>getJMSDeliveryMode</CODE> method. + * + * @param deliveryMode the delivery mode for this message + * @see javax.jms.Message#getJMSDeliveryMode() + * @see javax.jms.DeliveryMode + */ + + @Override + public void setJMSDeliveryMode(int deliveryMode) { + ExceptionUtil.handleUnSupportedException(); + } + + public boolean isBodyAssignableTo(Class<?> clazz) throws JMSException { + return clazz.isInstance(body); + } + + @Override + public boolean getJMSRedelivered() { + return headers.containsKey(JmsBaseConstant.JMS_REDELIVERED) + && (Boolean) headers.get(JmsBaseConstant.JMS_REDELIVERED); + } + + @Override + public void setJMSRedelivered(boolean redelivered) { + ExceptionUtil.handleUnSupportedException(); + } + + /** + * copy meta data from source message + * + * @param sourceMessage source message + */ + public void copyMetaData(JmsBaseMessage sourceMessage) { + if (!sourceMessage.getHeaders().isEmpty()) { + for (Map.Entry<String, Object> entry : sourceMessage.getHeaders().entrySet()) { + if (!headerExits(entry.getKey())) { + setHeader(entry.getKey(), entry.getValue()); + } + } + } + if (!sourceMessage.getProperties().isEmpty()) { + for (Map.Entry<String, Object> entry : sourceMessage.getProperties().entrySet()) { + if (!propertyExists(entry.getKey())) { + setObjectProperty(entry.getKey(), entry.getValue()); + } + } + } + } + + @Override + public String getJMSType() { + return (String) headers.get(JmsBaseConstant.JMS_TYPE); + } + + @Override + public void setJMSType(String type) { + ExceptionUtil.handleUnSupportedException(); + } + + public Map<String, Object> getHeaders() { + return this.headers; + } + + @Override + public long getJMSExpiration() { + if (headers.containsKey(JmsBaseConstant.JMS_EXPIRATION)) { + return (Long) headers.get(JmsBaseConstant.JMS_EXPIRATION); + } + return 0; + } + + @Override + public void setJMSExpiration(long expiration) { + ExceptionUtil.handleUnSupportedException(); + } + + public boolean headerExits(String name) { + return this.headers.containsKey(name); + } + + @Override + public int getJMSPriority() { + if (headers.containsKey(JmsBaseConstant.JMS_PRIORITY)) { + return (Integer) headers.get(JmsBaseConstant.JMS_PRIORITY); + } + return 5; + } + + @Override + public void setJMSPriority(int priority) { + ExceptionUtil.handleUnSupportedException(); + } + + public void setHeader(String name, Object value) { + this.headers.put(name, value); + } + + public Map<String, Object> getProperties() { + return this.properties; + } + + public void setProperties(Map<String, Object> properties) { + this.properties = properties; + } + + @Override + public void acknowledge() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + @Override + public void clearProperties() { + this.properties.clear(); + } + + @Override + public void clearBody() { + this.body = null; + } + + @Override + public boolean propertyExists(String name) { + return properties.containsKey(name); + } + + @Override + public boolean getBooleanProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return value instanceof Boolean ? (Boolean) value : Boolean.valueOf(value.toString()); + } + return false; + } + + @Override + public byte getByteProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return value instanceof Byte ? (Byte) value : Byte.valueOf(value.toString()); + } + return 0; + } + + @Override + public short getShortProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return value instanceof Short ? (Short) value : Short.valueOf(value.toString()); + } + return 0; + } + + @Override + public int getIntProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return value instanceof Integer ? (Integer) value : Integer.valueOf(value.toString()); + } + return 0; + } + + @Override + public long getLongProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return value instanceof Long ? (Long) value : Long.valueOf(value.toString()); + } + return 0L; + } + + @Override + public float getFloatProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return value instanceof Float ? (Float) value : Float.valueOf(value.toString()); + } + return 0f; + } + + @Override + public double getDoubleProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return value instanceof Double ? (Double) value : Double.valueOf(value.toString()); + } + return 0d; + } + + @Override + public String getStringProperty(String name) throws JMSException { + if (propertyExists(name)) { + return getObjectProperty(name).toString(); + } + return null; + } + + @Override + public Object getObjectProperty(String name) throws JMSException { + return this.properties.get(name); + } + + @Override + public Enumeration<?> getPropertyNames() throws JMSException { + final Object[] keys = this.properties.keySet().toArray(); + return new Enumeration<Object>() { + int i; + + @Override + public boolean hasMoreElements() { + return i < keys.length; + } + + @Override + public Object nextElement() { + return keys[i++]; + } + }; + } + + @Override + public void setBooleanProperty(String name, boolean value) { + setObjectProperty(name, value); + } + + @Override + public void setByteProperty(String name, byte value) { + setObjectProperty(name, value); + } + + @Override + public void setShortProperty(String name, short value) { + setObjectProperty(name, value); + } + + @Override + public void setIntProperty(String name, int value) { + setObjectProperty(name, value); + } + + @Override + public void setLongProperty(String name, long value) { + setObjectProperty(name, value); + } + + public void setFloatProperty(String name, float value) { + setObjectProperty(name, value); + } + + @Override + public void setDoubleProperty(String name, double value) { + setObjectProperty(name, value); + } + + @Override + public void setStringProperty(String name, String value) { + setObjectProperty(name, value); + } + + @Override + public void setObjectProperty(String name, Object value) { + if (value instanceof Number || value instanceof String || value instanceof Boolean) { + this.properties.put(name, value); + } + else { + throw new IllegalArgumentException( + "Value should be boolean, byte, short, int, long, float, double, and String."); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java new file mode 100644 index 0000000..b1e85b0 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain.message; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; +import org.apache.rocketmq.jms.util.ExceptionUtil; + +/** + * The <CODE>BytesMessage</CODE> methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and + * <CODE>java.io.DataOutputStream</CODE>. <P> Notice:Although the JMS API allows the use of message properties with byte + * messages, they are typically not used, since the inclusion of properties may affect the format. <P> + */ +public class JmsBytesMessage extends JmsBaseMessage implements BytesMessage { + private DataInputStream dataAsInput; + private DataOutputStream dataAsOutput; + private ByteArrayOutputStream bytesOut; + private byte[] bytesIn; + + /** + * Message created for reading + * + * @param data + */ + public JmsBytesMessage(byte[] data) { + this.bytesIn = data; + dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length)); + } + + /** + * Message created to be sent + */ + public JmsBytesMessage() { + bytesOut = new ByteArrayOutputStream(); + dataAsOutput = new DataOutputStream(bytesOut); + } + + public long getBodyLength() throws JMSException { + return getData().length; + } + + /** + * @return the data + */ + public byte[] getData() { + if (bytesOut != null) { + return bytesOut.toByteArray(); + } + else { + return bytesIn; + } + + } + + public boolean readBoolean() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public byte readByte() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public int readUnsignedByte() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public short readShort() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public int readUnsignedShort() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public char readChar() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public int readInt() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public long readLong() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public float readFloat() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public double readDouble() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public String readUTF() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + public int readBytes(byte[] value) throws JMSException { + return readBytes(value, value.length); + } + + public int readBytes(byte[] value, int length) throws JMSException { + if (length > value.length) { + throw new IndexOutOfBoundsException("length must be smaller than the length of value"); + } + if (dataAsInput == null) { + throw new MessageNotReadableException("Message is not readable! "); + } + try { + int offset = 0; + while (offset < length) { + int read = dataAsInput.read(value, offset, length - offset); + if (read < 0) { + break; + } + offset += read; + } + + if (offset == 0 && length != 0) { + return -1; + } + else { + return offset; + } + } + catch (IOException e) { + throw handleInputException(e); + } + + } + + public void writeBoolean(boolean value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeByte(byte value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeShort(short value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeChar(char value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeInt(int value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeLong(long value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeFloat(float value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeDouble(double value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeUTF(String value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void writeBytes(byte[] value) throws JMSException { + if (dataAsOutput == null) { + throw new MessageNotWriteableException("Message is not writable! "); + } + try { + dataAsOutput.write(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeBytes(byte[] value, int offset, int length) throws JMSException { + if (dataAsOutput == null) { + throw new MessageNotWriteableException("Message is not writable! "); + } + try { + dataAsOutput.write(value, offset, length); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeObject(Object value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + public void reset() throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + private JMSException handleOutputException(final IOException e) { + JMSException ex = new JMSException(e.getMessage()); + ex.initCause(e); + ex.setLinkedException(e); + return ex; + } + + private JMSException handleInputException(final IOException e) { + JMSException ex; + if (e instanceof EOFException) { + ex = new MessageEOFException(e.getMessage()); + } + else { + ex = new MessageFormatException(e.getMessage()); + } + ex.initCause(e); + ex.setLinkedException(e); + return ex; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java new file mode 100644 index 0000000..f67da14 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain.message; + +import java.io.Serializable; +import javax.jms.JMSException; +import javax.jms.ObjectMessage; + +public class JmsObjectMessage extends JmsBaseMessage implements ObjectMessage { + + public JmsObjectMessage(Serializable object) { + this.body = object; + } + + public JmsObjectMessage() { + + } + + public Serializable getObject() throws JMSException { + return this.body; + } + + public void setObject(Serializable object) throws JMSException { + this.body = object; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java new file mode 100644 index 0000000..ce19b51 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain.message; + +import javax.jms.JMSException; +import javax.jms.TextMessage; + +public class JmsTextMessage extends JmsBaseMessage implements TextMessage { + private String text; + + public JmsTextMessage() { + + } + + public JmsTextMessage(String text) { + setText(text); + } + + public void clearBody() { + this.text = null; + super.clearBody(); + } + + public String getText() throws JMSException { + return this.text; + } + + public void setText(String text) { + this.body = text; + this.text = text; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java new file mode 100644 index 0000000..bd926e5 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.util; + +import com.google.common.base.Preconditions; +import javax.jms.JMSException; + +public class ExceptionUtil { + public static final boolean SKIP_SET_EXCEPTION + = Boolean.parseBoolean(System.getProperty("skip.set.exception", "false")); + + public static void handleUnSupportedException() { + if (!ExceptionUtil.SKIP_SET_EXCEPTION) { + throw new UnsupportedOperationException("Operation unsupported! If you want to skip this Exception," + + " use '-Dskip.set.exception=true' in JVM options."); + } + } + + public static JMSException convertToJmsException(Exception e, String extra) { + Preconditions.checkNotNull(extra); + Preconditions.checkNotNull(e); + JMSException jmsException = new JMSException(extra); + jmsException.initCause(e); + return jmsException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java new file mode 100644 index 0000000..3cf03f9 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.util; + +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import javax.jms.BytesMessage; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; +import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.jms.domain.JmsBaseConstant; +import org.apache.rocketmq.jms.domain.JmsBaseTopic; +import org.apache.rocketmq.jms.domain.message.JmsBaseMessage; +import org.apache.rocketmq.jms.domain.message.JmsBytesMessage; +import org.apache.rocketmq.jms.domain.message.JmsObjectMessage; +import org.apache.rocketmq.jms.domain.message.JmsTextMessage; + +import static org.apache.rocketmq.jms.domain.JmsBaseMessageProducer.initRocketMQHeaders; + +public class MessageConverter { + public static byte[] getContentFromJms(javax.jms.Message jmsMessage) throws Exception { + byte[] content; + if (jmsMessage instanceof TextMessage) { + if (StringUtils.isEmpty(((TextMessage) jmsMessage).getText())) { + throw new IllegalArgumentException("Message body length is zero"); + } + content = MsgConvertUtil.string2Bytes(((TextMessage) jmsMessage).getText(), + Charsets.UTF_8.toString()); + } + else if (jmsMessage instanceof ObjectMessage) { + if (((ObjectMessage) jmsMessage).getObject() == null) { + throw new IllegalArgumentException("Message body length is zero"); + } + content = MsgConvertUtil.objectSerialize(((ObjectMessage) jmsMessage).getObject()); + } + else if (jmsMessage instanceof BytesMessage) { + JmsBytesMessage bytesMessage = (JmsBytesMessage) jmsMessage; + if (bytesMessage.getBodyLength() == 0) { + throw new IllegalArgumentException("Message body length is zero"); + } + content = bytesMessage.getData(); + } + else { + throw new IllegalArgumentException("Unknown message type " + jmsMessage.getJMSType()); + } + + return content; + } + + public static JmsBaseMessage convert2JMSMessage(MessageExt msg) throws Exception { + JmsBaseMessage message; + if (MsgConvertUtil.MSGMODEL_BYTES.equals( + msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) { + message = new JmsBytesMessage(msg.getBody()); + } + else if (MsgConvertUtil.MSGMODEL_OBJ.equals( + msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) { + message = new JmsObjectMessage(MsgConvertUtil.objectDeserialize(msg.getBody())); + } + else if (MsgConvertUtil.MSGMODEL_TEXT.equals( + msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) { + message = new JmsTextMessage(MsgConvertUtil.bytes2String(msg.getBody(), + Charsets.UTF_8.toString())); + } + else { + // rocketmq producer sends bytesMessage without setting JMS_MSGMODEL. + message = new JmsBytesMessage(msg.getBody()); + } + + //-------------------------set headers------------------------- + Map<String, Object> properties = new HashMap<String, Object>(); + + message.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + msg.getMsgId()); + + if (msg.getReconsumeTimes() > 0) { + message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.TRUE); + } + else { + message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE); + } + + Map<String, String> propertiesMap = msg.getProperties(); + if (propertiesMap != null) { + for (String properName : propertiesMap.keySet()) { + String properValue = propertiesMap.get(properName); + if (JmsBaseConstant.JMS_DESTINATION.equals(properName)) { + String destinationStr = properValue; + if (null != destinationStr) { + List<String> msgTuple = Arrays.asList(destinationStr.split(":")); + message.setHeader(JmsBaseConstant.JMS_DESTINATION, + new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1))); + } + } + else if (JmsBaseConstant.JMS_DELIVERY_MODE.equals(properName) || + JmsBaseConstant.JMS_PRIORITY.equals(properName)) { + message.setHeader(properName, properValue); + } + else if (JmsBaseConstant.JMS_TIMESTAMP.equals(properName) || + JmsBaseConstant.JMS_EXPIRATION.equals(properName)) { + message.setHeader(properName, properValue); + } + else if (JmsBaseConstant.JMS_CORRELATION_ID.equals(properName) || + JmsBaseConstant.JMS_TYPE.equals(properName)) { + message.setHeader(properName, properValue); + } + else if (JmsBaseConstant.JMS_MESSAGE_ID.equals(properName) || + JmsBaseConstant.JMS_REDELIVERED.equals(properName)) { + //JMS_MESSAGE_ID should set by msg.getMsgID() + continue; + } + else { + properties.put(properName, properValue); + } + } + } + + //Handle System properties, put into header. + //add what? + message.setProperties(properties); + + return message; + } + + public static Message convert2RMQMessage(JmsBaseMessage jmsMsg) throws Exception { + Message rocketmqMsg = new MessageExt(); + // 1. Transform message body + rocketmqMsg.setBody(MessageConverter.getContentFromJms(jmsMsg)); + + // 2. Transform topic and messageType + JmsBaseTopic destination = (JmsBaseTopic) jmsMsg.getHeaders().get(JmsBaseConstant.JMS_DESTINATION); + String topic = destination.getMessageTopic(); + rocketmqMsg.setTopic(topic); + String messageType = destination.getMessageType(); + Preconditions.checkState(!messageType.contains("||"), + "'||' can not be in the destination when sending a message"); + rocketmqMsg.setTags(messageType); + + // 3. Transform message properties + Properties properties = initRocketMQHeaders(jmsMsg, topic, messageType); + for (String name : properties.stringPropertyNames()) { + String value = properties.getProperty(name); + if (MessageConst.PROPERTY_KEYS.equals(name)) { + rocketmqMsg.setKeys(value); + } else if (MessageConst.PROPERTY_TAGS.equals(name)) { + rocketmqMsg.setTags(value); + } else if (MessageConst.PROPERTY_DELAY_TIME_LEVEL.equals(name)) { + rocketmqMsg.setDelayTimeLevel(Integer.parseInt(value)); + } else if (MessageConst.PROPERTY_WAIT_STORE_MSG_OK.equals(name)) { + rocketmqMsg.setWaitStoreMsgOK(Boolean.parseBoolean(value)); + } else if (MessageConst.PROPERTY_BUYER_ID.equals(name)) { + rocketmqMsg.setBuyerId(value); + } else { + rocketmqMsg.putUserProperty(name, value); + } + } + + return rocketmqMsg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java new file mode 100644 index 0000000..ec55bbc --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +public class MsgConvertUtil { + + public static final byte[] EMPTY_BYTES = new byte[0]; + public static final String EMPTY_STRING = ""; + + public static final String JMS_MSGMODEL = "jmsMsgModel"; + /** + * To adapt this scene: "Notify client try to receive ObjectMessage sent by JMS client" Set notify out message + * model, value can be textMessage OR objectMessage + */ + public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel"; + + public static final String MSGMODEL_TEXT = "textMessage"; + public static final String MSGMODEL_BYTES = "bytesMessage"; + public static final String MSGMODEL_OBJ = "objectMessage"; + + public static final String MSG_TOPIC = "msgTopic"; + public static final String MSG_TYPE = "msgType"; + + public static byte[] objectSerialize(Object object) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + oos.close(); + baos.close(); + return baos.toByteArray(); + } + + public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException { + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais); + ois.close(); + bais.close(); + return (Serializable) ois.readObject(); + } + + public static final byte[] string2Bytes(String s, String charset) { + if (null == s) { + return EMPTY_BYTES; + } + byte[] bs = null; + try { + bs = s.getBytes(charset); + } + catch (Exception e) { + // ignore + } + return bs; + } + + public static final String bytes2String(byte[] bs, String charset) { + if (null == bs) { + return EMPTY_STRING; + } + String s = null; + try { + s = new String(bs, charset); + } + catch (Exception e) { + // ignore + } + return s; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java new file mode 100644 index 0000000..9b29928 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.util; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.jms.domain.CommonConstant; + +public abstract class URISpecParser { + + private static final String DEFAULT_BROKER = "rocketmq"; + + /** + * ConnectionUrl spec is broker://ip:port?key1=value1&key2=value2 + * + * @param uri Just like broker://ip:port?key1=value1&key2=value2 + * @return The parameters' map + */ + public static Map<String, String> parseURI(String uri) { + Preconditions.checkArgument(null != uri && !uri.trim().isEmpty(), "Uri can not be empty!"); + + Map<String, String> results = Maps.newHashMap(); + String broker = uri.substring(0, uri.indexOf(":")); + results.put(CommonConstant.PROVIDER, broker); + + if (broker.equals(DEFAULT_BROKER)) { + //Special handle for alibaba inner mq broker + String queryStr = uri.substring(uri.indexOf("?") + 1, uri.length()); + if (StringUtils.isNotEmpty(queryStr)) { + String[] params = queryStr.split("&"); + for (String param : params) { + if (param.contains("=")) { + String[] values = param.split("=", 2); + results.put(values[0], values[1]); + } + } + } + } + else { + throw new IllegalArgumentException("Broker must be rocketmq"); + } + return results; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/resources/application.conf b/rocketmq-jms/core/src/main/resources/application.conf new file mode 100644 index 0000000..713c915 --- /dev/null +++ b/rocketmq-jms/core/src/main/resources/application.conf @@ -0,0 +1 @@ +version = ${project.version} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java new file mode 100644 index 0000000..d77b13e --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Message; +import javax.jms.MessageListener; +import org.junit.Assert; + +public class JmsTestListener implements MessageListener { + + private int expectd; + private CountDownLatch latch; + private AtomicInteger consumedNum = new AtomicInteger(0); + + public JmsTestListener() { + this.expectd = 10; + } + public JmsTestListener(int expectd) { + this.expectd = expectd; + } + public JmsTestListener(int expected, CountDownLatch latch) { + this.expectd = expected; + this.latch = latch; + } + @Override + public void onMessage(Message message) { + try { + Assert.assertNotNull(message); + Assert.assertNotNull(message.getJMSMessageID()); + if (consumedNum.incrementAndGet() == expectd && latch != null) { + latch.countDown(); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public int getConsumedNum() { + return consumedNum.get(); + } + + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + + public void setExpectd(int expectd) { + this.expectd = expectd; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java new file mode 100644 index 0000000..855cb19 --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import java.lang.reflect.Field; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.client.producer.MQProducer; +import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer; +import org.apache.rocketmq.jms.domain.JmsBaseMessageProducer; +import org.apache.rocketmq.jms.domain.RMQPushConsumerExt; +import org.junit.Assert; + +public class JmsTestUtil { + public static MQProducer getMQProducer(String producerId) throws Exception { + Assert.assertNotNull(producerId); + Field field = JmsBaseMessageProducer.class.getDeclaredField("producerMap"); + field.setAccessible(true); + ConcurrentMap<String, MQProducer> producerMap = (ConcurrentMap<String, MQProducer>) field.get(null); + return producerMap.get(producerId); + } + public static RMQPushConsumerExt getRMQPushConsumerExt(String consumerId) throws Exception { + Assert.assertNotNull(consumerId); + Field field = JmsBaseMessageConsumer.class.getDeclaredField("consumerMap"); + field.setAccessible(true); + ConcurrentMap<String, RMQPushConsumerExt> consumerMap = (ConcurrentMap<String, RMQPushConsumerExt>) field.get(null); + return consumerMap.get(consumerId); + } + public static void checkConsumerState(String consumerId, boolean isNull, boolean isStarted) throws Exception { + RMQPushConsumerExt rmqPushConsumerExt = getRMQPushConsumerExt(consumerId); + if (isNull) { + Assert.assertNull(rmqPushConsumerExt); + } else { + Assert.assertNotNull(rmqPushConsumerExt); + Assert.assertEquals(isStarted, rmqPushConsumerExt.isStarted()); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java new file mode 100644 index 0000000..9fe9f5e --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain.message; + +import org.junit.Assert; +import org.junit.Test; + +public class JmsBytesMessageTest { + + private byte[] receiveData = "receive data test".getBytes(); + private byte[] sendData = "send data test".getBytes(); + + @Test + public void testGetData() throws Exception { + JmsBytesMessage readMessage = new JmsBytesMessage(receiveData); + + System.out.println(new String(readMessage.getData())); + Assert.assertEquals(new String(receiveData), new String(readMessage.getData())); + + JmsBytesMessage sendMessage = new JmsBytesMessage(); + sendMessage.writeBytes(sendData, 0, sendData.length); + + System.out.println(new String(sendMessage.getData())); + Assert.assertEquals(new String(sendData), new String(sendMessage.getData())); + + } + + @Test + public void testGetBodyLength() throws Exception { + + JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData); + + System.out.println(bytesMessage.getBodyLength()); + Assert.assertEquals(bytesMessage.getBodyLength(), receiveData.length); + } + + @Test + public void testReadBytes() throws Exception { + JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData); + + Assert.assertEquals(bytesMessage.getBodyLength(), receiveData.length); + byte[] receiveValue = new byte[receiveData.length]; + bytesMessage.readBytes(receiveValue); + + System.out.println(new String(receiveValue)); + Assert.assertEquals(new String(receiveValue), new String(receiveData)); + + } + + @Test + public void testReadBytes1() throws Exception { + JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData); + + byte[] receiveValue1 = new byte[2]; + bytesMessage.readBytes(receiveValue1, 2); + System.out.println(new String(receiveValue1)); + Assert.assertEquals(new String(receiveData).substring(0, 2), new String(receiveValue1)); + + byte[] receiceValue2 = new byte[2]; + bytesMessage.readBytes(receiceValue2, 2); + System.out.println(new String(receiceValue2)); + Assert.assertEquals(new String(receiveData).substring(2, 4), new String(receiceValue2)); + + } + + @Test + public void testWriteBytes() throws Exception { + JmsBytesMessage jmsBytesMessage = new JmsBytesMessage(); + jmsBytesMessage.writeBytes(sendData); + + System.out.println(new String(jmsBytesMessage.getData())); + Assert.assertEquals(new String(jmsBytesMessage.getData()), new String(sendData)); + + } + + @Test + public void testException() throws Exception { + JmsBytesMessage jmsBytesMessage = new JmsBytesMessage(); + + byte[] receiveValue = new byte[receiveData.length]; +// Throws out NullPointerException +// jmsBytesMessage.readBytes(receiveValue); + + JmsBytesMessage sendMessage = new JmsBytesMessage(sendData); +// Throws out NullPointerException +// sendMessage.writeBytes("hello again".getBytes()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java new file mode 100644 index 0000000..b570142 --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java @@ -0,0 +1,52 @@ +package org.apache.rocketmq.jms.domain.message; + +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.domain.JmsBaseConstant; +import org.apache.rocketmq.jms.domain.JmsBaseTopic; +import org.apache.rocketmq.jms.util.MessageConverter; +import org.apache.rocketmq.jms.util.MsgConvertUtil; +import org.junit.Assert; +import org.junit.Test; + +public class JmsMessageConvertTest { + @Test + public void testCovert2RMQ() throws Exception { + //init jmsBaseMessage + String topic = "TestTopic"; + String messageType = "TagA"; + + JmsBaseMessage jmsBaseMessage = new JmsTextMessage("testText"); + jmsBaseMessage.setHeader(JmsBaseConstant.JMS_DESTINATION, new JmsBaseTopic(topic, messageType)); + jmsBaseMessage.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:null"); + jmsBaseMessage.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE); + + jmsBaseMessage.setObjectProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT); + jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TOPIC, topic); + jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TYPE, messageType); + jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_TAGS, messageType); + jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_KEYS, messageType); + + //convert to RMQMessage + MessageExt message = (MessageExt)MessageConverter.convert2RMQMessage(jmsBaseMessage); + + System.out.println(message); + + //then convert back to jmsBaseMessage + JmsBaseMessage jmsBaseMessageBack = MessageConverter.convert2JMSMessage(message); + + JmsTextMessage jmsTextMessage = (JmsTextMessage) jmsBaseMessage; + JmsTextMessage jmsTextMessageBack = (JmsTextMessage) jmsBaseMessageBack; + + Assert.assertEquals(jmsTextMessage.getText(), jmsTextMessageBack.getText()); + Assert.assertEquals(jmsTextMessage.getJMSDestination().toString(), jmsTextMessageBack.getJMSDestination().toString()); + Assert.assertEquals(jmsTextMessage.getJMSMessageID(), jmsTextMessageBack.getJMSMessageID()); + Assert.assertEquals(jmsTextMessage.getJMSRedelivered(), jmsTextMessageBack.getJMSRedelivered()); + Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.JMS_MSGMODEL), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.JMS_MSGMODEL)); + Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.MSG_TOPIC), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.MSG_TOPIC)); + Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.MSG_TYPE), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.MSG_TYPE)); + Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_TAGS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_TAGS)); + Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_KEYS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_KEYS)); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java new file mode 100644 index 0000000..6951976 --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain.message; + +import java.io.Serializable; +import javax.jms.JMSException; +import org.junit.Assert; +import org.junit.Test; + +public class JmsObjectMessageTest { + + @Test + public void testGetObject() { + JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20)); + try { + Assert.assertEquals(jmsObjectMessage.getObject(), new User("jack", 20)); + } + catch (JMSException e) { + e.printStackTrace(); + } + } + + @Test + public void testGetBody() { + JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20)); + + try { + User user = (User)jmsObjectMessage.getBody(Object.class); + System.out.println(user.getName() + ": " + user.getAge()); + Assert.assertEquals(jmsObjectMessage.getBody(Object.class), jmsObjectMessage.getObject()); + } + catch (JMSException e) { + e.printStackTrace(); + } + } + + private class User implements Serializable { + private String name; + private int age; + + private User(String name, int age) { + this.name = name; + this.age = age; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null || getClass() != obj.getClass()) + return false; + + User user = (User)obj; + if (age != user.getAge()) + return false; + if (name != null ? !name.equals(user.getName()) : user.getName() != null) + return false; + return true; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java new file mode 100644 index 0000000..d3c8287 --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain.message; + +import javax.jms.JMSException; +import org.junit.Assert; +import org.junit.Test; + +public class JmsTextMessageTest { + private String text = "jmsTextMessage test"; + + @Test + public void testGetBody() { + JmsTextMessage jmsTextMessage = new JmsTextMessage(text); + try { + Assert.assertEquals(jmsTextMessage.getBody(String.class), text); + } + catch (JMSException e) { + e.printStackTrace(); + } + } + + @Test + public void testSetGetText() { + JmsTextMessage jmsTextMessage = new JmsTextMessage(); + jmsTextMessage.setText(text); + try { + Assert.assertEquals(jmsTextMessage.getText(), text); + } + catch (JMSException e) { + e.printStackTrace(); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java new file mode 100644 index 0000000..02fe111 --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.jms.domain.CommonConstant; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IntegrationTestBase { + public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class); + + protected static Random random = new Random(); + protected static final String SEP = File.separator; + + + protected static String topic = "jms-test"; + protected static String topic2 = "jms-test-2"; + protected static String messageType = "TagA"; + protected static String producerId = "PID-jms-test"; + protected static String consumerId = "CID-jms-test"; + protected static String consumerId2 = "CID-jms-test-2"; + protected static String nameServer; + protected static String text = "English test"; + protected static int consumeThreadNums = 16; + + + + + protected static final String BROKER_NAME_PREFIX = "TestBrokerName_"; + protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0); + protected static final List<File> TMPE_FILES = new ArrayList<File>(); + protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<BrokerController>(); + protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<NamesrvController>(); + + + private static String createBaseDir() { + String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); + final File file = new File(baseDir); + if (file.exists()) { + System.out.println(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir)); + System.exit(1); + } + TMPE_FILES.add(file); + return baseDir; + } + + public static NamesrvController createAndStartNamesrv() { + String baseDir = createBaseDir(); + NamesrvConfig namesrvConfig = new NamesrvConfig(); + NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); + namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json"); + + nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000)); + NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); + try { + Assert.assertTrue(namesrvController.initialize()); + logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort()); + namesrvController.start(); + } catch (Exception e) { + System.out.println("Name Server start failed"); + System.exit(1); + } + NAMESRV_CONTROLLERS.add(namesrvController); + return namesrvController; + + } + + + public static BrokerController createAndStartBroker(String nsAddr) { + String baseDir = createBaseDir(); + BrokerConfig brokerConfig = new BrokerConfig(); + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + NettyClientConfig nettyClientConfig = new NettyClientConfig(); + MessageStoreConfig storeConfig = new MessageStoreConfig(); + brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); + brokerConfig.setBrokerIP1("127.0.0.1"); + brokerConfig.setNamesrvAddr(nsAddr); + storeConfig.setStorePathRootDir(baseDir); + storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); + storeConfig.setHaListenPort(8000 + random.nextInt(1000)); + nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); + BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); + try { + Assert.assertTrue(brokerController.initialize()); + logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); + brokerController.start(); + } catch (Exception e) { + System.out.println("Broker start failed"); + System.exit(1); + } + BROKER_CONTROLLERS.add(brokerController); + return brokerController; + } + + + + protected static DefaultMQAdminExt defaultMQAdminExt; + + static { + //clear the environment + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void run() { + if (defaultMQAdminExt != null) { + defaultMQAdminExt.shutdown(); + } + for (NamesrvController namesrvController: NAMESRV_CONTROLLERS) { + if (namesrvController != null) { + namesrvController.shutdown(); + } + } + for (BrokerController brokerController: BROKER_CONTROLLERS) { + if (brokerController != null) { + brokerController.shutdown(); + } + } + for (File file : TMPE_FILES) { + deleteFile(file); + } + } + }); + + + NamesrvController namesrvController = IntegrationTestBase.createAndStartNamesrv(); + nameServer = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort(); + BrokerController brokerController = createAndStartBroker(nameServer); + + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExt.setNamesrvAddr(nameServer); + try { + defaultMQAdminExt.start(); + } catch (MQClientException e) { + System.out.println("DefaultMQAdminExt start failed"); + System.exit(1); + } + + createTopic(topic, brokerController.getBrokerAddr()); + + + } + + public static void deleteFile(File file) { + if (!file.exists()) { + return; + } + if (file.isFile()) { + file.delete(); + } else if (file.isDirectory()) { + File[] files = file.listFiles(); + for (int i = 0;i < files.length;i ++) { + deleteFile(files[i]); + } + file.delete(); + } + } + public static void createTopic(String topic, String addr) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setReadQueueNums(4); + topicConfig.setWriteQueueNums(4); + try { + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + } catch (Exception e) { + logger.error("Create topic:{} addr:{} failed", addr, topic); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java new file mode 100644 index 0000000..367700a --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.jms.JmsTestListener; +import org.apache.rocketmq.jms.JmsTestUtil; +import org.apache.rocketmq.jms.domain.CommonConstant; +import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt; + +public class JmsClientIT extends IntegrationTestBase { + + @Test + public void testConfigInURI() throws Exception { + JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new + URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s", + CommonConstant.PRODUCERID, producerId, + CommonConstant.CONSUMERID, consumerId, + CommonConstant.NAMESERVER, nameServer, + CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums, + CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000, + CommonConstant.INSTANCE_NAME, "JMS_TEST"))); + + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + try { + Destination destination = session.createTopic(topic + ":" + messageType); + session.createConsumer(destination); + session.createProducer(destination); + + DefaultMQPushConsumer rmqPushConsumer = (DefaultMQPushConsumer) getRMQPushConsumerExt(consumerId).getConsumer(); + Assert.assertNotNull(rmqPushConsumer); + Assert.assertEquals(consumerId, rmqPushConsumer.getConsumerGroup()); + Assert.assertEquals("JMS_TEST", rmqPushConsumer.getInstanceName()); + Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMax()); + Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMin()); + Assert.assertEquals(nameServer, rmqPushConsumer.getNamesrvAddr()); + + DefaultMQProducer mqProducer = (DefaultMQProducer) JmsTestUtil.getMQProducer(producerId); + Assert.assertNotNull(mqProducer); + Assert.assertEquals(producerId, mqProducer.getProducerGroup()); + Assert.assertEquals("JMS_TEST", mqProducer.getInstanceName()); + Assert.assertEquals(10 * 1000, mqProducer.getSendMsgTimeout()); + Assert.assertEquals(nameServer, mqProducer.getNamesrvAddr()); + + Thread.sleep(2000); + } + finally { + connection.close(); + } + + } + + + private Connection createConnection(String producerGroup, String consumerGroup) throws Exception { + JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new + URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s", + CommonConstant.PRODUCERID, producerGroup, + CommonConstant.CONSUMERID, consumerGroup, + CommonConstant.NAMESERVER, nameServer, + CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums, + CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000, + CommonConstant.INSTANCE_NAME, "JMS_TEST"))); + return connectionFactory.createConnection(); + } + + @Test + public void testProducerAndConsume_TwoConsumer() throws Exception { + + Connection connection = createConnection(producerId, consumerId); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destinationA = session.createTopic("TopicA"); + Destination destinationB = session.createTopic("TopicB"); + final CountDownLatch countDownLatch = new CountDownLatch(2); + JmsTestListener listenerA = new JmsTestListener(10,countDownLatch); + JmsTestListener listenerB = new JmsTestListener(10, countDownLatch); + + try { + //two consumers + MessageConsumer messageConsumerA = session.createConsumer(destinationA); + messageConsumerA.setMessageListener(listenerA); + MessageConsumer messageConsumerB = session.createConsumer(destinationB); + messageConsumerB.setMessageListener(listenerB); + //producer + MessageProducer messageProducer = session.createProducer(destinationA); + connection.start(); + + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(text + i); + Assert.assertNull(message.getJMSMessageID()); + messageProducer.send(message); + Assert.assertNotNull(message.getJMSMessageID()); + } + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(text + i); + Assert.assertNull(message.getJMSMessageID()); + messageProducer.send(destinationB, message); + Assert.assertNotNull(message.getJMSMessageID()); + } + + if (countDownLatch.await(30, TimeUnit.SECONDS)) { + Thread.sleep(2000); + } + Assert.assertEquals(10, listenerA.getConsumedNum()); + Assert.assertEquals(10, listenerB.getConsumedNum()); + } + finally { + //Close the connection + connection.close(); + } + + } + + @Test + public void testProducerAndConsume_TagFilter() throws Exception { + Connection connection = createConnection(producerId, consumerId); + Connection anotherConnection = createConnection(producerId, consumerId +"other"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session anotherSession = anotherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destinationA = session.createTopic("topic:tagA"); + Destination destinationB = session.createTopic("topic:tagB"); + final CountDownLatch countDownLatch = new CountDownLatch(2); + JmsTestListener listenerForTagA = new JmsTestListener(10, countDownLatch); + JmsTestListener listenerForAll = new JmsTestListener(40, countDownLatch); + try { + session.createConsumer(destinationA).setMessageListener(listenerForTagA); + anotherSession.createConsumer(session.createTopic("topic")).setMessageListener(listenerForAll); + //producer + MessageProducer messageProducer = session.createProducer(destinationA); + connection.start(); + anotherConnection.start(); + + for (int i = 0; i < 20; i++) { + TextMessage message = session.createTextMessage(text + i); + Assert.assertNull(message.getJMSMessageID()); + messageProducer.send(message); + Assert.assertNotNull(message.getJMSMessageID()); + } + for (int i = 0; i < 20; i++) { + TextMessage message = session.createTextMessage(text + i); + Assert.assertNull(message.getJMSMessageID()); + messageProducer.send(destinationB, message); + Assert.assertNotNull(message.getJMSMessageID()); + } + + if (countDownLatch.await(30, TimeUnit.SECONDS)) { + Thread.sleep(2000); + } + Assert.assertEquals(20, listenerForTagA.getConsumedNum()); + Assert.assertEquals(40, listenerForAll.getConsumedNum()); + } + finally { + //Close the connection + connection.close(); + anotherConnection.close(); + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java new file mode 100644 index 0000000..6cbb7b1 --- /dev/null +++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration; + +import java.net.URI; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory; +import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer; +import org.apache.rocketmq.jms.domain.RMQPushConsumerExt; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.jms.JmsTestUtil.checkConsumerState; +import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt; + +public class JmsConsumerIT extends IntegrationTestBase { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + + private MessageListener listener = new MessageListener() { + @Override + public void onMessage(Message message) { + try { + Assert.assertNotNull(message); + Assert.assertNotNull(message.getJMSMessageID()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + + @Test + public void testStartIdempotency() throws Exception { + JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new + URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer)); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + checkConsumerState(consumerId, true, false); + try { + Destination destination = session.createTopic(topic + ":" + messageType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(listener); + + checkConsumerState(consumerId, false, false); + + ((JmsBaseMessageConsumer) consumer).startConsumer(); + checkConsumerState(consumerId, false, true); + + Destination destination1 = session.createTopic(topic2 + ":" + messageType); + MessageConsumer consumer1 = session.createConsumer(destination1); + consumer1.setMessageListener(listener); + + ((JmsBaseMessageConsumer) consumer1).startConsumer(); + checkConsumerState(consumerId, false, true); + + //the start is idempotent + connection.start(); + connection.start(); + + Thread.sleep(5000); + } + finally { + connection.close(); + } + } + + @Test + public void testReferenceCount() throws Exception { + JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new + URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer)); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection.start(); + try { + Destination destination = session.createTopic(topic + ":" + messageType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(listener); + + RMQPushConsumerExt rmqPushConsumerExt = getRMQPushConsumerExt(consumerId); + Assert.assertNotNull(rmqPushConsumerExt); + Assert.assertEquals(1, rmqPushConsumerExt.getReferenceCount()); + + + MessageConsumer consumer2 = session.createConsumer(destination); + Assert.assertEquals(2, rmqPushConsumerExt.getReferenceCount()); + + MessageConsumer consumer3 = session.createConsumer(session.createTopic(topic + ":" + messageType)); + + Assert.assertEquals(3, rmqPushConsumerExt.getReferenceCount()); + + session.close(); + + Assert.assertEquals(0, rmqPushConsumerExt.getReferenceCount()); + Assert.assertEquals(false, rmqPushConsumerExt.isStarted()); + Assert.assertNull(getRMQPushConsumerExt(consumerId)); + + Thread.sleep(5000); + } + finally { + connection.close(); + } + } + +}
