http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java b/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java deleted file mode 100644 index f7dc15a..0000000 --- a/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnum.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.enums; - -import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.msg.JMSBytesMessage; -import org.apache.rocketmq.jms.msg.JMSMapMessage; -import org.apache.rocketmq.jms.msg.JMSObjectMessage; -import org.apache.rocketmq.jms.msg.JMSTextMessage; - -public enum JMSMessageModelEnum { - BYTE(JMSBytesMessage.class), - MAP(JMSMapMessage.class), - OBJECT(JMSObjectMessage.class), - STRING(JMSTextMessage.class); - - public static final String MSG_MODEL_NAME = "MsgModel"; - - private Class jmsClass; - - JMSMessageModelEnum(Class jmsClass) { - this.jmsClass = jmsClass; - } - - public static JMSMessageModelEnum toMsgModelEnum(AbstractJMSMessage jmsMsg) { - for (JMSMessageModelEnum e : values()) { - if (e.getJmsClass().isInstance(jmsMsg)) { - return e; - } - } - - throw new IllegalArgumentException(String.format("Not supported class[%s]", jmsMsg.getClass().getSimpleName())); - } - - public Class getJmsClass() { - return jmsClass; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java b/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java deleted file mode 100644 index dd5955b..0000000 --- a/src/main/java/org/apache/rocketmq/jms/msg/enums/JMSPropertiesEnum.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.enums; - -public enum JMSPropertiesEnum { - JMSXUserID, - JMSXDeliveryCount, - JMSXGroupID, - JMSXGroupSeq, - JMSXRcvTimestamp -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java b/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java deleted file mode 100644 index 7c7f1ea..0000000 --- a/src/main/java/org/apache/rocketmq/jms/msg/serialize/MapSerialize.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.serialize; - -import com.alibaba.fastjson.JSON; -import java.util.HashMap; -import java.util.Map; -import javax.jms.JMSException; - -public class MapSerialize implements Serialize<Map> { - - private static MapSerialize ins = new MapSerialize(); - - public static MapSerialize instance() { - return ins; - } - - @Override public byte[] serialize(Map map) throws JMSException { - return JSON.toJSONBytes(map); - } - - private MapSerialize() { - } - - @Override public Map deserialize(byte[] bytes) throws JMSException { - return JSON.parseObject(bytes, HashMap.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java b/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java deleted file mode 100644 index a685808..0000000 --- a/src/main/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerialize.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.serialize; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import javax.jms.JMSException; -import org.apache.commons.lang3.exception.ExceptionUtils; - -public class ObjectSerialize implements Serialize<Object> { - - private static ObjectSerialize ins = new ObjectSerialize(); - - public static ObjectSerialize instance() { - return ins; - } - - private ObjectSerialize() { - } - - public byte[] serialize(Object object) throws JMSException { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(object); - oos.close(); - baos.close(); - return baos.toByteArray(); - } - catch (IOException e) { - throw new JMSException(ExceptionUtils.getStackTrace(e)); - } - } - - public Serializable deserialize(byte[] bytes) throws JMSException { - try { - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - ObjectInputStream ois = new ObjectInputStream(bais); - ois.close(); - bais.close(); - return (Serializable) ois.readObject(); - } - catch (IOException e) { - throw new JMSException(e.getMessage()); - } - catch (ClassNotFoundException e) { - throw new JMSException(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java b/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java deleted file mode 100644 index 78a499c..0000000 --- a/src/main/java/org/apache/rocketmq/jms/msg/serialize/Serialize.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.serialize; - -import javax.jms.JMSException; - -public interface Serialize<T> { - - byte[] serialize(T t) throws JMSException; - - T deserialize(byte[] bytes) throws JMSException; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java b/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java deleted file mode 100644 index b6119a5..0000000 --- a/src/main/java/org/apache/rocketmq/jms/msg/serialize/StringSerialize.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg.serialize; - -import javax.jms.JMSException; -import org.apache.rocketmq.jms.support.JMSUtils; - -public class StringSerialize implements Serialize<String> { - - private static final String EMPTY_STRING = ""; - private static final byte[] EMPTY_BYTES = new byte[0]; - private static StringSerialize ins = new StringSerialize(); - - public static StringSerialize instance() { - return ins; - } - - private StringSerialize() { - } - - @Override public byte[] serialize(String s) throws JMSException { - if (null == s) { - return EMPTY_BYTES; - } - return JMSUtils.string2Bytes(s); - } - - @Override public String deserialize(byte[] bytes) throws JMSException { - if (null == bytes) { - return EMPTY_STRING; - } - return JMSUtils.bytes2String(bytes); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java b/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java deleted file mode 100644 index 67c54e9..0000000 --- a/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -import java.io.UnsupportedEncodingException; -import java.nio.charset.Charset; -import java.util.UUID; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Queue; -import javax.jms.Topic; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.rocketmq.jms.RocketMQConsumer; - -public class JMSUtils { - - public static final String DEFAULT_CHARSET = "UTF-8"; - - public static String getDestinationName(Destination destination) { - try { - String topicName; - if (destination instanceof Topic) { - topicName = ((Topic) destination).getTopicName(); - } - else if (destination instanceof Queue) { - topicName = ((Queue) destination).getQueueName(); - } - else { - throw new JMSException(String.format("Unsupported Destination type:", destination.getClass())); - } - return topicName; - } - catch (JMSException e) { - throw new JMSRuntimeException(e.getMessage()); - } - } - - public static String getConsumerGroup(RocketMQConsumer consumer) { - try { - return getConsumerGroup(consumer.getSubscriptionName(), - consumer.getSession().getConnection().getClientID(), - consumer.isShared() - ); - } - catch (JMSException e) { - throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e)); - } - } - - public static String getConsumerGroup(String subscriptionName, String clientID, boolean shared) { - StringBuffer consumerGroup = new StringBuffer(); - - if (StringUtils.isNotBlank(subscriptionName)) { - consumerGroup.append(subscriptionName); - } - - if (StringUtils.isNotBlank(clientID)) { - if (consumerGroup.length() != 0) { - consumerGroup.append("-"); - } - consumerGroup.append(clientID); - } - - if (shared) { - if (consumerGroup.length() != 0) { - consumerGroup.append("-"); - } - consumerGroup.append(uuid()); - } - - if (consumerGroup.length() == 0) { - consumerGroup.append(uuid()); - } - - return consumerGroup.toString(); - } - - public static String uuid() { - return UUID.randomUUID().toString(); - } - - public static String bytes2String(byte[] bytes) { - Prediction.checkNotNull(bytes, "bytes could not be null"); - return new String(bytes, Charset.forName(DEFAULT_CHARSET)); - } - - public static byte[] string2Bytes(String source) { - Prediction.checkNotNull(source, "source could be null"); - try { - return source.getBytes(DEFAULT_CHARSET); - } - catch (UnsupportedEncodingException e) { - throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java b/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java deleted file mode 100644 index 3ff1d69..0000000 --- a/src/main/java/org/apache/rocketmq/jms/support/ObjectTypeCast.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -/** - * Converter that convert object directly, which means Integer can only be - * converted to Integer,rather than Integer and Long. - */ -public class ObjectTypeCast { - - public static String cast2String(Object obj) { - if (obj == null) { - return null; - } - if (String.class.isInstance(obj)) { - return (String) obj; - } - throw new ClassCastException("To casted object is " + obj.getClass() + ", not String.class"); - } - - public static Long cast2Long(Object obj) { - if (obj == null) { - return null; - } - if (Long.class.isInstance(obj)) { - return (Long) obj; - } - throw new ClassCastException("To casted object is " + obj.getClass() + ", not Long.class"); - } - - public static Integer cast2Integer(Object obj) { - if (obj == null) { - return null; - } - if (Integer.class.isInstance(obj)) { - return (Integer) obj; - } - throw new ClassCastException("To casted object is " + obj.getClass() + ", not Integer.class"); - } - - public static Boolean cast2Boolean(Object obj) { - if (obj == null) { - return null; - } - if (Boolean.class.isInstance(obj)) { - return (Boolean) obj; - } - throw new ClassCastException("To casted object is " + obj.getClass() + ", not Boolean.class"); - } - - public static <T> T cast2Object(Object obj, Class<T> target) { - if (obj == null) { - return null; - } - if (target.isInstance(obj)) { - return (T) obj; - } - throw new ClassCastException("To casted object is " + obj.getClass() + ", not " + target.getSimpleName() + ".class"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/support/Prediction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/support/Prediction.java b/src/main/java/org/apache/rocketmq/jms/support/Prediction.java deleted file mode 100644 index 868c5d8..0000000 --- a/src/main/java/org/apache/rocketmq/jms/support/Prediction.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -public class Prediction { - - public static void checkNotNull(Object obj, String errorMsg) { - if (obj == null) { - throw new IllegalArgumentException(errorMsg); - } - } - - public static void checkNotBlank(String source, String errorMsg) { - if (source == null || source.trim().length() == 0) { - throw new IllegalArgumentException(errorMsg); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/support/PrimitiveTypeCast.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/support/PrimitiveTypeCast.java b/src/main/java/org/apache/rocketmq/jms/support/PrimitiveTypeCast.java deleted file mode 100644 index 6e24ab0..0000000 --- a/src/main/java/org/apache/rocketmq/jms/support/PrimitiveTypeCast.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -import javax.jms.JMSException; -import javax.jms.MapMessage; - -/** - * Primitive type converter, according to the conversion table in {@link MapMessage}. - */ -public class PrimitiveTypeCast { - - /** - * Indicate if the parameter obj is primitive type. - * - * @param obj that to be check - * @return true if the obj is primitive type, otherwise return false - */ - public static boolean isPrimitiveType(Object obj) { - if (obj == null) { - return false; - } - if (Boolean.class.isInstance(obj) - || Byte.class.isInstance(obj) - || Short.class.isInstance(obj) - || Character.class.isInstance(obj) - || Integer.class.isInstance(obj) - || Long.class.isInstance(obj) - || Float.class.isInstance(obj) - || Double.class.isInstance(obj) - || String.class.isInstance(obj) - || byte[].class.isInstance(obj)) { - return true; - } - - return false; - } - - public static boolean cast2Boolean(Object obj) throws JMSException { - if (obj == null) { - return Boolean.valueOf(null); - } - - if (Boolean.class.isInstance(obj)) { - return (Boolean) obj; - } - if (String.class.isInstance(obj)) { - return Boolean.valueOf((String) obj); - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static byte cast2Byte(Object obj) throws JMSException { - if (obj == null) { - return Byte.valueOf(null); - } - - if (Byte.class.isInstance(obj)) { - return (Byte) obj; - } - if (String.class.isInstance(obj)) { - return Byte.valueOf((String) obj); - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static short cast2Short(Object obj) throws JMSException { - if (obj == null) { - return Short.valueOf(null); - } - - if (Byte.class.isInstance(obj)) { - return ((Byte) obj).shortValue(); - } - if (Short.class.isInstance(obj)) { - return (Short) obj; - } - if (String.class.isInstance(obj)) { - return Short.valueOf((String) obj); - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static char cast2Char(Object obj) throws JMSException { - if (obj == null) { - throw new NullPointerException("Obj is required"); - } - - if (Character.class.isInstance(obj)) { - return (Character) obj; - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static int cast2Int(Object obj) throws JMSException { - if (obj == null) { - return Integer.valueOf(null); - } - - if (Byte.class.isInstance(obj)) { - return ((Byte) obj).intValue(); - } - if (Short.class.isInstance(obj)) { - return ((Short) obj).intValue(); - } - if (Integer.class.isInstance(obj)) { - return (Integer) obj; - } - if (String.class.isInstance(obj)) { - return Integer.parseInt((String) obj); - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static long cast2Long(Object obj) throws JMSException { - if (obj == null) { - return Long.valueOf(null); - } - - if (Byte.class.isInstance(obj)) { - return ((Byte) obj).longValue(); - } - if (Short.class.isInstance(obj)) { - return ((Short) obj).longValue(); - } - if (Integer.class.isInstance(obj)) { - return ((Integer) obj).longValue(); - } - if (Long.class.isInstance(obj)) { - return (Long) obj; - } - if (String.class.isInstance(obj)) { - return Long.parseLong((String) obj); - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static float cast2Float(Object obj) throws JMSException { - if (obj == null) { - return Float.valueOf(null); - } - - if (Float.class.isInstance(obj)) { - return (Float) obj; - } - if (String.class.isInstance(obj)) { - return Float.parseFloat((String) obj); - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static double cast2Double(Object obj) throws JMSException { - if (obj == null) { - return Double.valueOf(null); - } - - if (Float.class.isInstance(obj)) { - return ((Float) obj).doubleValue(); - } - if (Double.class.isInstance(obj)) { - return (Double) obj; - } - if (String.class.isInstance(obj)) { - return Double.parseDouble((String) obj); - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static String cast2String(Object obj) throws JMSException { - if (obj == null) { - return String.valueOf(null); - } - - if (Boolean.class.isInstance(obj) - || Byte.class.isInstance(obj) - || Short.class.isInstance(obj) - || Character.class.isInstance(obj) - || Integer.class.isInstance(obj) - || Long.class.isInstance(obj) - || Float.class.isInstance(obj) - || Double.class.isInstance(obj) - || String.class.isInstance(obj) - ) { - return obj.toString(); - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } - - public static byte[] cast2ByteArray(Object obj) throws JMSException { - if (obj instanceof byte[]) { - return (byte[]) obj; - } - - throw new JMSException("Incorrect type[" + obj.getClass() + "] to cast"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java b/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java deleted file mode 100644 index c67e71c..0000000 --- a/src/main/java/org/apache/rocketmq/jms/support/ProviderVersion.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.support; - -public class ProviderVersion { - - public static final Version CURRENT_VERSION = Version.V1_1_0; - - public enum Version { - - V1_1_0(1); - private int value; - - Version(int value) { - this.value = value; - } - - public int getValue() { - return value; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/resources/logback.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index 39da112..0000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,56 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="DefaultAppender" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <file>${user.home}/logs/rocketmq/jms.log</file> - <append>true</append> - <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> - <fileNamePattern>${user.home}/logs/rocketmq/otherdays/jms.%i.log - </fileNamePattern> - <minIndex>1</minIndex> - <maxIndex>10</maxIndex> - </rollingPolicy> - <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> - <maxFileSize>100MB</maxFileSize> - </triggeringPolicy> - <encoder> - <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern> - <charset class="java.nio.charset.Charset">UTF-8</charset> - </encoder> - </appender> - - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <append>true</append> - <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern> - <charset class="java.nio.charset.Charset">UTF-8</charset> - </encoder> - </appender> - - <logger name="org.apache.rocketmq.jms"> - <level value="DEBUG"/> - </logger> - - <root> - <level value="ERROR"/> - <appender-ref ref="STDOUT"/> - </root> -</configuration> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/RocketMQConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/RocketMQConnectionFactoryTest.java b/src/test/java/org/apache/rocketmq/jms/RocketMQConnectionFactoryTest.java deleted file mode 100644 index 61f1e54..0000000 --- a/src/test/java/org/apache/rocketmq/jms/RocketMQConnectionFactoryTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import org.junit.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; -import static org.hamcrest.core.IsNull.notNullValue; - -public class RocketMQConnectionFactoryTest { - - @Test - public void testClientId() throws Exception { - final String nameServerAddress = "localhost:6789"; - RocketMQConnectionFactory connectionFactory = new RocketMQConnectionFactory(nameServerAddress); - - assertThat(connectionFactory.getNameServerAddress(), is(nameServerAddress)); - assertThat(connectionFactory.getClientId(), notNullValue()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java b/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java deleted file mode 100644 index 0a3b36b..0000000 --- a/src/test/java/org/apache/rocketmq/jms/destination/RocketMQQueueTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.destination; - -import org.junit.Test; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -public class RocketMQQueueTest { - - @Test - public void test() throws Exception { - RocketMQQueue queue = new RocketMQQueue("MyQueue"); - - assertThat(queue.getQueueName(), is("MyQueue")); - assertThat(queue.toString(), is("MyQueue")); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java b/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java deleted file mode 100644 index c482e1c..0000000 --- a/src/test/java/org/apache/rocketmq/jms/destination/RocketMQTopicTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.destination; - -import org.junit.Test; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -public class RocketMQTopicTest { - - @Test - public void test() throws Exception { - RocketMQTopic topic = new RocketMQTopic("MyTopic"); - - assertThat(topic.getTopicName(), is("MyTopic")); - assertThat(topic.toString(), is("MyTopic")); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java b/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java deleted file mode 100644 index 62b5056..0000000 --- a/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.hook; - -import java.text.SimpleDateFormat; -import java.util.Date; -import javax.jms.Message; -import org.apache.rocketmq.jms.exception.MessageExpiredException; -import org.apache.rocketmq.jms.msg.JMSTextMessage; -import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; -import org.junit.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThan; - -public class ReceiveMessageHookTest { - - @Test(expected = MessageExpiredException.class) - public void testValidateFail() throws Exception { - ReceiveMessageHook hook = new ReceiveMessageHook(); - - Message message = new JMSTextMessage("text"); - message.setJMSExpiration(new Date().getTime()); - Thread.sleep(100); - hook.before(message); - } - - @Test - public void testValidateSuccess() throws Exception { - ReceiveMessageHook hook = new ReceiveMessageHook(); - - Message message = new JMSTextMessage("text"); - // never expired - message.setJMSExpiration(0); - hook.before(message); - - // expired in the future - message.setJMSExpiration(new SimpleDateFormat("yyyy-MM-dd").parse("2999-01-01").getTime()); - hook.before(message); - } - - @Test - public void setProviderProperties() throws Exception { - ReceiveMessageHook hook = new ReceiveMessageHook(); - - Message message = new JMSTextMessage("text"); - hook.before(message); - - assertThat(message.getLongProperty(JMSPropertiesEnum.JMSXRcvTimestamp.name()), greaterThan(0L)); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java b/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java deleted file mode 100644 index 29e91ec..0000000 --- a/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.hook; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import org.apache.rocketmq.jms.RocketMQProducer; -import org.apache.rocketmq.jms.destination.RocketMQTopic; -import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException; -import org.apache.rocketmq.jms.msg.JMSTextMessage; -import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; -import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; -import org.junit.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; -import static org.hamcrest.core.IsNot.not; -import static org.hamcrest.core.IsNull.notNullValue; -import static org.hamcrest.core.IsNull.nullValue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class SendMessageHookTest { - - @Test(expected = UnsupportDeliveryModelException.class) - public void testValidate() throws Exception { - final JMSTextMessage message = new JMSTextMessage("text"); - final RocketMQTopic destination = new RocketMQTopic("destination"); - final int deliveryMode = DeliveryMode.NON_PERSISTENT; - final int priority = 4; - final long timeToLive = 1000 * 100L; - - SendMessageHook hook = new SendMessageHook(); - hook.before(message, destination, deliveryMode, priority, timeToLive); - } - - @Test - public void testSetHeader() throws Exception { - RocketMQProducer producer = mock(RocketMQProducer.class); - when(producer.getDeliveryDelay()).thenReturn(0L); - - final JMSTextMessage message = new JMSTextMessage("text"); - final Destination destination = new RocketMQTopic("destination"); - final int deliveryMode = DeliveryMode.PERSISTENT; - final int priority = 5; - long timeToLive = JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE; - SendMessageHook hook = new SendMessageHook(producer); - hook.before(message, destination, deliveryMode, priority, timeToLive); - - assertThat(message.getJMSDestination(), is(destination)); - assertThat(message.getJMSDeliveryMode(), is(JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE)); - assertThat(message.getJMSExpiration(), is(0L)); - assertThat(message.getJMSDeliveryTime(), notNullValue()); - assertThat(message.getJMSPriority(), is(5)); - assertThat(message.getJMSMessageID(), notNullValue()); - assertThat(message.getJMSTimestamp(), notNullValue()); - } - - /** - * Disable ID,timestamp, and set expired time - * - * @throws Exception - */ - @Test - public void testSetHeader2() throws Exception { - RocketMQProducer producer = mock(RocketMQProducer.class); - when(producer.getUserName()).thenReturn("user"); - when(producer.getDisableMessageID()).thenReturn(true); - when(producer.getDisableMessageTimestamp()).thenReturn(true); - - final JMSTextMessage message = new JMSTextMessage("text"); - final Destination destination = new RocketMQTopic("destination"); - final int deliveryMode = DeliveryMode.PERSISTENT; - final int priority = 5; - final long timeToLive = 1000 * 100L; - SendMessageHook hook = new SendMessageHook(producer); - hook.before(message, destination, deliveryMode, priority, timeToLive); - - // assert header - assertThat(message.getJMSMessageID(), nullValue()); - assertThat(message.getJMSTimestamp(), is(0L)); - assertThat(message.getJMSExpiration(), not(is(0L))); - - // assert properties - assertThat(message.getStringProperty(JMSPropertiesEnum.JMSXUserID.name()), is("user")); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java b/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java deleted file mode 100644 index ae08bec..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.source; - -import javax.jms.ConnectionFactory; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.jms.annotation.EnableJms; -import org.springframework.jms.config.DefaultJmsListenerContainerFactory; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.support.converter.SimpleMessageConverter; - -@Configuration -@ComponentScan(basePackageClasses = {RocketMQServer.class}) -@EnableJms -public class AppConfig { - - @Bean - public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { - DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); - factory.setConnectionFactory(connectionFactory()); - factory.setConcurrency("1"); - return factory; - } - - @Bean - public ConnectionFactory connectionFactory() { -// CachingConnectionFactory factory = new CachingConnectionFactory(); -// factory.setTargetConnectionFactory(new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS)); -// return factory; - //todo - return new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS); - } - - @Bean - public JmsTemplate jmsTemplate() { - JmsTemplate jmsTemplate = new JmsTemplate(); - jmsTemplate.setConnectionFactory(connectionFactory()); - jmsTemplate.setMessageConverter(new SimpleMessageConverter()); - return jmsTemplate; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java b/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java deleted file mode 100644 index 0f7a6b1..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.source; - -public class Constant { - - public static final String NAME_SERVER_IP = "127.0.0.1"; - - public static final int NAME_SERVER_PORT = 9153; - - public static final String NAME_SERVER_ADDRESS = NAME_SERVER_IP + ":" + NAME_SERVER_PORT; - - public static final String BROKER_IP = "127.0.0.1"; - - public static final int BROKER_PORT = 9055; - - public static final String BROKER_ADDRESS = BROKER_IP + ":" + BROKER_PORT; - - public static final int BROKER_HA_PORT = 9043; - - public static final String CLIENT_ID = "coffee"; - - public static final String CLIENT_ID_SECOND = "tea"; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java deleted file mode 100644 index 0561dd9..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.source; - -import java.util.Arrays; -import java.util.HashSet; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import static org.apache.rocketmq.jms.integration.source.Constant.BROKER_ADDRESS; -import static org.apache.rocketmq.jms.integration.source.Constant.NAME_SERVER_ADDRESS; - -@Service -public class RocketMQAdmin { - - private static final Logger log = LoggerFactory.getLogger(RocketMQAdmin.class); - - @Autowired - // make sure RocketMQServer start ahead - private RocketMQServer rocketMQServer; - - //MQAdmin client - private DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); - - @PostConstruct - public void start() { - // reduce rebalance waiting time - System.setProperty("rocketmq.client.rebalance.waitInterval", "1000"); - - defaultMQAdminExt.setNamesrvAddr(NAME_SERVER_ADDRESS); - try { - defaultMQAdminExt.start(); - log.info("Start RocketMQAdmin Successfully"); - } - catch (MQClientException e) { - log.error("Failed to start MQAdmin", e); - System.exit(1); - } - } - - @PreDestroy - public void shutdown() { - defaultMQAdminExt.shutdown(); - } - - public void createTopic(String topic) { - createTopic(topic, 1); - } - - public void createTopic(String topic, int queueNum) { - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setTopicName(topic); - topicConfig.setReadQueueNums(queueNum); - topicConfig.setWriteQueueNums(queueNum); - try { - defaultMQAdminExt.createAndUpdateTopicConfig(BROKER_ADDRESS, topicConfig); - } - catch (Exception e) { - log.error("Create topic:{}, addr:{} failed:{}", topic, BROKER_ADDRESS, ExceptionUtils.getStackTrace(e)); - } - } - - public void deleteTopic(String topic) { - try { - defaultMQAdminExt.deleteTopicInBroker(new HashSet(Arrays.asList(BROKER_ADDRESS)), topic); - } - catch (Exception e) { - log.error("Delete topic:{}, addr:{} failed:{}", topic, BROKER_ADDRESS, ExceptionUtils.getStackTrace(e)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java deleted file mode 100644 index 71ae6b1..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.source; - -import java.io.File; -import java.text.SimpleDateFormat; -import java.util.Date; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.namesrv.NamesrvConfig; -import org.apache.rocketmq.namesrv.NamesrvController; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import static java.io.File.separator; -import static org.apache.rocketmq.jms.integration.source.Constant.BROKER_HA_PORT; -import static org.apache.rocketmq.jms.integration.source.Constant.BROKER_PORT; -import static org.apache.rocketmq.jms.integration.source.Constant.NAME_SERVER_ADDRESS; - -@Service -public class RocketMQServer { - public static Logger log = LoggerFactory.getLogger(RocketMQServer.class); - private final SimpleDateFormat sf = new SimpleDateFormat("yyyyMMddHHmmss"); - private final String rootDir = System.getProperty("user.home") + separator + "rocketmq-jms" + separator; - // fixed location of config files which is updated after RMQ3.2.6 - private final String configDir = System.getProperty("user.home") + separator + "store/config"; - - private String serverDir; - private volatile boolean started = false; - - //name server - private NamesrvConfig namesrvConfig = new NamesrvConfig(); - private NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); - private NamesrvController namesrvController; - - //broker - private final String brokerName = "JmsTestBrokerName"; - private BrokerController brokerController; - private BrokerConfig brokerConfig = new BrokerConfig(); - private NettyServerConfig nettyServerConfig = new NettyServerConfig(); - private NettyClientConfig nettyClientConfig = new NettyClientConfig(); - private MessageStoreConfig storeConfig = new MessageStoreConfig(); - - public RocketMQServer() { - this.storeConfig.setDiskMaxUsedSpaceRatio(95); - } - - @PostConstruct - public void start() { - if (started) { - return; - } - - createServerDir(); - - startNameServer(); - - startBroker(); - - started = true; - - log.info("Start RocketServer Successfully"); - } - - private void createServerDir() { - for (int i = 0; i < 5; i++) { - serverDir = rootDir + sf.format(new Date()); - final File file = new File(serverDir); - if (!file.exists()) { - return; - } - } - log.error("Has retry 5 times to register base dir,but still failed."); - System.exit(1); - } - - private void startNameServer() { - namesrvConfig.setKvConfigPath(serverDir + separator + "namesrv" + separator + "kvConfig.json"); - nameServerNettyServerConfig.setListenPort(Constant.NAME_SERVER_PORT); - namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); - try { - namesrvController.initialize(); - log.info("Success to start Name Server:{}", NAME_SERVER_ADDRESS); - namesrvController.start(); - } - catch (Exception e) { - log.error("Failed to start Name Server", e); - System.exit(1); - } - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, NAME_SERVER_ADDRESS); - } - - private void startBroker() { - System.setProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.98"); - brokerConfig.setBrokerName(brokerName); - brokerConfig.setBrokerIP1(Constant.BROKER_IP); - brokerConfig.setNamesrvAddr(NAME_SERVER_ADDRESS); - storeConfig.setStorePathRootDir(serverDir); - storeConfig.setStorePathCommitLog(serverDir + separator + "commitlog"); - storeConfig.setHaListenPort(BROKER_HA_PORT); - nettyServerConfig.setListenPort(BROKER_PORT); - brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); - - try { - brokerController.initialize(); - log.info("Broker Start name:{} address:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); - brokerController.start(); - - } - catch (Exception e) { - log.error("Failed to start Broker", e); - System.exit(1); - } - } - - @PreDestroy - private void shutdown() { - brokerController.shutdown(); - namesrvController.shutdown(); - deleteFile(new File(rootDir)); - deleteFile(new File(configDir)); - } - - public void deleteFile(File file) { - if (!file.exists()) { - return; - } - if (file.isFile()) { - file.delete(); - } - else if (file.isDirectory()) { - File[] files = file.listFiles(); - for (int i = 0; i < files.length; i++) { - deleteFile(files[i]); - } - file.delete(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java b/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java deleted file mode 100644 index f4f8df8..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.source; - -import java.util.ArrayList; -import java.util.List; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jms.annotation.JmsListener; -import org.springframework.messaging.Message; -import org.springframework.stereotype.Component; - -@Component -public class SimpleTextListener { - - public static final String DESTINATION = "orderTest"; - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - private List<String> receivedMsgs = new ArrayList(); - - public SimpleTextListener() { - } - - @PostConstruct - public void init() { - this.rocketMQAdmin.createTopic(DESTINATION); - } - - @PreDestroy - public void destroy() { - this.rocketMQAdmin.deleteTopic(DESTINATION); - } - - @JmsListener(destination = DESTINATION) - public void processOrder(Message<String> message) { - receivedMsgs.add(message.getPayload()); - } - - public List<String> getReceivedMsg() { - return receivedMsgs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java b/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java deleted file mode 100644 index 5136f37..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.source.support; - -public interface ConditionMatcher { - - boolean match(); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java b/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java deleted file mode 100644 index 0417b58..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.source.support; - -import org.apache.commons.lang3.time.StopWatch; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class TimeLimitAssert { - - public static void doAssert(ConditionMatcher conditionMatcher, int timeLimit) throws InterruptedException { - StopWatch watch = new StopWatch(); - watch.start(); - - while (!conditionMatcher.match()) { - Thread.sleep(500); - if (watch.getTime() > timeLimit * 1000) { - assertFalse(String.format("Doesn't match assert condition in %s second", timeLimit), true); - } - } - - assertTrue(true); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java deleted file mode 100644 index 018e844..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.test; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.apache.rocketmq.jms.integration.source.AppConfig; -import org.apache.rocketmq.jms.integration.source.Constant; -import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; -import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher; -import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class ConsumeAsynchronousTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - @Test - public void testConsumeAsynchronous() throws Exception { - final String rmqTopicName = "coffee-async" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //producer - TextMessage message = session.createTextMessage("mocha coffee,please"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - //consumer - final List<Message> received = new ArrayList(); - MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); - consumer.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { - received.add(message); - } - }); - - connection.start(); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return received.size() == 1; - } - }, 5); - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java deleted file mode 100644 index 8a9fa41..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.test; - -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.apache.rocketmq.jms.integration.source.AppConfig; -import org.apache.rocketmq.jms.integration.source.Constant; -import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsNull.notNullValue; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class ConsumeSynchronousTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - @Test - public void testConsumeSynchronous() throws Exception { - final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - connection.start(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //producer - TextMessage message = session.createTextMessage("a"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - //consumer - MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); - - connection.start(); - - Message msg = consumer.receive(); - - assertThat(msg, notNullValue()); - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java deleted file mode 100644 index 045d615..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.test; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.apache.rocketmq.jms.integration.source.AppConfig; -import org.apache.rocketmq.jms.integration.source.Constant; -import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; -import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher; -import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class NonDurableConsumeTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - /** - * Test messages that producer after consumer inactive will not be delivered to consumer when it start again. - * - * <p>Test step: - * 1. Create a consumer and start the connection - * 2. Create a producer and send a message(msgA) to the topic subscribed by previous consumer - * 3. MsgA should be consumed successfully - * 4. Close the consumer and stop the connection - * 5. Producer sends a message(msgB) after the consumer closed - * 6. Create another consumer which is a non-durable one, and start the connection - * 7. Result: msgB should be consumed by the previous non-durable consumer - * - * @throws Exception - */ - @Test - public void testConsumeNotDurable() throws Exception { - final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - connection.start(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //consumer - final List<Message> received = new ArrayList(); - final MessageListener msgListener = new MessageListener() { - @Override public void onMessage(Message message) { - received.add(message); - } - }; - MessageConsumer consumer = session.createConsumer(topic); - consumer.setMessageListener(msgListener); - - connection.start(); - - Thread.sleep(1000 * 3); - - //producer - TextMessage message = session.createTextMessage("a"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return received.size() == 1; - } - }, 3); - - received.clear(); - - // close the consumer - connection.stop(); - consumer.close(); - - // send message - TextMessage lostMessage = session.createTextMessage("b"); - producer.send(lostMessage); - - Thread.sleep(1000 * 2); - - // start the non-durable consumer again - consumer = session.createConsumer(topic, "topic"); - consumer.setMessageListener(msgListener); - connection.start(); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return received.size() == 0; - } - }, 5); - - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java deleted file mode 100644 index c3731de..0000000 --- a/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.test; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.apache.rocketmq.jms.integration.source.AppConfig; -import org.apache.rocketmq.jms.integration.source.Constant; -import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; -import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher; -import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class SharedDurableConsumeTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - /** - * Test messages will be deliver to every consumer if these consumers are in shared durable subscription. - * - * <p>Test step: - * 1. Create a share durable consumer(consumerA) via the first connection(connectionA) - * 2. Create a share durable consumer(consumerB) via another connection(connectionB) - * 3. The two consumer must subscribe the same topic with identical subscriptionName, - * and they also have the same clientID. - * 4. Send several(eg:10) messages to this topic - * 5. Result: all messages should be received by both consumerA and consumerB - * - * @throws Exception - */ - @Test - public void testConsumeAllMessages() throws Exception { - final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connectionA = null, connectionB = null; - final String subscriptionName = "MySubscription"; - final List<Message> receivedA = new ArrayList(), receivedB = new ArrayList(); - - try { - // consumerA - connectionA = factory.createConnection(); - Session sessionA = connectionA.createSession(); - connectionA.start(); - Topic topic = sessionA.createTopic(rmqTopicName); - MessageConsumer consumerA = sessionA.createSharedDurableConsumer(topic, subscriptionName); - consumerA.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { - receivedA.add(message); - } - }); - - // consumerB - connectionB = factory.createConnection(); - Session sessionB = connectionB.createSession(); - MessageConsumer consumerB = sessionB.createSharedDurableConsumer(topic, subscriptionName); - consumerB.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { - receivedB.add(message); - } - }); - connectionB.start(); - - //producer - TextMessage message = sessionA.createTextMessage("a"); - MessageProducer producer = sessionA.createProducer(topic); - for (int i = 0; i < 10; i++) { - producer.send(message); - } - - Thread.sleep(1000 * 2); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return receivedA.size()==10 && receivedB.size()==10; - } - },5); - } - finally { - connectionA.close(); - connectionB.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - } - -}
