Add integration test: UnsharedDurableConsumeTest

Project: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/9649e02d
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/9649e02d
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/9649e02d

Branch: refs/heads/jms-dev-1.1.0
Commit: 9649e02d0aa25c4428c88e4c7eba74e85ac52320
Parents: 7d4d8a4
Author: zhangke <[email protected]>
Authored: Wed Mar 1 20:09:52 2017 +0800
Committer: zhangke <[email protected]>
Committed: Wed Mar 1 20:09:52 2017 +0800

----------------------------------------------------------------------
 core/pom.xml                                    |   4 +
 .../rocketmq/jms/DeliverMessageService.java     |  26 ++--
 .../apache/rocketmq/jms/RocketMQConsumer.java   |  31 +++--
 .../apache/rocketmq/jms/RocketMQSession.java    |  33 ++++-
 .../rocketmq/jms/RocketMQTopicSubscriber.java   |   4 +-
 .../apache/rocketmq/jms/admin/AdminFactory.java |  55 ++++++++
 .../DuplicateSubscriptionException.java         |  31 +++++
 .../jms/exception/JMSClientException.java       |  31 +++++
 .../apache/rocketmq/jms/support/JMSUtils.java   |  42 +++++++
 .../rocketmq/jms/support/JMSUtilsTest.java      |  18 +++
 pom.xml                                         |   5 +
 test/pom.xml                                    |  10 --
 .../rocketmq/jms/integration/RocketMQAdmin.java |  17 ++-
 .../jms/integration/RocketMQServer.java         |  11 +-
 .../integration/ConsumeAsynchronousTest.java    |  11 +-
 .../jms/integration/NonDurableConsumeTest.java  |  24 ++--
 .../integration/SharedDurableConsumeTest.java   |  30 +++--
 .../integration/UnsharedDurableConsumeTest.java | 124 +++++++++++++++++++
 .../listener/SimpleTextListenerTest.java        |  17 +--
 .../integration/support/ConditionMatcher.java   |  23 ++++
 .../integration/support/TimeLimitAssert.java    |  40 ++++++
 21 files changed, 511 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index e977b30..c01d2e0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -34,6 +34,10 @@
             <artifactId>rocketmq-client</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.jms</groupId>
             <artifactId>javax.jms-api</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java 
