Add integration test: UnsharedDurableConsumeTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/9649e02d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/9649e02d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/9649e02d Branch: refs/heads/jms-dev-1.1.0 Commit: 9649e02d0aa25c4428c88e4c7eba74e85ac52320 Parents: 7d4d8a4 Author: zhangke <[email protected]> Authored: Wed Mar 1 20:09:52 2017 +0800 Committer: zhangke <[email protected]> Committed: Wed Mar 1 20:09:52 2017 +0800 ---------------------------------------------------------------------- core/pom.xml | 4 + .../rocketmq/jms/DeliverMessageService.java | 26 ++-- .../apache/rocketmq/jms/RocketMQConsumer.java | 31 +++-- .../apache/rocketmq/jms/RocketMQSession.java | 33 ++++- .../rocketmq/jms/RocketMQTopicSubscriber.java | 4 +- .../apache/rocketmq/jms/admin/AdminFactory.java | 55 ++++++++ .../DuplicateSubscriptionException.java | 31 +++++ .../jms/exception/JMSClientException.java | 31 +++++ .../apache/rocketmq/jms/support/JMSUtils.java | 42 +++++++ .../rocketmq/jms/support/JMSUtilsTest.java | 18 +++ pom.xml | 5 + test/pom.xml | 10 -- .../rocketmq/jms/integration/RocketMQAdmin.java | 17 ++- .../jms/integration/RocketMQServer.java | 11 +- .../integration/ConsumeAsynchronousTest.java | 11 +- .../jms/integration/NonDurableConsumeTest.java | 24 ++-- .../integration/SharedDurableConsumeTest.java | 30 +++-- .../integration/UnsharedDurableConsumeTest.java | 124 +++++++++++++++++++ .../listener/SimpleTextListenerTest.java | 17 +-- .../integration/support/ConditionMatcher.java | 23 ++++ .../integration/support/TimeLimitAssert.java | 40 ++++++ 21 files changed, 511 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/pom.xml 
---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index e977b30..c01d2e0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,6 +34,10 @@ <artifactId>rocketmq-client</artifactId> </dependency> <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-tools</artifactId> + </dependency> + <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java index e568d4c..1043f5d 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java +++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java @@ -17,7 +17,9 @@ package org.apache.rocketmq.jms; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,6 +74,7 @@ public class DeliverMessageService extends ServiceThread { * Otherwise consume from the max offset */ private boolean durable = false; + private boolean shared = false; private BlockingQueue<MessageWrapper> msgQueue = new ArrayBlockingQueue(PULL_BATCH_SIZE); private volatile boolean pause = true; @@ -79,10 +82,14 @@ public class DeliverMessageService extends ServiceThread { private Map<MessageQueue, Long> offsetMap = new HashMap(); - public DeliverMessageService(RocketMQConsumer consumer, Destination destination, String consumerGroup) { + public DeliverMessageService(RocketMQConsumer consumer, Destination destination, String consumerGroup, + String messageSelector, boolean durable, boolean shared) { this.consumer = consumer; this.destination = 
destination; this.consumerGroup = consumerGroup; + this.messageSelector = messageSelector; + this.durable = durable; + this.shared = shared; this.topicName = JMSUtils.getDestinationName(destination); @@ -101,6 +108,7 @@ public class DeliverMessageService extends ServiceThread { this.rocketMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); this.rocketMQPullConsumer.setNamesrvAddr(clientConfig.getNamesrvAddr()); this.rocketMQPullConsumer.setInstanceName(clientConfig.getInstanceName()); + this.rocketMQPullConsumer.setRegisterTopics(new HashSet(Arrays.asList(this.topicName))); try { this.rocketMQPullConsumer.start(); @@ -136,7 +144,8 @@ public class DeliverMessageService extends ServiceThread { } private void pullMessage() throws Exception { - Set<MessageQueue> mqs = this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName); + Set<MessageQueue> mqs = getMessageQueues(); + for (MessageQueue mq : mqs) { Long offset = offsetMap.get(mq); if (offset == null) { @@ -162,6 +171,11 @@ public class DeliverMessageService extends ServiceThread { } } + private Set<MessageQueue> getMessageQueues() throws MQClientException { + Set<MessageQueue> mqs = this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName); + return mqs; + } + /** * Refer to {@link #durable}. 
* @@ -255,14 +269,6 @@ public class DeliverMessageService extends ServiceThread { log.debug("Success to close message delivery service:{}", getServiceName()); } - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - public void setDurable(boolean durable) { - this.durable = durable; - } - public void setConsumeModel(ConsumeModel consumeModel) { this.consumeModel = consumeModel; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java index 0927a2b..af147e0 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java @@ -24,6 +24,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import org.apache.rocketmq.jms.support.JMSUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,29 +35,31 @@ public class RocketMQConsumer implements MessageConsumer { private Destination destination; private String messageSelector; private MessageListener messageListener; - private String sharedSubscriptionName; + private String subscriptionName; private boolean durable; + private boolean shared; private DeliverMessageService deliverMessageService; public RocketMQConsumer(RocketMQSession session, Destination destination, String messageSelector, - boolean durable) { - this(session, destination, messageSelector, UUID.randomUUID().toString(), durable); + boolean durable, boolean shared) { + this(session, destination, messageSelector, UUID.randomUUID().toString(), durable, shared); } public RocketMQConsumer(RocketMQSession session, Destination destination, 
String messageSelector, - String sharedSubscriptionName, boolean durable) { + String subscriptionName, boolean durable, boolean shared) { this.session = session; this.destination = destination; this.messageSelector = messageSelector; - this.sharedSubscriptionName = sharedSubscriptionName; + this.subscriptionName = subscriptionName; this.durable = durable; + this.shared = shared; - this.deliverMessageService = new DeliverMessageService(this, this.destination, this.sharedSubscriptionName); - this.deliverMessageService.setMessageSelector(this.messageSelector); - this.deliverMessageService.setDurable(this.durable); + String consumerGroup = JMSUtils.getConsumerGroup(this); + this.deliverMessageService = new DeliverMessageService(this, this.destination, consumerGroup, + this.messageSelector, this.durable, this.shared); this.deliverMessageService.start(); } @@ -134,4 +137,16 @@ public class RocketMQConsumer implements MessageConsumer { public RocketMQSession getSession() { return session; } + + public String getSubscriptionName() { + return subscriptionName; + } + + public boolean isDurable() { + return durable; + } + + public boolean isShared() { + return shared; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java index 8825dff..0094c47 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java @@ -42,12 +42,21 @@ import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.rocketmq.client.exception.MQBrokerException; +import 
org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.jms.admin.AdminFactory; import org.apache.rocketmq.jms.destination.RocketMQQueue; import org.apache.rocketmq.jms.destination.RocketMQTopic; +import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException; import org.apache.rocketmq.jms.msg.JMSBytesMessage; import org.apache.rocketmq.jms.msg.JMSMapMessage; import org.apache.rocketmq.jms.msg.JMSObjectMessage; import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.support.JMSUtils; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +64,9 @@ import static org.apache.rocketmq.jms.Constant.DEFAULT_DURABLE; import static org.apache.rocketmq.jms.Constant.DEFAULT_NO_LOCAL; import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR; +/** + * Implementation of {@link Session}. 
+ */ public class RocketMQSession implements Session { private static final Logger log = LoggerFactory.getLogger(RocketMQSession.class); @@ -201,8 +213,9 @@ public class RocketMQSession implements Session { @Override public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { + // ignore noLocal param as RMQ not support - RocketMQConsumer consumer = new RocketMQConsumer(this, destination, messageSelector, DEFAULT_DURABLE); + RocketMQConsumer consumer = new RocketMQConsumer(this, destination, messageSelector, DEFAULT_DURABLE, false); this.consumerList.add(consumer); return consumer; @@ -216,7 +229,7 @@ public class RocketMQSession implements Session { @Override public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException { - RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, sharedSubscriptionName, DEFAULT_DURABLE); + RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, sharedSubscriptionName, DEFAULT_DURABLE, true); this.consumerList.add(consumer); return consumer; @@ -242,7 +255,7 @@ public class RocketMQSession implements Session { @Override public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { - RocketMQTopicSubscriber subscriber = new RocketMQTopicSubscriber(this, topic, messageSelector, name, true); + RocketMQTopicSubscriber subscriber = new RocketMQTopicSubscriber(this, topic, messageSelector, name, true, true); this.consumerList.add(subscriber); return subscriber; @@ -256,7 +269,17 @@ public class RocketMQSession implements Session { @Override public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { - RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true); + DefaultMQAdminExt 
admin = AdminFactory.getAdmin(this.getConnection().getClientConfig().getNamesrvAddr()); + try { + GroupList groupList = admin.queryTopicConsumeByWho(topic.getTopicName()); + if (groupList.getGroupList().contains(JMSUtils.getConsumerGroup(name, this.getConnection().getClientID(), false))) { + throw new DuplicateSubscriptionException("The same subscription( join subscriptionName with clientID) has existed, so couldn't create consumer on them again "); + } + } + catch (InterruptedException | MQBrokerException | RemotingException | MQClientException e) { + throw new JMSException(ExceptionUtils.getStackTrace(e)); + } + RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true, false); this.consumerList.add(consumer); return consumer; @@ -270,7 +293,7 @@ public class RocketMQSession implements Session { @Override public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException { - RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true); + RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true, true); this.consumerList.add(consumer); return consumer; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java index fcd0494..51b732b 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java @@ -26,8 +26,8 @@ public class RocketMQTopicSubscriber extends RocketMQConsumer implements TopicSu private Topic topic; public RocketMQTopicSubscriber(RocketMQSession session, Topic topic, String 
messageSelector, - String sharedSubscriptionName, boolean durable) { - super(session, topic, messageSelector, sharedSubscriptionName, durable); + String sharedSubscriptionName, boolean durable, boolean shared) { + super(session, topic, messageSelector, sharedSubscriptionName, durable, shared); this.topic = topic; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java b/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java new file mode 100644 index 0000000..c551a22 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.admin; + +import java.util.concurrent.ConcurrentHashMap; +import javax.jms.JMSException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.jms.exception.JMSClientException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; + +public class AdminFactory { + + private static ConcurrentHashMap<String/*nameServerAddress*/, DefaultMQAdminExt> admins = new ConcurrentHashMap(); + + public static DefaultMQAdminExt getAdmin(String nameServerAddress) throws JMSException { + if (nameServerAddress == null) { + throw new IllegalArgumentException("NameServerAddress could be null"); + } + + DefaultMQAdminExt admin = admins.get(nameServerAddress); + if (admin != null) { + return admin; + } + + admin = new DefaultMQAdminExt(nameServerAddress); + try { + admin.start(); + } + catch (MQClientException e) { + throw new JMSClientException("Error during starting admin client"); + } + DefaultMQAdminExt old = admins.putIfAbsent(nameServerAddress, admin); + if (old != null) { + admin.shutdown(); + return old; + } + + return admin; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java b/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java new file mode 100644 index 0000000..51ce57d --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.exception; + +import javax.jms.JMSException; + +public class DuplicateSubscriptionException extends JMSException { + + public DuplicateSubscriptionException(String reason, String errorCode) { + super(reason, errorCode); + } + + public DuplicateSubscriptionException(String reason) { + super(reason); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java b/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java new file mode 100644 index 0000000..1335eb9 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.exception; + +import javax.jms.JMSException; + +public class JMSClientException extends JMSException { + + public JMSClientException(String reason, String errorCode) { + super(reason, errorCode); + } + + public JMSClientException(String reason) { + super(reason); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java b/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java index ba5431a..bed8165 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java +++ b/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java @@ -23,6 +23,9 @@ import javax.jms.JMSException; import javax.jms.JMSRuntimeException; import javax.jms.Queue; import javax.jms.Topic; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.rocketmq.jms.RocketMQConsumer; public class JMSUtils { @@ -45,6 +48,45 @@ public class JMSUtils { } } + public static String getConsumerGroup(RocketMQConsumer consumer) { + try { + return getConsumerGroup(consumer.getSubscriptionName(), + consumer.getSession().getConnection().getClientID(), + consumer.isShared() + ); + } + catch (JMSException e) { + throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e)); + } + } + + public static String getConsumerGroup(String 
subscriptionName, String clientID, boolean shared) { + StringBuffer consumerGroup = new StringBuffer(); + + if (StringUtils.isNotBlank(subscriptionName)) { + consumerGroup.append(subscriptionName); + } + + if (StringUtils.isNotBlank(clientID)) { + if (consumerGroup.length() != 0) { + consumerGroup.append("-"); + } + consumerGroup.append(clientID); + } + + if (shared) { + if (consumerGroup.length() != 0) { + consumerGroup.append("-"); + } + consumerGroup.append(uuid()); + } + + if (consumerGroup.length() == 0) { + consumerGroup.append(uuid()); + } + + return consumerGroup.toString(); + } public static String uuid() { return UUID.randomUUID().toString(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java b/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java index d926fac..db15fee 100644 --- a/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java +++ b/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java @@ -37,6 +37,24 @@ public class JMSUtilsTest { } @Test + public void getConsumerGroup() throws Exception { + final String subscriptionName = "subscriptionName"; + final String clientID = "clientID"; + String consumerGroupA = JMSUtils.getConsumerGroup(subscriptionName, clientID, true); + assertThat(consumerGroupA.contains(subscriptionName), is(true)); + assertThat(consumerGroupA.contains(clientID), is(true)); + assertThat(consumerGroupA.substring(subscriptionName.length() + clientID.length() + 2).length(), is(36)); + + String consumerGroupB = JMSUtils.getConsumerGroup(subscriptionName, clientID, false); + assertThat(consumerGroupB.contains(subscriptionName), is(true)); + assertThat(consumerGroupB.contains(clientID), is(true)); + assertThat(consumerGroupB.length(), 
is(subscriptionName.length() + clientID.length() + 1)); + + String consumerGroupC = JMSUtils.getConsumerGroup(null, null, true); + assertThat(consumerGroupC.length(), is(36)); + } + + @Test public void uuid() throws Exception { assertThat(JMSUtils.uuid(), notNullValue()); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 48977f3..df376d1 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,11 @@ <version>${rocketmq.version}</version> </dependency> <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-tools</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</version> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index f05c080..bd23f68 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -46,16 +46,6 @@ <artifactId>hamcrest-all</artifactId> </dependency> <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-namesrv</artifactId> - <version>${rocketmq.version}</version> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-broker</artifactId> - <version>${rocketmq.version}</version> - </dependency> - <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java 
b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java index a00370e..7ee2fbb 100644 --- a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java +++ b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java @@ -17,13 +17,13 @@ package org.apache.rocketmq.jms.integration; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import com.google.common.collect.Sets; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,6 +46,9 @@ public class RocketMQAdmin { @PostConstruct public void start() { + // reduce rebalance waiting time + System.setProperty("rocketmq.client.rebalance.waitInterval", "1000"); + defaultMQAdminExt.setNamesrvAddr(NAME_SERVER_ADDRESS); try { defaultMQAdminExt.start(); @@ -63,10 +66,14 @@ public class RocketMQAdmin { } public void createTopic(String topic) { + createTopic(topic, 1); + } + + public void createTopic(String topic, int queueNum) { TopicConfig topicConfig = new TopicConfig(); topicConfig.setTopicName(topic); - topicConfig.setReadQueueNums(1); - topicConfig.setWriteQueueNums(1); + topicConfig.setReadQueueNums(queueNum); + topicConfig.setWriteQueueNums(queueNum); try { defaultMQAdminExt.createAndUpdateTopicConfig(BROKER_ADDRESS, topicConfig); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java ---------------------------------------------------------------------- diff --git 
a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java index 416e8d9..c2d0f49 100644 --- a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java +++ b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java @@ -17,6 +17,11 @@ package org.apache.rocketmq.jms.integration; +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.Date; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; @@ -25,11 +30,6 @@ import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.config.MessageStoreConfig; -import java.io.File; -import java.text.SimpleDateFormat; -import java.util.Date; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -113,6 +113,7 @@ public class RocketMQServer { } private void startBroker() { + System.setProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.98"); brokerConfig.setBrokerName(brokerName); brokerConfig.setBrokerIP1(Constant.BROKER_IP); brokerConfig.setNamesrvAddr(NAME_SERVER_ADDRESS); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java index fc2cc34..ea3fa60 100644 --- 
a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java @@ -30,6 +30,8 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.integration.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -54,7 +56,6 @@ public class ConsumeAsynchronousTest { ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); Connection connection = factory.createConnection(); Session session = connection.createSession(); - connection.start(); Topic topic = session.createTopic(rmqTopicName); try { @@ -74,9 +75,11 @@ public class ConsumeAsynchronousTest { connection.start(); - Thread.sleep(1000 * 5); - - assertThat(received.size(), is(1)); + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return received.size() == 1; + } + }, 5); } finally { connection.close(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java index 82f73fb..d222f98 100644 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java @@ -30,15 +30,14 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import 
org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.integration.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = AppConfig.class) public class NonDurableConsumeTest { @@ -62,7 +61,7 @@ public class NonDurableConsumeTest { */ @Test public void testConsumeNotDurable() throws Exception { - final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); + final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); rocketMQAdmin.createTopic(rmqTopicName); ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); @@ -84,14 +83,19 @@ public class NonDurableConsumeTest { connection.start(); + Thread.sleep(1000 * 3); + //producer TextMessage message = session.createTextMessage("a"); MessageProducer producer = session.createProducer(topic); producer.send(message); - Thread.sleep(1000 * 2); + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return received.size() == 1; + } + }, 3); - assertThat(received.size(), is(1)); received.clear(); // close the consumer @@ -109,9 +113,11 @@ public class NonDurableConsumeTest { consumer.setMessageListener(msgListener); connection.start(); - Thread.sleep(1000 * 3); - - assertThat(received.size(), is(0)); + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return received.size() == 0; + } + }, 5); } finally { 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java index 715f70c..a22872a 100644 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java @@ -30,6 +30,8 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.integration.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -46,9 +48,22 @@ public class SharedDurableConsumeTest { @Autowired private RocketMQAdmin rocketMQAdmin; + /** + * Test that messages will be delivered to every consumer if these consumers are in a shared durable subscription. + * + * <p>Test step: + * 1. Create a shared durable consumer(consumerA) via the first connection(connectionA) + * 2. Create a shared durable consumer(consumerB) via another connection(connectionB) + * 3. The two consumers must subscribe to the same topic with identical subscriptionName, + * and they also have the same clientID. + * 4. Send several(eg:10) messages to this topic + * 5. 
Result: all messages should be received by both consumerA and consumerB + * + * @throws Exception + */ @Test - public void test() throws Exception { - final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); + public void testConsumeAllMessages() throws Exception { + final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); rocketMQAdmin.createTopic(rmqTopicName); ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); @@ -78,8 +93,6 @@ public class SharedDurableConsumeTest { receivedB.add(message); } }); - - connectionA.start(); connectionB.start(); //producer @@ -89,10 +102,13 @@ public class SharedDurableConsumeTest { producer.send(message); } - Thread.sleep(1000 * 5); + Thread.sleep(1000 * 2); - assertThat(receivedA.size(), is(10)); - assertThat(receivedB.size(), is(10)); + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return receivedA.size()==10 && receivedB.size()==10; + } + },5); } finally { connectionA.close(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java new file mode 100644 index 0000000..c0e4e79 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.RocketMQSession; +import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException; +import org.apache.rocketmq.jms.integration.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class UnsharedDurableConsumeTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + /** + * Test that each message will be delivered to only one consumer if these consumers are in an unshared durable subscription. 
+ * + * <p>Test step: + * 1. Create an unshared durable consumer(consumerA) via the first connection(connectionA) + * 2. Create an unshared durable consumer(consumerB) via another connection(connectionB) + * 3. Result: + * a. Creating consumerB should throw a JMSException as consumerA and consumerB have the same subscription + * b. All messages should be received by consumerA + * + * @throws Exception + * @see {@link RocketMQSession} + */ + @Test + public void testEachMessageOnlyConsumeByOneConsumer() throws Exception { + final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName, 2); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connectionA = null, connectionB = null; + final String subscriptionName = "MySubscription"; + final List<Message> receivedA = new ArrayList(); + + try { + // consumerA + connectionA = factory.createConnection(); + Session sessionA = connectionA.createSession(); + connectionA.start(); + Topic topic = sessionA.createTopic(rmqTopicName); + MessageConsumer consumerA = sessionA.createDurableConsumer(topic, subscriptionName); + consumerA.setMessageListener(new MessageListener() { + @Override public void onMessage(Message message) { + receivedA.add(message); + } + }); + + Thread.sleep(1000 * 2); + + // consumerB + try { + connectionB = factory.createConnection(); + Session sessionB = connectionB.createSession(); + sessionB.createDurableConsumer(topic, subscriptionName); + assertFalse("Doesn't get the expected " + DuplicateSubscriptionException.class.getSimpleName(), true); + } + catch (DuplicateSubscriptionException e) { + assertTrue(true); + } + + connectionA.start(); + + //producer + TextMessage message = sessionA.createTextMessage("a"); + MessageProducer producer = sessionA.createProducer(topic); + for (int i = 0; i < 10; i++) { + producer.send(message); + } + + TimeLimitAssert.doAssert(new ConditionMatcher() { 
+ @Override public boolean match() { + return receivedA.size() == 10; + } + }, 5); + } + finally { + connectionA.close(); + connectionB.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java index 4b994a3..8518ab5 100644 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.jms.integration.listener; import org.apache.commons.lang.time.StopWatch; import org.apache.rocketmq.jms.integration.AppConfig; +import org.apache.rocketmq.jms.integration.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -29,8 +31,6 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import static org.apache.rocketmq.jms.integration.listener.SimpleTextListener.DESTINATION; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; @RunWith(SpringRunner.class) @ContextConfiguration(classes = AppConfig.class) @@ -50,15 +50,10 @@ public class SimpleTextListenerTest { StopWatch watch = new StopWatch(); watch.start(); - int count = 1; - while (simpleTextListener.getReceivedMsg().size() != count) { - Thread.sleep(1000); - log.info("Waiting for receiving {} messages sent to {} topic,now has received {}", - count, DESTINATION, 
simpleTextListener.getReceivedMsg().size()); - if (watch.getTime() > 1000 * 10) { - assertFalse(true); + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return simpleTextListener.getReceivedMsg().size() == 1; } - } - assertTrue(true); + }, 60); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java b/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java new file mode 100644 index 0000000..22d7683 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.integration.support; + +public interface ConditionMatcher { + + boolean match(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java b/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java new file mode 100644 index 0000000..6877cbd --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.integration.support; + +import org.apache.commons.lang.time.StopWatch; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TimeLimitAssert { + + public static void doAssert(ConditionMatcher conditionMatcher, int timeLimit) throws InterruptedException { + StopWatch watch = new StopWatch(); + watch.start(); + + while (!conditionMatcher.match()) { + Thread.sleep(500); + if (watch.getTime() > timeLimit * 1000) { + assertFalse(String.format("Doesn't match assert condition in %s second", timeLimit), true); + } + } + + assertTrue(true); + } +}