b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
index e568d4c..1043f5d 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
@@ -17,7 +17,9 @@
 
 package org.apache.rocketmq.jms;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,6 +74,7 @@ public class DeliverMessageService extends ServiceThread {
      * Otherwise consume from the max offset
      */
     private boolean durable = false;
+    private boolean shared = false;
 
     private BlockingQueue<MessageWrapper> msgQueue = new 
ArrayBlockingQueue(PULL_BATCH_SIZE);
     private volatile boolean pause = true;
@@ -79,10 +82,14 @@ public class DeliverMessageService extends ServiceThread {
 
     private Map<MessageQueue, Long> offsetMap = new HashMap();
 
-    public DeliverMessageService(RocketMQConsumer consumer, Destination 
destination, String consumerGroup) {
+    public DeliverMessageService(RocketMQConsumer consumer, Destination 
destination, String consumerGroup,
+        String messageSelector, boolean durable, boolean shared) {
         this.consumer = consumer;
         this.destination = destination;
         this.consumerGroup = consumerGroup;
+        this.messageSelector = messageSelector;
+        this.durable = durable;
+        this.shared = shared;
 
         this.topicName = JMSUtils.getDestinationName(destination);
 
@@ -101,6 +108,7 @@ public class DeliverMessageService extends ServiceThread {
         this.rocketMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
         
this.rocketMQPullConsumer.setNamesrvAddr(clientConfig.getNamesrvAddr());
         
this.rocketMQPullConsumer.setInstanceName(clientConfig.getInstanceName());
+        this.rocketMQPullConsumer.setRegisterTopics(new 
HashSet(Arrays.asList(this.topicName)));
 
         try {
             this.rocketMQPullConsumer.start();
@@ -136,7 +144,8 @@ public class DeliverMessageService extends ServiceThread {
     }
 
     private void pullMessage() throws Exception {
-        Set<MessageQueue> mqs = 
this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName);
+        Set<MessageQueue> mqs = getMessageQueues();
+
         for (MessageQueue mq : mqs) {
             Long offset = offsetMap.get(mq);
             if (offset == null) {
@@ -162,6 +171,11 @@ public class DeliverMessageService extends ServiceThread {
         }
     }
 
+    private Set<MessageQueue> getMessageQueues() throws MQClientException {
+        Set<MessageQueue> mqs = 
this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName);
+        return mqs;
+    }
+
     /**
      * Refer to {@link #durable}.
      *
@@ -255,14 +269,6 @@ public class DeliverMessageService extends ServiceThread {
         log.debug("Success to close message delivery service:{}", 
getServiceName());
     }
 
-    public void setMessageSelector(String messageSelector) {
-        this.messageSelector = messageSelector;
-    }
-
-    public void setDurable(boolean durable) {
-        this.durable = durable;
-    }
-
     public void setConsumeModel(ConsumeModel consumeModel) {
         this.consumeModel = consumeModel;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
index 0927a2b..af147e0 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
+import org.apache.rocketmq.jms.support.JMSUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,29 +35,31 @@ public class RocketMQConsumer implements MessageConsumer {
     private Destination destination;
     private String messageSelector;
     private MessageListener messageListener;
-    private String sharedSubscriptionName;
+    private String subscriptionName;
     private boolean durable;
+    private boolean shared;
 
     private DeliverMessageService deliverMessageService;
 
     public RocketMQConsumer(RocketMQSession session, Destination destination,
         String messageSelector,
-        boolean durable) {
-        this(session, destination, messageSelector, 
UUID.randomUUID().toString(), durable);
+        boolean durable, boolean shared) {
+        this(session, destination, messageSelector, 
UUID.randomUUID().toString(), durable, shared);
     }
 
     public RocketMQConsumer(RocketMQSession session, Destination destination,
         String messageSelector,
-        String sharedSubscriptionName, boolean durable) {
+        String subscriptionName, boolean durable, boolean shared) {
         this.session = session;
         this.destination = destination;
         this.messageSelector = messageSelector;
-        this.sharedSubscriptionName = sharedSubscriptionName;
+        this.subscriptionName = subscriptionName;
         this.durable = durable;
+        this.shared = shared;
 
-        this.deliverMessageService = new DeliverMessageService(this, 
this.destination, this.sharedSubscriptionName);
-        this.deliverMessageService.setMessageSelector(this.messageSelector);
-        this.deliverMessageService.setDurable(this.durable);
+        String consumerGroup = JMSUtils.getConsumerGroup(this);
+        this.deliverMessageService = new DeliverMessageService(this, 
this.destination, consumerGroup,
+            this.messageSelector, this.durable, this.shared);
         this.deliverMessageService.start();
     }
 
@@ -134,4 +137,16 @@ public class RocketMQConsumer implements MessageConsumer {
     public RocketMQSession getSession() {
         return session;
     }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public boolean isShared() {
+        return shared;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
index 8825dff..0094c47 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
@@ -42,12 +42,21 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.jms.admin.AdminFactory;
 import org.apache.rocketmq.jms.destination.RocketMQQueue;
 import org.apache.rocketmq.jms.destination.RocketMQTopic;
+import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException;
 import org.apache.rocketmq.jms.msg.JMSBytesMessage;
 import org.apache.rocketmq.jms.msg.JMSMapMessage;
 import org.apache.rocketmq.jms.msg.JMSObjectMessage;
 import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.support.JMSUtils;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +64,9 @@ import static 
org.apache.rocketmq.jms.Constant.DEFAULT_DURABLE;
 import static org.apache.rocketmq.jms.Constant.DEFAULT_NO_LOCAL;
 import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR;
 
+/**
+ * Implement of {@link Session}.
+ */
 public class RocketMQSession implements Session {
 
     private static final Logger log = 
LoggerFactory.getLogger(RocketMQSession.class);
@@ -201,8 +213,9 @@ public class RocketMQSession implements Session {
     @Override
     public MessageConsumer createConsumer(Destination destination, String 
messageSelector,
         boolean noLocal) throws JMSException {
+
         // ignore noLocal param as RMQ not support
-        RocketMQConsumer consumer = new RocketMQConsumer(this, destination, 
messageSelector, DEFAULT_DURABLE);
+        RocketMQConsumer consumer = new RocketMQConsumer(this, destination, 
messageSelector, DEFAULT_DURABLE, false);
         this.consumerList.add(consumer);
 
         return consumer;
@@ -216,7 +229,7 @@ public class RocketMQSession implements Session {
     @Override
     public MessageConsumer createSharedConsumer(Topic topic, String 
sharedSubscriptionName,
         String messageSelector) throws JMSException {
-        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, sharedSubscriptionName, DEFAULT_DURABLE);
+        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, sharedSubscriptionName, DEFAULT_DURABLE, true);
         this.consumerList.add(consumer);
 
         return consumer;
@@ -242,7 +255,7 @@ public class RocketMQSession implements Session {
     @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, 
String messageSelector,
         boolean noLocal) throws JMSException {
-        RocketMQTopicSubscriber subscriber = new RocketMQTopicSubscriber(this, 
topic, messageSelector, name, true);
+        RocketMQTopicSubscriber subscriber = new RocketMQTopicSubscriber(this, 
topic, messageSelector, name, true, true);
         this.consumerList.add(subscriber);
 
         return subscriber;
@@ -256,7 +269,17 @@ public class RocketMQSession implements Session {
     @Override
     public MessageConsumer createDurableConsumer(Topic topic, String name, 
String messageSelector,
         boolean noLocal) throws JMSException {
-        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, name, true);
+        DefaultMQAdminExt admin = 
AdminFactory.getAdmin(this.getConnection().getClientConfig().getNamesrvAddr());
+        try {
+            GroupList groupList = 
admin.queryTopicConsumeByWho(topic.getTopicName());
+            if 
(groupList.getGroupList().contains(JMSUtils.getConsumerGroup(name, 
this.getConnection().getClientID(), false))) {
+                throw new DuplicateSubscriptionException("The same 
subscription( join subscriptionName with clientID) has existed, so couldn't 
create consumer on them again ");
+            }
+        }
+        catch (InterruptedException | MQBrokerException | RemotingException | 
MQClientException e) {
+            throw new JMSException(ExceptionUtils.getStackTrace(e));
+        }
+        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, name, true, false);
         this.consumerList.add(consumer);
 
         return consumer;
@@ -270,7 +293,7 @@ public class RocketMQSession implements Session {
     @Override
     public MessageConsumer createSharedDurableConsumer(Topic topic, String 
name,
         String messageSelector) throws JMSException {
-        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, name, true);
+        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, 
messageSelector, name, true, true);
         this.consumerList.add(consumer);
 
         return consumer;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
index fcd0494..51b732b 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
@@ -26,8 +26,8 @@ public class RocketMQTopicSubscriber extends RocketMQConsumer 
implements TopicSu
     private Topic topic;
 
     public RocketMQTopicSubscriber(RocketMQSession session, Topic topic, 
String messageSelector,
-        String sharedSubscriptionName, boolean durable) {
-        super(session, topic, messageSelector, sharedSubscriptionName, 
durable);
+        String sharedSubscriptionName, boolean durable, boolean shared) {
+        super(session, topic, messageSelector, sharedSubscriptionName, 
durable, shared);
         this.topic = topic;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java 
b/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
new file mode 100644
index 0000000..c551a22
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.admin;
+
+import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.JMSException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.jms.exception.JMSClientException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+public class AdminFactory {
+
+    private static ConcurrentHashMap<String/*nameServerAddress*/, 
DefaultMQAdminExt> admins = new ConcurrentHashMap();
+
+    public static DefaultMQAdminExt getAdmin(String nameServerAddress) throws 
JMSException {
+        if (nameServerAddress == null) {
+            throw new IllegalArgumentException("NameServerAddress could be 
null");
+        }
+
+        DefaultMQAdminExt admin = admins.get(nameServerAddress);
+        if (admin != null) {
+            return admin;
+        }
+
+        admin = new DefaultMQAdminExt(nameServerAddress);
+        try {
+            admin.start();
+        }
+        catch (MQClientException e) {
+            throw new JMSClientException("Error during starting admin client");
+        }
+        DefaultMQAdminExt old = admins.putIfAbsent(nameServerAddress, admin);
+        if (old != null) {
+            admin.shutdown();
+            return old;
+        }
+
+        return admin;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java
 
b/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java
new file mode 100644
index 0000000..51ce57d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.exception;
+
+import javax.jms.JMSException;
+
+public class DuplicateSubscriptionException extends JMSException {
+
+    public DuplicateSubscriptionException(String reason, String errorCode) {
+        super(reason, errorCode);
+    }
+
+    public DuplicateSubscriptionException(String reason) {
+        super(reason);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java 
b/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java
new file mode 100644
index 0000000..1335eb9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.exception;
+
+import javax.jms.JMSException;
+
+public class JMSClientException extends JMSException {
+
+    public JMSClientException(String reason, String errorCode) {
+        super(reason, errorCode);
+    }
+
+    public JMSClientException(String reason) {
+        super(reason);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java 
b/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java
index ba5431a..bed8165 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/support/JMSUtils.java
@@ -23,6 +23,9 @@ import javax.jms.JMSException;
 import javax.jms.JMSRuntimeException;
 import javax.jms.Queue;
 import javax.jms.Topic;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.rocketmq.jms.RocketMQConsumer;
 
 public class JMSUtils {
 
@@ -45,6 +48,45 @@ public class JMSUtils {
         }
     }
 
+    public static String getConsumerGroup(RocketMQConsumer consumer) {
+        try {
+            return getConsumerGroup(consumer.getSubscriptionName(),
+                consumer.getSession().getConnection().getClientID(),
+                consumer.isShared()
+            );
+        }
+        catch (JMSException e) {
+            throw new JMSRuntimeException(ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    public static String getConsumerGroup(String subscriptionName, String 
clientID, boolean shared) {
+        StringBuffer consumerGroup = new StringBuffer();
+
+        if (StringUtils.isNotBlank(subscriptionName)) {
+            consumerGroup.append(subscriptionName);
+        }
+
+        if (StringUtils.isNotBlank(clientID)) {
+            if (consumerGroup.length() != 0) {
+                consumerGroup.append("-");
+            }
+            consumerGroup.append(clientID);
+        }
+
+        if (shared) {
+            if (consumerGroup.length() != 0) {
+                consumerGroup.append("-");
+            }
+            consumerGroup.append(uuid());
+        }
+
+        if (consumerGroup.length() == 0) {
+            consumerGroup.append(uuid());
+        }
+
+        return consumerGroup.toString();
+    }
 
     public static String uuid() {
         return UUID.randomUUID().toString();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java 
b/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java
index d926fac..db15fee 100644
--- a/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java
+++ b/core/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java
@@ -37,6 +37,24 @@ public class JMSUtilsTest {
     }
 
     @Test
+    public void getConsumerGroup() throws Exception {
+        final String subscriptionName = "subscriptionName";
+        final String clientID = "clientID";
+        String consumerGroupA = JMSUtils.getConsumerGroup(subscriptionName, 
clientID, true);
+        assertThat(consumerGroupA.contains(subscriptionName), is(true));
+        assertThat(consumerGroupA.contains(clientID), is(true));
+        assertThat(consumerGroupA.substring(subscriptionName.length() + 
clientID.length() + 2).length(), is(36));
+
+        String consumerGroupB = JMSUtils.getConsumerGroup(subscriptionName, 
clientID, false);
+        assertThat(consumerGroupB.contains(subscriptionName), is(true));
+        assertThat(consumerGroupB.contains(clientID), is(true));
+        assertThat(consumerGroupB.length(), is(subscriptionName.length() + 
clientID.length() + 1));
+
+        String consumerGroupC = JMSUtils.getConsumerGroup(null, null, true);
+        assertThat(consumerGroupC.length(), is(36));
+    }
+
+    @Test
     public void uuid() throws Exception {
         assertThat(JMSUtils.uuid(), notNullValue());
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 48977f3..df376d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,11 @@
                 <version>${rocketmq.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-tools</artifactId>
+                <version>${rocketmq.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>javax.jms</groupId>
                 <artifactId>javax.jms-api</artifactId>
                 <version>2.0.1</version>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index f05c080..bd23f68 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -46,16 +46,6 @@
             <artifactId>hamcrest-all</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-namesrv</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-broker</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-jms</artifactId>
             <version>${spring.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java 
b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java
index a00370e..7ee2fbb 100644
--- a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java
+++ b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java
@@ -17,13 +17,13 @@
 
 package org.apache.rocketmq.jms.integration;
 
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import com.google.common.collect.Sets;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -46,6 +46,9 @@ public class RocketMQAdmin {
 
     @PostConstruct
     public void start() {
+        // reduce rebalance waiting time
+        System.setProperty("rocketmq.client.rebalance.waitInterval", "1000");
+
         defaultMQAdminExt.setNamesrvAddr(NAME_SERVER_ADDRESS);
         try {
             defaultMQAdminExt.start();
@@ -63,10 +66,14 @@ public class RocketMQAdmin {
     }
 
     public void createTopic(String topic) {
+        createTopic(topic, 1);
+    }
+
+    public void createTopic(String topic, int queueNum) {
         TopicConfig topicConfig = new TopicConfig();
         topicConfig.setTopicName(topic);
-        topicConfig.setReadQueueNums(1);
-        topicConfig.setWriteQueueNums(1);
+        topicConfig.setReadQueueNums(queueNum);
+        topicConfig.setWriteQueueNums(queueNum);
         try {
             defaultMQAdminExt.createAndUpdateTopicConfig(BROKER_ADDRESS, 
topicConfig);
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java 
b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java
index 416e8d9..c2d0f49 100644
--- a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java
+++ b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java
@@ -17,6 +17,11 @@
 
 package org.apache.rocketmq.jms.integration;
 
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
@@ -25,11 +30,6 @@ import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import java.io.File;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -113,6 +113,7 @@ public class RocketMQServer {
     }
 
     private void startBroker() {
+        System.setProperty("rocketmq.broker.diskSpaceWarningLevelRatio", 
"0.98");
         brokerConfig.setBrokerName(brokerName);
         brokerConfig.setBrokerIP1(Constant.BROKER_IP);
         brokerConfig.setNamesrvAddr(NAME_SERVER_ADDRESS);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
index fc2cc34..ea3fa60 100644
--- 
a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
@@ -30,6 +30,8 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.integration.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.support.TimeLimitAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -54,7 +56,6 @@ public class ConsumeAsynchronousTest {
         ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
         Connection connection = factory.createConnection();
         Session session = connection.createSession();
-        connection.start();
         Topic topic = session.createTopic(rmqTopicName);
 
         try {
@@ -74,9 +75,11 @@ public class ConsumeAsynchronousTest {
 
             connection.start();
 
-            Thread.sleep(1000 * 5);
-
-            assertThat(received.size(), is(1));
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return received.size() == 1;
+                }
+            }, 5);
         }
         finally {
             connection.close();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
index 82f73fb..d222f98 100644
--- 
a/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
@@ -30,15 +30,14 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.integration.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.support.TimeLimitAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(classes = AppConfig.class)
 public class NonDurableConsumeTest {
@@ -62,7 +61,7 @@ public class NonDurableConsumeTest {
      */
     @Test
     public void testConsumeNotDurable() throws Exception {
-        final String rmqTopicName = "coffee-syn" + 
UUID.randomUUID().toString();
+        final String rmqTopicName = "coffee" + UUID.randomUUID().toString();
         rocketMQAdmin.createTopic(rmqTopicName);
 
         ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
@@ -84,14 +83,19 @@ public class NonDurableConsumeTest {
 
             connection.start();
 
+            Thread.sleep(1000 * 3);
+
             //producer
             TextMessage message = session.createTextMessage("a");
             MessageProducer producer = session.createProducer(topic);
             producer.send(message);
 
-            Thread.sleep(1000 * 2);
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return received.size() == 1;
+                }
+            }, 3);
 
-            assertThat(received.size(), is(1));
             received.clear();
 
             // close the consumer
@@ -109,9 +113,11 @@ public class NonDurableConsumeTest {
             consumer.setMessageListener(msgListener);
             connection.start();
 
-            Thread.sleep(1000 * 3);
-
-            assertThat(received.size(), is(0));
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return received.size() == 0;
+                }
+            }, 5);
 
         }
         finally {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
index 715f70c..a22872a 100644
--- 
a/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
@@ -30,6 +30,8 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.integration.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.support.TimeLimitAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -46,9 +48,22 @@ public class SharedDurableConsumeTest {
     @Autowired
     private RocketMQAdmin rocketMQAdmin;
 
+    /**
+     * Test messages will be deliver to every consumer if these consumers are 
in shared durable subscription.
+     *
+     * <p>Test step:
+     * 1. Create a share durable consumer(consumerA) via the first 
connection(connectionA)
+     * 2. Create a share durable consumer(consumerB) via another 
connection(connectionB)
+     * 3. The two consumer must subscribe the same topic with identical 
subscriptionName,
+     * and they also have the same clientID.
+     * 4. Send several(eg:10) messages to this topic
+     * 5. Result: all messages should be received by both consumerA and 
consumerB
+     *
+     * @throws Exception
+     */
     @Test
-    public void test() throws Exception {
-        final String rmqTopicName = "coffee-syn" + 
UUID.randomUUID().toString();
+    public void testConsumeAllMessages() throws Exception {
+        final String rmqTopicName = "coffee" + UUID.randomUUID().toString();
         rocketMQAdmin.createTopic(rmqTopicName);
 
         ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
@@ -78,8 +93,6 @@ public class SharedDurableConsumeTest {
                     receivedB.add(message);
                 }
             });
-
-            connectionA.start();
             connectionB.start();
 
             //producer
@@ -89,10 +102,13 @@ public class SharedDurableConsumeTest {
                 producer.send(message);
             }
 
-            Thread.sleep(1000 * 5);
+            Thread.sleep(1000 * 2);
 
-            assertThat(receivedA.size(), is(10));
-            assertThat(receivedB.size(), is(10));
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return receivedA.size()==10 && receivedB.size()==10;
+                }
+            },5);
         }
         finally {
             connectionA.close();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java
new file mode 100644
index 0000000..c0e4e79
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.RocketMQSession;
+import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException;
+import org.apache.rocketmq.jms.integration.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.support.TimeLimitAssert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class UnsharedDurableConsumeTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    /**
+     * Test each message will be deliver to only one consumer if these 
consumers are in unshared durable subscription.
+     *
+     * <p>Test step:
+     * 1. Create a unshared durable consumer(consumerA) via the first 
connection(connectionA)
+     * 2. Create a unshared durable consumer(consumerB) via another 
connection(connectionB)
+     * 3. Result:
+     * a. The creating consumerB should throw a JMSException as consumerA and 
consumberB have the same subscription
+     * b. All messages should be received by consumerA
+     *
+     * @throws Exception
+     * @see {@link RocketMQSession}
+     */
+    @Test
+    public void testEachMessageOnlyConsumeByOneConsumer() throws Exception {
+        final String rmqTopicName = "coffee" + UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName, 2);
+
+        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connectionA = null, connectionB = null;
+        final String subscriptionName = "MySubscription";
+        final List<Message> receivedA = new ArrayList();
+
+        try {
+            // consumerA
+            connectionA = factory.createConnection();
+            Session sessionA = connectionA.createSession();
+            connectionA.start();
+            Topic topic = sessionA.createTopic(rmqTopicName);
+            MessageConsumer consumerA = sessionA.createDurableConsumer(topic, 
subscriptionName);
+            consumerA.setMessageListener(new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    receivedA.add(message);
+                }
+            });
+
+            Thread.sleep(1000 * 2);
+
+            // consumerB
+            try {
+                connectionB = factory.createConnection();
+                Session sessionB = connectionB.createSession();
+                sessionB.createDurableConsumer(topic, subscriptionName);
+                assertFalse("Doesn't get the expected " + 
DuplicateSubscriptionException.class.getSimpleName(), true);
+            }
+            catch (DuplicateSubscriptionException e) {
+                assertTrue(true);
+            }
+
+            connectionA.start();
+
+            //producer
+            TextMessage message = sessionA.createTextMessage("a");
+            MessageProducer producer = sessionA.createProducer(topic);
+            for (int i = 0; i < 10; i++) {
+                producer.send(message);
+            }
+
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return receivedA.size() == 10;
+                }
+            }, 5);
+        }
+        finally {
+            connectionA.close();
+            connectionB.close();
+            rocketMQAdmin.deleteTopic(rmqTopicName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java
index 4b994a3..8518ab5 100644
--- 
a/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.jms.integration.listener;
 
 import org.apache.commons.lang.time.StopWatch;
 import org.apache.rocketmq.jms.integration.AppConfig;
+import org.apache.rocketmq.jms.integration.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.support.TimeLimitAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -29,8 +31,6 @@ import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import static 
org.apache.rocketmq.jms.integration.listener.SimpleTextListener.DESTINATION;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 @RunWith(SpringRunner.class)
 @ContextConfiguration(classes = AppConfig.class)
@@ -50,15 +50,10 @@ public class SimpleTextListenerTest {
         StopWatch watch = new StopWatch();
         watch.start();
 
-        int count = 1;
-        while (simpleTextListener.getReceivedMsg().size() != count) {
-            Thread.sleep(1000);
-            log.info("Waiting for receiving {} messages sent to {} topic,now 
has received {}",
-                count, DESTINATION, 
simpleTextListener.getReceivedMsg().size());
-            if (watch.getTime() > 1000 * 10) {
-                assertFalse(true);
+        TimeLimitAssert.doAssert(new ConditionMatcher() {
+            @Override public boolean match() {
+                return simpleTextListener.getReceivedMsg().size() == 1;
             }
-        }
-        assertTrue(true);
+        }, 60);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java
new file mode 100644
index 0000000..22d7683
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.support;
+
+public interface ConditionMatcher {
+
+    boolean match();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9649e02d/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java
new file mode 100644
index 0000000..6877cbd
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.support;
+
+import org.apache.commons.lang.time.StopWatch;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TimeLimitAssert {
+
+    public static void doAssert(ConditionMatcher conditionMatcher, int 
timeLimit) throws InterruptedException {
+        StopWatch watch = new StopWatch();
+        watch.start();
+
+        while (!conditionMatcher.match()) {
+            Thread.sleep(500);
+            if (watch.getTime() > timeLimit * 1000) {
+                assertFalse(String.format("Doesn't match assert condition in 
%s second", timeLimit), true);
+            }
+        }
+
+        assertTrue(true);
+    }
+}

Reply via email to