Refactor multi-module to single-module

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/db8e0dd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/db8e0dd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/db8e0dd1

Branch: refs/heads/jms-dev-1.1.0
Commit: db8e0dd18f7916dcf293e27211b5aa09ed9fa132
Parents: 1ac8377
Author: zhangke <[email protected]>
Authored: Wed Mar 1 22:53:45 2017 +0800
Committer: zhangke <[email protected]>
Committed: Wed Mar 1 22:53:45 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 core/pom.xml                                    |  69 ---
 .../java/org/apache/rocketmq/jms/Constant.java  |  82 ----
 .../rocketmq/jms/ConsumeMessageService.java     |  71 ---
 .../org/apache/rocketmq/jms/ConsumeModel.java   |  23 -
 .../rocketmq/jms/DeliverMessageService.java     | 275 -----------
 .../org/apache/rocketmq/jms/MessageWrapper.java |  52 --
 .../apache/rocketmq/jms/RocketMQConnection.java | 234 ---------
 .../rocketmq/jms/RocketMQConnectionFactory.java | 128 -----
 .../jms/RocketMQConnectionMetaData.java         | 116 -----
 .../apache/rocketmq/jms/RocketMQConsumer.java   | 152 ------
 .../apache/rocketmq/jms/RocketMQProducer.java   | 269 ----------
 .../apache/rocketmq/jms/RocketMQSession.java    | 367 --------------
 .../rocketmq/jms/RocketMQTopicSubscriber.java   |  44 --
 .../rocketmq/jms/SendCompletionListener.java    |  44 --
 .../apache/rocketmq/jms/admin/AdminFactory.java |  55 ---
 .../rocketmq/jms/destination/RocketMQQueue.java |  39 --
 .../rocketmq/jms/destination/RocketMQTopic.java |  43 --
 .../DuplicateSubscriptionException.java         |  31 --
 .../jms/exception/JMSClientException.java       |  31 --
 .../jms/exception/MessageExpiredException.java  |  31 --
 .../UnsupportDeliveryModelException.java        |  27 -
 .../rocketmq/jms/hook/ReceiveMessageHook.java   |  44 --
 .../rocketmq/jms/hook/SendMessageHook.java      | 104 ----
 .../rocketmq/jms/msg/AbstractJMSMessage.java    | 400 ---------------
 .../rocketmq/jms/msg/JMSBytesMessage.java       | 491 -------------------
 .../apache/rocketmq/jms/msg/JMSMapMessage.java  | 229 ---------
 .../rocketmq/jms/msg/JMSObjectMessage.java      |  55 ---
 .../apache/rocketmq/jms/msg/JMSTextMessage.java |  67 ---
 .../jms/msg/convert/JMS2RMQMessageConvert.java  |  66 ---
 .../jms/msg/convert/RMQ2JMSMessageConvert.java  | 103 ----
 .../rocketmq/jms/msg/enums/JMSHeaderEnum.java   |  44 --
 .../jms/msg/enums/JMSMessageModelEnum.java      |  53 --
 .../jms/msg/enums/JMSPropertiesEnum.java        |  26 -
 .../jms/msg/serialize/MapSerialize.java         |  43 --
 .../jms/msg/serialize/ObjectSerialize.java      |  69 ---
 .../rocketmq/jms/msg/serialize/Serialize.java   |  27 -
 .../jms/msg/serialize/StringSerialize.java      |  61 ---
 .../apache/rocketmq/jms/support/JMSUtils.java   |  94 ----
 .../rocketmq/jms/support/ObjectTypeCast.java    |  75 ---
 .../rocketmq/jms/support/PrimitiveTypeCast.java | 220 ---------
 .../rocketmq/jms/support/ProviderVersion.java   |  37 --
 core/src/main/resources/logback.xml             |  56 ---
 .../jms/RocketMQConnectionFactoryTest.java      |  36 --
 .../jms/destination/RocketMQQueueTest.java      |  34 --
 .../jms/destination/RocketMQTopicTest.java      |  35 --
 .../jms/hook/ReceiveMessageHookTest.java        |  67 ---
 .../rocketmq/jms/hook/SendMessageHookTest.java  | 102 ----
 .../rocketmq/jms/msg/JMSBytesMessageTest.java   | 106 ----
 .../rocketmq/jms/msg/JMSMapMessageTest.java     |  70 ---
 .../rocketmq/jms/msg/JMSObjectMessageTest.java  |  73 ---
 .../rocketmq/jms/msg/JMSTextMessageTest.java    |  41 --
 .../msg/convert/JMS2RMQMessageConvertTest.java  |  60 ---
 .../msg/convert/RMQ2JMSMessageConvertTest.java  |  65 ---
 .../jms/msg/enums/JMSMessageModelEnumTest.java  |  31 --
 .../jms/msg/serialize/MapSerializeTest.java     |  42 --
 .../jms/msg/serialize/ObjectSerializeTest.java  |  63 ---
 .../jms/msg/serialize/StringSerializeTest.java  |  36 --
 .../rocketmq/jms/support/JMSUtilsTest.java      |  62 ---
 .../jms/support/ObjectTypeCastTest.java         |  52 --
 .../jms/support/PrimitiveTypeCastTest.java      | 210 --------
 pom.xml                                         | 130 ++---
 .../java/org/apache/rocketmq/jms/Constant.java  |  82 ++++
 .../rocketmq/jms/ConsumeMessageService.java     |  71 +++
 .../org/apache/rocketmq/jms/ConsumeModel.java   |  23 +
 .../rocketmq/jms/DeliverMessageService.java     | 275 +++++++++++
 .../org/apache/rocketmq/jms/MessageWrapper.java |  52 ++
 .../apache/rocketmq/jms/RocketMQConnection.java | 233 +++++++++
 .../rocketmq/jms/RocketMQConnectionFactory.java | 128 +++++
 .../jms/RocketMQConnectionMetaData.java         | 116 +++++
 .../apache/rocketmq/jms/RocketMQConsumer.java   | 152 ++++++
 .../apache/rocketmq/jms/RocketMQProducer.java   | 269 ++++++++++
 .../apache/rocketmq/jms/RocketMQSession.java    | 367 ++++++++++++++
 .../rocketmq/jms/RocketMQTopicSubscriber.java   |  44 ++
 .../rocketmq/jms/SendCompletionListener.java    |  43 ++
 .../apache/rocketmq/jms/admin/AdminFactory.java |  55 +++
 .../rocketmq/jms/destination/RocketMQQueue.java |  39 ++
 .../rocketmq/jms/destination/RocketMQTopic.java |  43 ++
 .../DuplicateSubscriptionException.java         |  31 ++
 .../jms/exception/JMSClientException.java       |  31 ++
 .../jms/exception/MessageExpiredException.java  |  31 ++
 .../UnsupportDeliveryModelException.java        |  27 +
 .../rocketmq/jms/hook/ReceiveMessageHook.java   |  44 ++
 .../rocketmq/jms/hook/SendMessageHook.java      | 104 ++++
 .../rocketmq/jms/msg/AbstractJMSMessage.java    | 400 +++++++++++++++
 .../rocketmq/jms/msg/JMSBytesMessage.java       | 491 +++++++++++++++++++
 .../apache/rocketmq/jms/msg/JMSMapMessage.java  | 229 +++++++++
 .../rocketmq/jms/msg/JMSObjectMessage.java      |  55 +++
 .../apache/rocketmq/jms/msg/JMSTextMessage.java |  67 +++
 .../jms/msg/convert/JMS2RMQMessageConvert.java  |  66 +++
 .../jms/msg/convert/RMQ2JMSMessageConvert.java  | 103 ++++
 .../rocketmq/jms/msg/enums/JMSHeaderEnum.java   |  44 ++
 .../jms/msg/enums/JMSMessageModelEnum.java      |  53 ++
 .../jms/msg/enums/JMSPropertiesEnum.java        |  26 +
 .../jms/msg/serialize/MapSerialize.java         |  43 ++
 .../jms/msg/serialize/ObjectSerialize.java      |  69 +++
 .../rocketmq/jms/msg/serialize/Serialize.java   |  27 +
 .../jms/msg/serialize/StringSerialize.java      |  61 +++
 .../apache/rocketmq/jms/support/JMSUtils.java   |  94 ++++
 .../rocketmq/jms/support/ObjectTypeCast.java    |  75 +++
 .../rocketmq/jms/support/PrimitiveTypeCast.java | 220 +++++++++
 .../rocketmq/jms/support/ProviderVersion.java   |  37 ++
 src/main/resources/logback.xml                  |  56 +++
 .../jms/RocketMQConnectionFactoryTest.java      |  36 ++
 .../jms/destination/RocketMQQueueTest.java      |  34 ++
 .../jms/destination/RocketMQTopicTest.java      |  35 ++
 .../jms/hook/ReceiveMessageHookTest.java        |  67 +++
 .../rocketmq/jms/hook/SendMessageHookTest.java  | 102 ++++
 .../jms/integration/source/AppConfig.java       |  59 +++
 .../jms/integration/source/Constant.java        |  39 ++
 .../jms/integration/source/RocketMQAdmin.java   |  93 ++++
 .../jms/integration/source/RocketMQServer.java  | 162 ++++++
 .../integration/source/SimpleTextListener.java  |  60 +++
 .../source/support/ConditionMatcher.java        |  23 +
 .../source/support/TimeLimitAssert.java         |  40 ++
 .../test/ConsumeAsynchronousTest.java           |  90 ++++
 .../test/ConsumeSynchronousTest.java            |  82 ++++
 .../integration/test/NonDurableConsumeTest.java | 132 +++++
 .../test/SharedDurableConsumeTest.java          | 120 +++++
 .../test/UnsharedDurableConsumeTest.java        | 127 +++++
 .../test/listener/SimpleTextListenerTest.java   |  60 +++
 .../rocketmq/jms/msg/JMSBytesMessageTest.java   | 106 ++++
 .../rocketmq/jms/msg/JMSMapMessageTest.java     |  70 +++
 .../rocketmq/jms/msg/JMSObjectMessageTest.java  |  73 +++
 .../rocketmq/jms/msg/JMSTextMessageTest.java    |  41 ++
 .../msg/convert/JMS2RMQMessageConvertTest.java  |  60 +++
 .../msg/convert/RMQ2JMSMessageConvertTest.java  |  65 +++
 .../jms/msg/enums/JMSMessageModelEnumTest.java  |  31 ++
 .../jms/msg/serialize/MapSerializeTest.java     |  42 ++
 .../jms/msg/serialize/ObjectSerializeTest.java  |  63 +++
 .../jms/msg/serialize/StringSerializeTest.java  |  36 ++
 .../rocketmq/jms/support/JMSUtilsTest.java      |  62 +++
 .../jms/support/ObjectTypeCastTest.java         |  52 ++
 .../jms/support/PrimitiveTypeCastTest.java      | 210 ++++++++
 test/pom.xml                                    |  70 ---
 .../rocketmq/jms/integration/AppConfig.java     |  59 ---
 .../rocketmq/jms/integration/Constant.java      |  39 --
 .../rocketmq/jms/integration/RocketMQAdmin.java |  93 ----
 .../jms/integration/RocketMQServer.java         | 162 ------
 .../listener/SimpleTextListener.java            |  61 ---
 test/src/main/resources/logback.xml             |  56 ---
 .../integration/ConsumeAsynchronousTest.java    |  90 ----
 .../jms/integration/ConsumeSynchronousTest.java |  79 ---
 .../jms/integration/NonDurableConsumeTest.java  | 129 -----
 .../integration/SharedDurableConsumeTest.java   | 120 -----
 .../integration/UnsharedDurableConsumeTest.java | 124 -----
 .../listener/SimpleTextListenerTest.java        |  59 ---
 .../integration/support/ConditionMatcher.java   |  23 -
 .../integration/support/TimeLimitAssert.java    |  40 --
 149 files changed, 6819 insertions(+), 6997 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index ac9e6b6..33841d0 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -35,7 +35,7 @@ before_install:
 
 
 script:
-  - travis_retry mvn -B clean install coveralls:report
+  - travis_retry mvn -B clean install jacoco:report
 
 #after_success:
 #  - mvn clean install

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
deleted file mode 100644
index c01d2e0..0000000
--- a/core/pom.xml
+++ /dev/null
@@ -1,69 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>rocketmq-jms-all</artifactId>
-        <groupId>org.apache.rocketmq</groupId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>rocketmq-jms-core</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-tools</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>javax.jms</groupId>
-            <artifactId>javax.jms-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <finalName>rocketmq-jms-${project.version}</finalName>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/Constant.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/Constant.java b/core/src/main/java/org/apache/rocketmq/jms/Constant.java
deleted file mode 100644
index 9519bea..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/Constant.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-public interface Constant {
-
-    String NO_MESSAGE_SELECTOR = "*";
-
-    boolean DEFAULT_NO_LOCAL = true;
-
-    boolean DEFAULT_DURABLE = false;
-
-    //-------------------------JMS defined properties constant----------------------------
-    /**
-     * The identity of the user sending the Send message
-     */
-    String JMS_XUSER_ID = "jmsXUserID";
-    /**
-     * The identity of the application Send sending the message
-     */
-    String JMS_XAPP_ID = "jmsXAppID";
-    /**
-     * The number of message delivery Receive attempts
-     */
-    String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount";
-    /**
-     * The identity of the message group this message is part of
-     */
-    String JMS_XGROUP_ID = "jmsXGroupID";
-    /**
-     * The sequence number of this message within the group; the first message is 1, the second 2,...
-     */
-    String JMS_XGROUP_SEQ = "jmsXGroupSeq";
-    /**
-     * The transaction identifier of the Send transaction within which this message was produced
-     */
-    String JMS_XPRODUCER_TXID = "jmsXProducerTXID";
-    /**
-     * The transaction identifier of the Receive transaction within which this message was consumed
-     */
-    String JMS_XCONSUMER_TXID = "jmsXConsumerTXID";
-
-    /**
-     * The time JMS delivered the Receive message to the consumer
-     */
-    String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp";
-    /**
-     * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and
-     * that these copies exist from the time the original message was sent. Each copy’s state is one of: 1(waiting),
-     * 2(ready), 3(expired) or 4(retained) Since state is of no interest to producers and consumers it is not provided
-     * to either. It is only of relevance to messages looked up in a warehouse and JMS provides no API for this.
-     */
-    String JMS_XSTATE = "jmsXState";
-
-    //---------------------------JMS Headers' value constant---------------------------
-    /**
-     * Default time to live
-     */
-    long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000;
-
-    /**
-     * Default Jms Type
-     */
-    String DEFAULT_JMS_TYPE = "rocketmq";
-
-    String MESSAGE_ID_PREFIX = "ID:";
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java
deleted file mode 100644
index 79efa6a..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import org.apache.rocketmq.common.ServiceThread;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.JMSRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumeMessageService extends ServiceThread {
-
-    private static final Logger log = LoggerFactory.getLogger(DeliverMessageService.class);
-    private static final AtomicLong COUNTER = new AtomicLong(0L);
-
-    private BlockingQueue<MessageWrapper> queue = new ArrayBlockingQueue(1000);
-    private RocketMQSession session;
-    private final long index = COUNTER.incrementAndGet();
-
-    public ConsumeMessageService(RocketMQSession session) {
-        this.session = session;
-    }
-
-    @Override public String getServiceName() {
-        return ConsumeMessageService.class.getSimpleName() + "-" + this.index;
-    }
-
-    @Override public void run() {
-        while (!this.isStopped()) {
-            try {
-                MessageWrapper wrapper = queue.take();
-                RocketMQConsumer consumer = wrapper.getConsumer();
-                consumer.getMessageListener().onMessage(wrapper.getMessage());
-                consumer.getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset());
-            }
-            catch (Exception e) {
-                log.error(e.getMessage(), e);
-            }
-        }
-    }
-
-    public void put(MessageWrapper wrapper) {
-        try {
-            this.queue.put(wrapper);
-        }
-        catch (InterruptedException e) {
-            throw new JMSRuntimeException(e.getMessage());
-        }
-    }
-
-    public RocketMQSession getSession() {
-        return session;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java b/core/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java
deleted file mode 100644
index 356cbd7..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-public enum ConsumeModel {
-    SYNC,
-    ASYNC
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
deleted file mode 100644
index 1043f5d..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.JMSRuntimeException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.jms.exception.MessageExpiredException;
-import org.apache.rocketmq.jms.hook.ReceiveMessageHook;
-import org.apache.rocketmq.jms.msg.convert.RMQ2JMSMessageConvert;
-import org.apache.rocketmq.jms.support.JMSUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.String.format;
-
-/**
- * Service deliver messages synchronous or asynchronous.
- */
-public class DeliverMessageService extends ServiceThread {
-
-    private static final Logger log = LoggerFactory.getLogger(DeliverMessageService.class);
-    private static final AtomicLong COUNTER = new AtomicLong(0L);
-    private static final int PULL_BATCH_SIZE = 100;
-
-    private RocketMQConsumer consumer;
-    private DefaultMQPullConsumer rocketMQPullConsumer;
-    private Destination destination;
-    private String consumerGroup;
-    private String topicName;
-    private ConsumeModel consumeModel = ConsumeModel.SYNC;
-
-    /** only support RMQ subExpression */
-    private String messageSelector;
-    private ReceiveMessageHook hook = new ReceiveMessageHook();
-
-    /**
-     * If durable is true, consume message from the offset consumed last time.
-     * Otherwise consume from the max offset
-     */
-    private boolean durable = false;
-    private boolean shared = false;
-
-    private BlockingQueue<MessageWrapper> msgQueue = new ArrayBlockingQueue(PULL_BATCH_SIZE);
-    private volatile boolean pause = true;
-    private final long index = COUNTER.incrementAndGet();
-
-    private Map<MessageQueue, Long> offsetMap = new HashMap();
-
-    public DeliverMessageService(RocketMQConsumer consumer, Destination destination, String consumerGroup,
-        String messageSelector, boolean durable, boolean shared) {
-        this.consumer = consumer;
-        this.destination = destination;
-        this.consumerGroup = consumerGroup;
-        this.messageSelector = messageSelector;
-        this.durable = durable;
-        this.shared = shared;
-
-        this.topicName = JMSUtils.getDestinationName(destination);
-
-        createAndStartRocketMQPullConsumer();
-
-        if (this.consumer.getSession().getConnection().isStarted()) {
-            this.recover();
-        }
-        else {
-            this.pause();
-        }
-    }
-
-    private void createAndStartRocketMQPullConsumer() {
-        final ClientConfig clientConfig = this.consumer.getSession().getConnection().getClientConfig();
-        this.rocketMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
-        this.rocketMQPullConsumer.setNamesrvAddr(clientConfig.getNamesrvAddr());
-        this.rocketMQPullConsumer.setInstanceName(clientConfig.getInstanceName());
-        this.rocketMQPullConsumer.setRegisterTopics(new HashSet(Arrays.asList(this.topicName)));
-
-        try {
-            this.rocketMQPullConsumer.start();
-        }
-        catch (MQClientException e) {
-            throw new JMSRuntimeException(format("Fail to start RocketMQ pull consumer, error msg:%s", ExceptionUtils.getStackTrace(e)));
-        }
-    }
-
-    @Override
-    public String getServiceName() {
-        return DeliverMessageService.class.getSimpleName() + "-" + this.index;
-    }
-
-    @Override
-    public void run() {
-        while (!isStopped()) {
-            if (pause) {
-                this.waitForRunning(1000);
-                continue;
-            }
-
-            try {
-                pullMessage();
-            }
-            catch (InterruptedException e) {
-                log.debug("Pulling messages service has been interrupted");
-            }
-            catch (Exception e) {
-                log.error("Error during pulling messages", e);
-            }
-        }
-    }
-
-    private void pullMessage() throws Exception {
-        Set<MessageQueue> mqs = getMessageQueues();
-
-        for (MessageQueue mq : mqs) {
-            Long offset = offsetMap.get(mq);
-            if (offset == null) {
-                offset = beginOffset(mq);
-            }
-            PullResult pullResult = this.rocketMQPullConsumer.pullBlockIfNotFound(mq, this.messageSelector, offset, PULL_BATCH_SIZE);
-
-            switch (pullResult.getPullStatus()) {
-                case FOUND:
-                    List<MessageExt> msgs = pullResult.getMsgFoundList();
-                    offsetMap.put(mq, pullResult.getMaxOffset());
-                    for (MessageExt msg : msgs) {
-                        handleMessage(msg, mq);
-                    }
-                    log.debug("Pull {} messages from topic:{},broker:{},queueId:{}", msgs.size(), mq.getTopic(), mq.getBrokerName(), mq.getQueueId());
-                    break;
-                case NO_NEW_MSG:
-                case NO_MATCHED_MSG:
-                    break;
-                case OFFSET_ILLEGAL:
-                    throw new JMSException("Error during pull message[reason:OFFSET_ILLEGAL]");
-            }
-        }
-    }
-
-    private Set<MessageQueue> getMessageQueues() throws MQClientException {
-        Set<MessageQueue> mqs = this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName);
-        return mqs;
-    }
-
-    /**
-     * Refer to {@link #durable}.
-     *
-     * @param mq message queue
-     * @return offset
-     * @throws MQClientException
-     */
-    private Long beginOffset(MessageQueue mq) throws MQClientException {
-        return this.durable ? this.rocketMQPullConsumer.fetchConsumeOffset(mq, false) : this.rocketMQPullConsumer.maxOffset(mq);
-    }
-
-    /**
-     * If {@link #consumeModel} is {@link ConsumeModel#ASYNC}, messages pulled from broker
-     * are handled in {@link ConsumeMessageService} owned by its session.
-     *
-     * If {@link #consumeModel} is {@link ConsumeModel#SYNC}, messages pulled from broker are put
-     * into a memory blocking queue, waiting for the {@link MessageConsumer#receive()}
-     * using {@link BlockingQueue#poll()} to handle messages synchronous.
-     *
-     * @param msg to handle message
-     * @throws InterruptedException
-     * @throws JMSException
-     */
-    private void handleMessage(MessageExt msg, MessageQueue mq) throws InterruptedException, JMSException {
-        Message jmsMessage = RMQ2JMSMessageConvert.convert(msg);
-
-        try {
-            hook.before(jmsMessage);
-        }
-        catch (MessageExpiredException e) {
-            log.debug(e.getMessage());
-        }
-
-        final MessageWrapper wrapper = new MessageWrapper(jmsMessage, this.consumer, mq, msg.getQueueOffset());
-
-        switch (this.consumeModel) {
-            case SYNC:
-                this.msgQueue.put(wrapper);
-                break;
-            case ASYNC:
-                this.consumer.getSession().getConsumeMessageService().put(wrapper);
-                break;
-            default:
-                throw new JMSException(format("Unsupported consume model[%s]", this.consumeModel));
-        }
-    }
-
-    public void ack(MessageQueue mq, Long offset) throws JMSException {
-        try {
-            this.rocketMQPullConsumer.updateConsumeOffset(mq, offset);
-        }
-        catch (MQClientException e) {
-            throw new JMSException(format("Fail to ack offset[mq:%s,offset:%s]", mq, offset));
-        }
-    }
-
-    public MessageWrapper poll() throws JMSException {
-        try {
-            return this.msgQueue.take();
-        }
-        catch (InterruptedException e) {
-            throw new JMSException(e.getMessage());
-        }
-    }
-
-    public MessageWrapper poll(long timeout, TimeUnit timeUnit) throws JMSException {
-        try {
-            return this.msgQueue.poll(timeout, timeUnit);
-        }
-        catch (InterruptedException e) {
-            throw new JMSException(e.getMessage());
-        }
-    }
-
-    public void pause() {
-        this.pause = true;
-    }
-
-    public void recover() {
-        this.pause = false;
-    }
-
-    public void close() {
-
-        this.stop();
-
-        this.rocketMQPullConsumer.shutdown();
-
-        this.shutdown(true);
-
-        log.debug("Success to close message delivery service:{}", getServiceName());
-    }
-
-    public void setConsumeModel(ConsumeModel consumeModel) {
-        this.consumeModel = consumeModel;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java b/core/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java
deleted file mode 100644
index 1bca541..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import org.apache.rocketmq.common.message.MessageQueue;
-import javax.jms.Message;
-
-/**
- * Immutable holder that bundles a JMS {@link Message} with the consumer it belongs to and
- * the RocketMQ queue/offset it was read from.
- */
-public class MessageWrapper {
-
-    // All fields are assigned once in the constructor and never changed afterwards.
-    private final Message message;
-    private final RocketMQConsumer consumer;
-    private final MessageQueue mq;
-    private final long offset;
-
-    /**
-     * @param message  the JMS message
-     * @param consumer the consumer associated with the message
-     * @param mq       the RocketMQ message queue associated with the message
-     * @param offset   the offset associated with the message
-     */
-    public MessageWrapper(Message message, RocketMQConsumer consumer, MessageQueue mq, long offset) {
-        this.message = message;
-        this.consumer = consumer;
-        this.mq = mq;
-        this.offset = offset;
-    }
-
-    /** @return the wrapped JMS message */
-    public Message getMessage() {
-        return message;
-    }
-
-    /** @return the consumer passed at construction */
-    public RocketMQConsumer getConsumer() {
-        return consumer;
-    }
-
-    /** @return the message queue passed at construction */
-    public MessageQueue getMq() {
-        return mq;
-    }
-
-    /** @return the offset passed at construction */
-    public long getOffset() {
-        return offset;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
deleted file mode 100644
index 727ebca..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.JMSRuntimeException;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.String.format;
-import static javax.jms.Session.AUTO_ACKNOWLEDGE;
-import static javax.jms.Session.SESSION_TRANSACTED;
-import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace;
-
-public class RocketMQConnection implements Connection {
-
-    private static final Logger log = 
LoggerFactory.getLogger(RocketMQConnection.class);
-
-    private String clientID;
-    private ClientConfig clientConfig;
-    private MQClientInstance clientInstance;
-    private String userName;
-    private String password;
-
-    private List<RocketMQSession> sessionList = new ArrayList();
-    private AtomicBoolean started = new AtomicBoolean(false);
-
-    public RocketMQConnection(String nameServerAddress, String clientID, 
String instanceName, String userName,
-        String password) {
-        this.clientID = clientID;
-        this.userName = userName;
-        this.password = password;
-
-        this.clientConfig = new ClientConfig();
-        this.clientConfig.setNamesrvAddr(nameServerAddress);
-        this.clientConfig.setInstanceName(instanceName);
-
-        startClientInstance();
-    }
-
-    private void startClientInstance() {
-        try {
-            // create a tcp connection to broker and some other background 
thread
-            this.clientInstance = 
MQClientManager.getInstance().getAndCreateMQClientInstance(this.clientConfig);
-            clientInstance.start();
-        }
-        catch (MQClientException e) {
-            throw new JMSRuntimeException(format("Fail to startClientInstance 
connection object[namesrvAddr:%s,instanceName:%s]. Error message:%s",
-                this.clientConfig.getNamesrvAddr(), 
this.clientConfig.getInstanceName(), getStackTrace(e)));
-        }
-    }
-
-    @Override
-    public Session createSession() throws JMSException {
-        return createSession(false, AUTO_ACKNOWLEDGE);
-    }
-
-    @Override
-    public Session createSession(int sessionMode) throws JMSException {
-        if (sessionMode == SESSION_TRANSACTED) {
-            return createSession(true, Session.AUTO_ACKNOWLEDGE);
-        }
-        else {
-            return createSession(false, sessionMode);
-        }
-    }
-
-    @Override
-    public Session createSession(boolean transacted, int acknowledgeMode) 
throws JMSException {
-        //todo: support transacted and more acknowledge mode
-        if (transacted) {
-            throw new JMSException("Not support local transaction session");
-        }
-        if (acknowledgeMode != AUTO_ACKNOWLEDGE) {
-            throw new JMSException("Only support AUTO_ACKNOWLEDGE mode now");
-        }
-
-        RocketMQSession session = new RocketMQSession(this, acknowledgeMode, 
transacted);
-        this.sessionList.add(session);
-
-        return session;
-    }
-
-    @Override
-    public ConnectionConsumer createConnectionConsumer(Destination destination,
-        String messageSelector,
-        ServerSessionPool sessionPool,
-        int maxMessages) throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 
String subscriptionName,
-        String messageSelector,
-        ServerSessionPool sessionPool,
-        int maxMessages) throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public ConnectionConsumer createSharedConnectionConsumer(Topic topic, 
String subscriptionName,
-        String messageSelector, ServerSessionPool sessionPool, int 
maxMessages) throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public ConnectionConsumer createSharedDurableConnectionConsumer(Topic 
topic, String subscriptionName,
-        String messageSelector, ServerSessionPool sessionPool, int 
maxMessages) throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public String getClientID() throws JMSException {
-        return this.clientID;
-    }
-
-    @Override
-    public void setClientID(String clientID) throws JMSException {
-        if (isNotBlank(this.clientID)) {
-            throw new IllegalStateException("administratively client 
identifier has been configured.");
-        }
-        this.clientID = clientID;
-    }
-
-    @Override
-    public ConnectionMetaData getMetaData() throws JMSException {
-        return RocketMQConnectionMetaData.instance();
-    }
-
-    @Override
-    public ExceptionListener getExceptionListener() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public void setExceptionListener(ExceptionListener listener) throws 
JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public void start() throws JMSException {
-        if (this.started.compareAndSet(false, true)) {
-            for (RocketMQSession session : sessionList) {
-                for (RocketMQConsumer consumer : session.getConsumerList()) {
-                    consumer.getDeliverMessageService().recover();
-                }
-            }
-            log.debug("Start connection successfully:{}", toString());
-        }
-    }
-
-    @Override
-    public void stop() throws JMSException {
-        if (this.started.compareAndSet(true, false)) {
-            for (RocketMQSession session : sessionList) {
-                for (RocketMQConsumer consumer : session.getConsumerList()) {
-                    consumer.getDeliverMessageService().pause();
-                }
-            }
-            log.debug("Stop connection successfully:{}", toString());
-        }
-    }
-
-    @Override
-    public void close() throws JMSException {
-
-        for (RocketMQSession session : sessionList) {
-            session.close();
-        }
-
-        this.clientInstance.shutdown();
-
-        log.info("Success to close connection:{}", toString());
-    }
-
-    public boolean isStarted() {
-        return started.get();
-    }
-
-    public ClientConfig getClientConfig() {
-        return clientConfig;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    @Override public String toString() {
-        return new ToStringBuilder(this)
-            .append("nameServerAddress", this.clientConfig.getNamesrvAddr())
-            .append("instanceName", this.clientConfig.getInstanceName())
-            .append("clientIdentifier", this.clientID)
-            .toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java
deleted file mode 100644
index c81e8b5..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSContext;
-import javax.jms.JMSException;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.jms.support.JMSUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link ConnectionFactory} using the RocketMQ client.
- *
- * <P>In RocketMQ, all producers and consumers interact with the broker
- * through an {@link MQClientInstance} object, which encapsulates the tcp connection,
- * scheduled tasks and so on. The best way to control the behavior of producers/consumers
- * derived from a connection is to manipulate the {@link MQClientInstance} directly.
- *
- * <P>However, this object is not easy to access as it is maintained within the RocketMQ client.
- * Fortunately an equivalent identifier called "instanceName" is provided.
- * The "instanceName" maps one-to-one to an {@link MQClientInstance} object,
- * as if there were a hash map in which "instanceName" is the key and the
- * {@link MQClientInstance} object is the value. So the essential keyword passed through
- * all objects created by a connection is "instanceName".
- */
-public class RocketMQConnectionFactory implements ConnectionFactory {
-
-    private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionFactory.class);
-
-    private String nameServerAddress;
-
-    private String clientId;
-
-    /**
-     * Creates a factory whose client id is a generated uuid.
-     *
-     * @param nameServerAddress name server address handed to every created connection
-     */
-    public RocketMQConnectionFactory(String nameServerAddress) {
-        this.nameServerAddress = nameServerAddress;
-        this.clientId = JMSUtils.uuid();
-    }
-
-    /**
-     * @param nameServerAddress name server address handed to every created connection
-     * @param clientId          client id handed to every created connection
-     */
-    public RocketMQConnectionFactory(String nameServerAddress, String clientId) {
-        this.nameServerAddress = nameServerAddress;
-        this.clientId = clientId;
-    }
-
-    /** Creates a connection with {@code null} user name and password. */
-    @Override
-    public Connection createConnection() throws JMSException {
-        return createConnection(null, null);
-    }
-
-    /**
-     * Using userName and password to register a connection. Now access to the RMQ broker
-     * is anonymous and any userName/password is legal.
-     *
-     * @param userName passed on to the created connection
-     * @param password passed on to the created connection
-     * @return the new JMS Connection
-     * @throws JMSException declared by the JMS interface
-     */
-    @Override
-    public Connection createConnection(String userName, String password) throws JMSException {
-        return createRocketMQConnection(userName, password);
-    }
-
-    /** Builds a {@link RocketMQConnection} with a freshly generated instance name. */
-    private Connection createRocketMQConnection(String userName, String password) throws JMSException {
-        final String instanceName = JMSUtils.uuid();
-        RocketMQConnection connection = new RocketMQConnection(this.nameServerAddress, this.clientId, instanceName, userName, password);
-
-        // Closing bracket added: the original message left '[' unbalanced.
-        log.info("Create a connection successfully[instanceName:{},clientIdentifier:{},userName:{}]", instanceName, clientId, userName);
-        return connection;
-    }
-
-    /** Not implemented yet; currently returns {@code null}. */
-    @Override
-    public JMSContext createContext() {
-        //todo:
-        return null;
-    }
-
-    /** Not implemented yet; currently returns {@code null}. */
-    @Override
-    public JMSContext createContext(String userName, String password) {
-        //todo:
-        return null;
-    }
-
-    /** Not implemented yet; currently returns {@code null}. */
-    @Override
-    public JMSContext createContext(String userName, String password, int sessionMode) {
-        //todo:
-        return null;
-    }
-
-    /** Not implemented yet; currently returns {@code null}. */
-    @Override
-    public JMSContext createContext(int sessionMode) {
-        //todo:
-        return null;
-    }
-
-    public String getClientId() {
-        return clientId;
-    }
-
-    public void setClientId(String clientId) {
-        this.clientId = clientId;
-    }
-
-    public String getNameServerAddress() {
-        return nameServerAddress;
-    }
-
-    public void setNameServerAddress(String nameServerAddress) {
-        this.nameServerAddress = nameServerAddress;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java
deleted file mode 100644
index e4353e1..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import java.util.Enumeration;
-import java.util.Vector;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.jms.ConnectionMetaData;
-import javax.jms.JMSException;
-import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
-import org.apache.rocketmq.jms.support.ProviderVersion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Singleton {@link ConnectionMetaData} describing the JMS API version found on the
- * classpath and the version of this provider.
- */
-public class RocketMQConnectionMetaData implements ConnectionMetaData {
-
-    private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionMetaData.class);
-    private static final String PROVIDER_NAME = "Apache RocketMQ";
-
-    private final String jmsVersion;
-    private final int jmsMajorVersion;
-    private final int jmsMinorVersion;
-
-    private final String providerVersion;
-    private final int providerMajorVersion;
-    private final int providerMinorVersion;
-
-    // Must stay below 'log': the constructor logs, and static fields initialise in textual order.
-    private static final RocketMQConnectionMetaData metaData = new RocketMQConnectionMetaData();
-
-    /**
-     * Reads the implementation version of the {@code javax.jms} package and parses its
-     * leading {@code major.minor} digits; values stay {@code null}/0 when unavailable.
-     */
-    private RocketMQConnectionMetaData() {
-        Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
-
-        String version = null;
-        int major = 0;
-        int minor = 0;
-        try {
-            Package p = Package.getPackage("javax.jms");
-            if (p != null) {
-                version = p.getImplementationVersion();
-                // The implementation version may be absent (null); matching null would throw
-                // a NullPointerException and be reported as an error for a harmless situation.
-                if (version != null) {
-                    Matcher m = pattern.matcher(version);
-                    if (m.matches()) {
-                        major = Integer.parseInt(m.group(1));
-                        minor = Integer.parseInt(m.group(2));
-                    }
-                }
-            }
-        }
-        catch (Throwable e) {
-            log.error("Error during getting jms version", e);
-        }
-
-        this.jmsVersion = version;
-        this.jmsMajorVersion = major;
-        this.jmsMinorVersion = minor;
-
-        this.providerVersion = ProviderVersion.CURRENT_VERSION.name();
-        // NOTE(review): major and minor are both taken from getValue() - confirm this is intended.
-        this.providerMinorVersion = ProviderVersion.CURRENT_VERSION.getValue();
-        this.providerMajorVersion = ProviderVersion.CURRENT_VERSION.getValue();
-    }
-
-    /** @return the shared instance */
-    public static RocketMQConnectionMetaData instance() {
-        return metaData;
-    }
-
-    @Override
-    public String getJMSVersion() throws JMSException {
-        return jmsVersion;
-    }
-
-    @Override
-    public int getJMSMajorVersion() throws JMSException {
-        return jmsMajorVersion;
-    }
-
-    @Override
-    public int getJMSMinorVersion() throws JMSException {
-        return jmsMinorVersion;
-    }
-
-    @Override
-    public String getJMSProviderName() throws JMSException {
-        return PROVIDER_NAME;
-    }
-
-    @Override
-    public String getProviderVersion() throws JMSException {
-        return providerVersion;
-    }
-
-    @Override
-    public int getProviderMajorVersion() throws JMSException {
-        return providerMajorVersion;
-    }
-
-    @Override
-    public int getProviderMinorVersion() throws JMSException {
-        return providerMinorVersion;
-    }
-
-    /** @return the names of all {@link JMSPropertiesEnum} constants */
-    @Override
-    public Enumeration<?> getJMSXPropertyNames() throws JMSException {
-        Vector<String> result = new Vector<String>();
-        for (JMSPropertiesEnum e : JMSPropertiesEnum.values()) {
-            result.add(e.name());
-        }
-        return result.elements();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
deleted file mode 100644
index af147e0..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import org.apache.rocketmq.jms.support.JMSUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RocketMQConsumer implements MessageConsumer {
-
-    private static final Logger log = 
LoggerFactory.getLogger(RocketMQConsumer.class);
-    private RocketMQSession session;
-    private Destination destination;
-    private String messageSelector;
-    private MessageListener messageListener;
-    private String subscriptionName;
-    private boolean durable;
-    private boolean shared;
-
-    private DeliverMessageService deliverMessageService;
-
-    public RocketMQConsumer(RocketMQSession session, Destination destination,
-        String messageSelector,
-        boolean durable, boolean shared) {
-        this(session, destination, messageSelector, 
UUID.randomUUID().toString(), durable, shared);
-    }
-
-    public RocketMQConsumer(RocketMQSession session, Destination destination,
-        String messageSelector,
-        String subscriptionName, boolean durable, boolean shared) {
-        this.session = session;
-        this.destination = destination;
-        this.messageSelector = messageSelector;
-        this.subscriptionName = subscriptionName;
-        this.durable = durable;
-        this.shared = shared;
-
-        String consumerGroup = JMSUtils.getConsumerGroup(this);
-        this.deliverMessageService = new DeliverMessageService(this, 
this.destination, consumerGroup,
-            this.messageSelector, this.durable, this.shared);
-        this.deliverMessageService.start();
-    }
-
-    @Override
-    public String getMessageSelector() throws JMSException {
-        return messageSelector;
-    }
-
-    @Override
-    public MessageListener getMessageListener() throws JMSException {
-        return this.messageListener;
-    }
-
-    @Override
-    public void setMessageListener(MessageListener listener) throws 
JMSException {
-        if (this.session.isSyncModel()) {
-            throw new JMSException("A asynchronously call is not permitted 
when a session is being used synchronously");
-        }
-
-        this.messageListener = listener;
-        this.deliverMessageService.setConsumeModel(ConsumeModel.ASYNC);
-        this.session.addAsyncConsumer(this);
-    }
-
-    @Override
-    public Message receive() throws JMSException {
-        return this.receive(0);
-    }
-
-    @Override
-    public Message receive(long timeout) throws JMSException {
-        if (this.session.isAsyncModel()) {
-            throw new JMSException("A synchronous call is not permitted when a 
session is being used asynchronously.");
-        }
-
-        this.session.addSyncConsumer(this);
-
-        if (timeout == 0) {
-            MessageWrapper wrapper = this.deliverMessageService.poll();
-            
wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), 
wrapper.getOffset());
-            return wrapper.getMessage();
-        }
-        else {
-            MessageWrapper wrapper = this.deliverMessageService.poll(timeout, 
TimeUnit.MILLISECONDS);
-            if (wrapper == null) {
-                return null;
-            }
-            
wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), 
wrapper.getOffset());
-            return wrapper.getMessage();
-        }
-    }
-
-    @Override
-    public Message receiveNoWait() throws JMSException {
-        return receive(1);
-    }
-
-    @Override
-    public void close() throws JMSException {
-        this.deliverMessageService.close();
-    }
-
-    public void start() {
-        this.deliverMessageService.recover();
-    }
-
-    public void stop() {
-        this.deliverMessageService.pause();
-    }
-
-    public DeliverMessageService getDeliverMessageService() {
-        return deliverMessageService;
-    }
-
-    public RocketMQSession getSession() {
-        return session;
-    }
-
-    public String getSubscriptionName() {
-        return subscriptionName;
-    }
-
-    public boolean isDurable() {
-        return durable;
-    }
-
-    public boolean isShared() {
-        return shared;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
deleted file mode 100644
index ab1eb69..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import java.util.UUID;
-import javax.jms.CompletionListener;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.JMSRuntimeException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException;
-import org.apache.rocketmq.jms.hook.SendMessageHook;
-import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
-import org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.String.format;
-import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace;
-import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE;
-import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE;
-import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE;
-import static 
org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE;
-import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Object;
-
-public class RocketMQProducer implements MessageProducer {
-
-    private static final Logger log = 
LoggerFactory.getLogger(RocketMQProducer.class);
-    private RocketMQSession session;
-    private DefaultMQProducer rocketMQProducer;
-    private Destination destination;
-
-    private boolean disableMessageID;
-    private boolean disableMessageTimestamp;
-    private long timeToLive = JMS_TIME_TO_LIVE_DEFAULT_VALUE;
-    private int deliveryMode = JMS_DELIVERY_MODE_DEFAULT_VALUE;
-    private int priority = JMS_PRIORITY_DEFAULT_VALUE;
-    private long deliveryDelay = JMS_DELIVERY_TIME_DEFAULT_VALUE;
-
-    private SendMessageHook sendMessageHook;
-
-    /** Creates a producer without assigning session, destination or RocketMQ producer. */
-    public RocketMQProducer() {
-    }
-
-    /**
-     * Creates and starts a RocketMQ producer that uses the name server address and instance
-     * name of the session's connection and a random UUID as producer group.
-     *
-     * @param session     owning session
-     * @param destination default destination used by the {@code send} overloads without one
-     * @throws JMSRuntimeException if the RocketMQ producer cannot be started; the
-     *                             {@link MQClientException} is attached as cause
-     */
-    public RocketMQProducer(RocketMQSession session, Destination destination) {
-        this.session = session;
-        this.destination = destination;
-
-        this.rocketMQProducer = new DefaultMQProducer(UUID.randomUUID().toString());
-        ClientConfig clientConfig = this.session.getConnection().getClientConfig();
-        this.rocketMQProducer.setNamesrvAddr(clientConfig.getNamesrvAddr());
-        this.rocketMQProducer.setInstanceName(clientConfig.getInstanceName());
-        try {
-            this.rocketMQProducer.start();
-        }
-        catch (MQClientException e) {
-            // Message unchanged; the exception is additionally passed as cause
-            // (error code stays null, as with the single-argument constructor).
-            throw new JMSRuntimeException(format("Fail to start producer, error msg:%s", getStackTrace(e)), null, e);
-        }
-
-        this.sendMessageHook = new SendMessageHook(this);
-    }
-
-    /** Stores the disable-message-ID flag. */
-    @Override
-    public void setDisableMessageID(boolean value) throws JMSException {
-        this.disableMessageID = value;
-    }
-
-    /** @return the stored disable-message-ID flag */
-    @Override
-    public boolean getDisableMessageID() throws JMSException {
-        return disableMessageID;
-    }
-
-    /** Stores the disable-message-timestamp flag. */
-    @Override
-    public void setDisableMessageTimestamp(boolean value) throws JMSException {
-        this.disableMessageTimestamp = value;
-    }
-
-    /** @return the stored disable-message-timestamp flag */
-    @Override
-    public boolean getDisableMessageTimestamp() throws JMSException {
-        return disableMessageTimestamp;
-    }
-
-    /** Changing the delivery mode is not supported; always throws {@link UnsupportDeliveryModelException}. */
-    @Override
-    public void setDeliveryMode(int deliveryMode) throws JMSException {
-        throw new UnsupportDeliveryModelException();
-    }
-
-    /** @return the delivery mode of this producer */
-    @Override
-    public int getDeliveryMode() throws JMSException {
-        return deliveryMode;
-    }
-
-    /** Stores the default priority; the value is not validated here. */
-    @Override
-    public void setPriority(int priority) throws JMSException {
-        this.priority = priority;
-    }
-
-    /** @return the default priority of this producer */
-    @Override
-    public int getPriority() throws JMSException {
-        return priority;
-    }
-
-    /** Stores the default time to live; the value is not validated here. */
-    @Override
-    public void setTimeToLive(long timeToLive) throws JMSException {
-        this.timeToLive = timeToLive;
-    }
-
-    /** @return the default time to live of this producer */
-    @Override
-    public long getTimeToLive() throws JMSException {
-        return timeToLive;
-    }
-
-    /** Stores the delivery delay; the value is not validated here. */
-    @Override
-    public void setDeliveryDelay(long deliveryDelay) throws JMSException {
-        this.deliveryDelay = deliveryDelay;
-    }
-
-    /** @return the delivery delay of this producer */
-    @Override
-    public long getDeliveryDelay() throws JMSException {
-        return deliveryDelay;
-    }
-
-    /** @return the default destination this producer was created with */
-    @Override
-    public Destination getDestination() throws JMSException {
-        return destination;
-    }
-
-    /**
-     * Closes this producer by shutting down the underlying RocketMQ producer.
-     */
-    @Override
-    public void close() throws JMSException {
-        rocketMQProducer.shutdown();
-    }
-
-    /**
-     * Sends the message to this producer's own destination.
-     *
-     * @param message the message to send
-     * @throws JMSException if the delegated send fails
-     */
-    @Override
-    public void send(Message message) throws JMSException {
-        send(destination, message);
-    }
-
-    /**
-     * Sends the message to this producer's own destination with the given
-     * delivery settings.
-     *
-     * @param message      the message to send
-     * @param deliveryMode delivery mode forwarded to the delegated send
-     * @param priority     priority forwarded to the delegated send
-     * @param timeToLive   time-to-live forwarded to the delegated send
-     * @throws JMSException if the delegated send fails
-     */
-    @Override
-    public void send(Message message, int deliveryMode, int priority, long timeToLive)
-        throws JMSException {
-        send(destination, message, deliveryMode, priority, timeToLive);
-    }
-
-    /**
-     * Sends the message to the given destination using the delivery mode,
-     * priority and time-to-live currently held by this producer.
-     *
-     * @param destination where to send the message
-     * @param message     the message to send
-     * @throws JMSException if the delegated send fails
-     */
-    @Override
-    public void send(Destination destination, Message message) throws JMSException {
-        final int mode = getDeliveryMode();
-        final int prio = getPriority();
-        final long ttl = getTimeToLive();
-        send(destination, message, mode, prio, ttl);
-    }
-
-    /**
-     * Synchronously sends the message and fails unless the send status is
-     * {@code SEND_OK}.
-     *
-     * <p>The send hook is invoked first, then the JMS message is converted to
-     * a RocketMQ message and sent.
-     *
-     * @param destination  destination passed to the send hook
-     * @param message      the JMS message to send
-     * @param deliveryMode delivery mode passed to the send hook
-     * @param priority     priority passed to the send hook
-     * @param timeToLive   time-to-live passed to the send hook
-     * @throws JMSException if conversion or sending fails, if no send result
-     *                      is returned, or if the status is not {@code SEND_OK}
-     */
-    @Override
-    public void send(Destination destination, Message message, int deliveryMode, int priority,
-        long timeToLive) throws JMSException {
-
-        sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive);
-
-        MessageExt rmqMsg = createRocketMQMessage(message);
-
-        SendResult sendResult = sendSync(rmqMsg);
-        if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
-            log.debug("Success to send message[key={}]", rmqMsg.getKeys());
-            return;
-        }
-
-        // The result (or its status) may be null here, so build the status text
-        // defensively instead of dereferencing it and masking the real failure
-        // with a NullPointerException.
-        String status = "null";
-        if (sendResult != null && sendResult.getSendStatus() != null) {
-            status = sendResult.getSendStatus().name();
-        }
-        throw new JMSException(format("Sending message error with result status:%s", status));
-    }
-
-    /**
-     * Sends the RocketMQ message through the underlying producer and returns
-     * its result.
-     *
-     * @param rmqMsg the RocketMQ message to send
-     * @return the result returned by the underlying producer
-     * @throws JMSException wrapping any failure; the original exception is
-     *                      attached as the linked exception
-     */
-    private SendResult sendSync(org.apache.rocketmq.common.message.Message rmqMsg)
-        throws JMSException {
-
-        try {
-            return rocketMQProducer.send(rmqMsg);
-        }
-        catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                // Restore the interrupt status so callers up the stack can still observe it.
-                Thread.currentThread().interrupt();
-            }
-            JMSException failure = new JMSException(format("Fail to send message. Error: %s", getStackTrace(e)));
-            failure.setLinkedException(e);
-            throw failure;
-        }
-    }
-
-    /**
-     * Hands the RocketMQ message to the underlying producer together with a
-     * callback that forwards the outcome to the JMS completion listener.
-     *
-     * @param rmqMsg             the RocketMQ message to send
-     * @param completionListener listener wrapped in a {@code SendCompletionListener}
-     * @throws JMSException wrapping any failure raised while submitting the
-     *                      send; the original exception is attached as the
-     *                      linked exception
-     */
-    private void sendAsync(org.apache.rocketmq.common.message.Message rmqMsg,
-        CompletionListener completionListener) throws JMSException {
-        try {
-            rocketMQProducer.send(rmqMsg, new SendCompletionListener(completionListener));
-        }
-        catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                // Restore the interrupt status so callers up the stack can still observe it.
-                Thread.currentThread().interrupt();
-            }
-            JMSException failure = new JMSException(format("Fail to send message. Error: %s", getStackTrace(e)));
-            failure.setLinkedException(e);
-            throw failure;
-        }
-    }
-
-    /**
-     * Converts a JMS message into its RocketMQ representation.
-     *
-     * @param jmsMsg the JMS message; it is cast to {@code AbstractJMSMessage}
-     *               before conversion
-     * @return the converted RocketMQ message
-     * @throws JMSException if the conversion fails; the original exception is
-     *                      attached as the linked exception
-     */
-    private MessageExt createRocketMQMessage(Message jmsMsg) throws JMSException {
-        AbstractJMSMessage abstractJMSMessage = cast2Object(jmsMsg, AbstractJMSMessage.class);
-        try {
-            return JMS2RMQMessageConvert.convert(abstractJMSMessage);
-        }
-        catch (Exception e) {
-            JMSException failure = new JMSException(
-                format("Fail to convert to RocketMQ jmsMsg. Error: %s", getStackTrace(e)));
-            failure.setLinkedException(e);
-            throw failure;
-        }
-    }
-
-    /**
-     * Asynchronously sends the message to this producer's own destination
-     * using the delivery settings currently held by this producer.
-     *
-     * @param message            the message to send
-     * @param completionListener listener forwarded to the delegated send
-     * @throws JMSException if the delegated send fails
-     */
-    @Override
-    public void send(Message message, CompletionListener completionListener) throws JMSException {
-        final int mode = getDeliveryMode();
-        final int prio = getPriority();
-        final long ttl = getTimeToLive();
-        send(destination, message, mode, prio, ttl, completionListener);
-    }
-
-    /**
-     * Asynchronously sends the message to this producer's own destination
-     * with the given delivery settings.
-     *
-     * @param message            the message to send
-     * @param deliveryMode       delivery mode forwarded to the delegated send
-     * @param priority           priority forwarded to the delegated send
-     * @param timeToLive         time-to-live forwarded to the delegated send
-     * @param completionListener listener forwarded to the delegated send
-     * @throws JMSException if the delegated send fails
-     */
-    @Override
-    public void send(Message message, int deliveryMode, int priority, long timeToLive,
-        CompletionListener completionListener) throws JMSException {
-        send(destination, message, deliveryMode, priority, timeToLive, completionListener);
-    }
-
-    /**
-     * Asynchronously sends the message to the given destination using the
-     * delivery settings currently held by this producer.
-     *
-     * @param destination        where to send the message
-     * @param message            the message to send
-     * @param completionListener listener forwarded to the delegated send
-     * @throws JMSException if the delegated send fails
-     */
-    @Override
-    public void send(Destination destination, Message message,
-        CompletionListener completionListener) throws JMSException {
-        final int mode = getDeliveryMode();
-        final int prio = getPriority();
-        final long ttl = getTimeToLive();
-        send(destination, message, mode, prio, ttl, completionListener);
-    }
-
-    /**
-     * Asynchronously sends the message; the outcome is reported to the given
-     * completion listener.
-     *
-     * <p>The send hook is invoked first, then the JMS message is converted to
-     * a RocketMQ message and handed to the asynchronous send.
-     *
-     * @param destination        destination passed to the send hook
-     * @param message            the JMS message to send
-     * @param deliveryMode       delivery mode passed to the send hook
-     * @param priority           priority passed to the send hook
-     * @param timeToLive         time-to-live passed to the send hook
-     * @param completionListener listener notified of the send outcome
-     * @throws IllegalArgumentException if {@code completionListener} is null
-     * @throws JMSException             if conversion or submission fails
-     */
-    @Override
-    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        CompletionListener completionListener) throws JMSException {
-
-        // Fail fast: a null listener would otherwise only blow up later inside
-        // the asynchronous callback, far away from the faulty call site.
-        if (completionListener == null) {
-            throw new IllegalArgumentException("CompletionListener must not be null");
-        }
-
-        sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive);
-
-        MessageExt rmqMsg = createRocketMQMessage(message);
-
-        sendAsync(rmqMsg, completionListener);
-    }
-
-    /**
-     * Returns the session held by this producer.
-     *
-     * @return the stored session
-     */
-    public RocketMQSession getSession() {
-        return this.session;
-    }
-
-    /**
-     * Replaces the session held by this producer.
-     *
-     * @param session the value assigned to the {@code session} field
-     */
-    public void setSession(RocketMQSession session) {
-        this.session = session;
-    }
-
-    /**
-     * Replaces the underlying RocketMQ producer held by this producer.
-     *
-     * @param rocketMQProducer the value assigned to the {@code rocketMQProducer} field
-     */
-    public void setRocketMQProducer(DefaultMQProducer rocketMQProducer) {
-        this.rocketMQProducer = rocketMQProducer;
-    }
-
-    /**
-     * Replaces the destination held by this producer.
-     *
-     * @param destination the value assigned to the {@code destination} field
-     */
-    public void setDestination(Destination destination) {
-        this.destination = destination;
-    }
-
-    /**
-     * Replaces the hook that is invoked before each send.
-     *
-     * @param sendMessageHook the value assigned to the {@code sendMessageHook} field
-     */
-    public void setSendMessageHook(SendMessageHook sendMessageHook) {
-        this.sendMessageHook = sendMessageHook;
-    }
-
-    /**
-     * Returns the user name of the connection that owns this producer's session.
-     *
-     * @return the user name reported by the session's connection
-     */
-    public String getUserName() {
-        RocketMQConnection owner = session.getConnection();
-        return owner.getUserName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
deleted file mode 100644
index 0094c47..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.JMSRuntimeException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.protocol.body.GroupList;
-import org.apache.rocketmq.jms.admin.AdminFactory;
-import org.apache.rocketmq.jms.destination.RocketMQQueue;
-import org.apache.rocketmq.jms.destination.RocketMQTopic;
-import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException;
-import org.apache.rocketmq.jms.msg.JMSBytesMessage;
-import org.apache.rocketmq.jms.msg.JMSMapMessage;
-import org.apache.rocketmq.jms.msg.JMSObjectMessage;
-import org.apache.rocketmq.jms.msg.JMSTextMessage;
-import org.apache.rocketmq.jms.support.JMSUtils;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.rocketmq.jms.Constant.DEFAULT_DURABLE;
-import static org.apache.rocketmq.jms.Constant.DEFAULT_NO_LOCAL;
-import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR;
-
-/**
- * Implementation of {@link Session} backed by RocketMQ.
- *
- * <p>The session remembers every producer and consumer it creates so that
- * {@link #close()} can close them. Stream messages, transactions, recovery,
- * session-level listeners, queue browsers, temporary destinations and
- * {@link #unsubscribe(String)} are not supported yet and throw.
- */
-public class RocketMQSession implements Session {
-
-    private static final Logger log = LoggerFactory.getLogger(RocketMQSession.class);
-
-    private RocketMQConnection connection;
-
-    private int acknowledgeMode;
-
-    private boolean transacted;
-
-    private ConsumeMessageService consumeMessageService;
-
-    /** Producers created by this session; closed in {@link #close()}. */
-    private final List<RocketMQProducer> producerList = new ArrayList<>();
-
-    /** Consumers created by this session; closed in {@link #close()}. */
-    private final List<RocketMQConsumer> consumerList = new ArrayList<>();
-
-    private final Set<RocketMQConsumer> asyncConsumerSet = new HashSet<>();
-
-    private final Set<RocketMQConsumer> syncConsumerSet = new HashSet<>();
-
-    /**
-     * Creates the session and starts its {@link ConsumeMessageService}.
-     *
-     * @param connection      the owning connection
-     * @param acknowledgeMode value reported by {@link #getAcknowledgeMode()}
-     * @param transacted      value reported by {@link #getTransacted()}
-     */
-    public RocketMQSession(RocketMQConnection connection, int acknowledgeMode, boolean transacted) {
-        this.connection = connection;
-        this.acknowledgeMode = acknowledgeMode;
-        this.transacted = transacted;
-
-        this.consumeMessageService = new ConsumeMessageService(this);
-        this.consumeMessageService.start();
-    }
-
-    @Override
-    public BytesMessage createBytesMessage() throws JMSException {
-        return new JMSBytesMessage();
-    }
-
-    @Override
-    public MapMessage createMapMessage() throws JMSException {
-        return new JMSMapMessage();
-    }
-
-    /** Creates a plain message; a {@link JMSBytesMessage} is used for it. */
-    @Override
-    public Message createMessage() throws JMSException {
-        return new JMSBytesMessage();
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage() throws JMSException {
-        return new JMSObjectMessage();
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
-        return new JMSObjectMessage(serializable);
-    }
-
-    @Override
-    public StreamMessage createStreamMessage() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public TextMessage createTextMessage() throws JMSException {
-        return new JMSTextMessage();
-    }
-
-    @Override
-    public TextMessage createTextMessage(String text) throws JMSException {
-        return new JMSTextMessage(text);
-    }
-
-    @Override
-    public boolean getTransacted() throws JMSException {
-        return this.transacted;
-    }
-
-    @Override
-    public int getAcknowledgeMode() throws JMSException {
-        return this.acknowledgeMode;
-    }
-
-    @Override
-    public void commit() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public void rollback() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    /**
-     * Closes every producer and then every consumer created by this session.
-     *
-     * <p>NOTE(review): the {@link ConsumeMessageService} started in the
-     * constructor is not stopped here - confirm whether it needs a shutdown.
-     */
-    @Override
-    public void close() throws JMSException {
-        for (RocketMQProducer producer : this.producerList) {
-            producer.close();
-        }
-        for (RocketMQConsumer consumer : this.consumerList) {
-            consumer.close();
-        }
-    }
-
-    @Override
-    public void recover() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public MessageListener getMessageListener() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public void setMessageListener(MessageListener listener) throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public void run() {
-        //todo
-        throw new JMSRuntimeException("Not support yet");
-    }
-
-    /** Creates a producer for the destination and registers it with this session. */
-    @Override
-    public MessageProducer createProducer(Destination destination) throws JMSException {
-        RocketMQProducer producer = new RocketMQProducer(this, destination);
-        this.producerList.add(producer);
-        return producer;
-    }
-
-    @Override
-    public MessageConsumer createConsumer(Destination destination) throws JMSException {
-        return createConsumer(destination, NO_MESSAGE_SELECTOR);
-    }
-
-    @Override
-    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
-        return createConsumer(destination, messageSelector, DEFAULT_NO_LOCAL);
-    }
-
-    /** Creates a consumer and registers it with this session; {@code noLocal} is ignored. */
-    @Override
-    public MessageConsumer createConsumer(Destination destination, String messageSelector,
-        boolean noLocal) throws JMSException {
-
-        // ignore noLocal param as RMQ not support
-        RocketMQConsumer consumer = new RocketMQConsumer(this, destination, messageSelector, DEFAULT_DURABLE, false);
-        this.consumerList.add(consumer);
-
-        return consumer;
-    }
-
-    @Override
-    public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
-        return createSharedConsumer(topic, sharedSubscriptionName, NO_MESSAGE_SELECTOR);
-    }
-
-    /** Creates a shared consumer on the topic and registers it with this session. */
-    @Override
-    public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName,
-        String messageSelector) throws JMSException {
-        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, sharedSubscriptionName,
-            DEFAULT_DURABLE, true);
-        this.consumerList.add(consumer);
-
-        return consumer;
-    }
-
-    @Override
-    public Queue createQueue(String queueName) throws JMSException {
-        return new RocketMQQueue(queueName);
-    }
-
-    @Override
-    public Topic createTopic(String topicName) throws JMSException {
-        Preconditions.checkNotNull(topicName);
-
-        return new RocketMQTopic(topicName);
-    }
-
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
-        return createDurableSubscriber(topic, name, NO_MESSAGE_SELECTOR, DEFAULT_NO_LOCAL);
-    }
-
-    /**
-     * Creates a durable topic subscriber and registers it with this session;
-     * {@code noLocal} is ignored.
-     *
-     * <p>NOTE(review): the subscriber is created with {@code shared = true},
-     * unlike {@link #createDurableConsumer(Topic, String, String, boolean)} -
-     * confirm that this is intended.
-     */
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector,
-        boolean noLocal) throws JMSException {
-        RocketMQTopicSubscriber subscriber = new RocketMQTopicSubscriber(this, topic, messageSelector, name,
-            true, true);
-        this.consumerList.add(subscriber);
-
-        return subscriber;
-    }
-
-    @Override
-    public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
-        // noLocal is ignored downstream; use the shared default like the other overloads.
-        return createDurableConsumer(topic, name, NO_MESSAGE_SELECTOR, DEFAULT_NO_LOCAL);
-    }
-
-    /**
-     * Creates an unshared durable consumer on the topic and registers it with
-     * this session; {@code noLocal} is ignored.
-     *
-     * <p>The consumer groups already consuming the topic are queried first;
-     * if the group derived from {@code name} and the client ID is among them
-     * the call is rejected.
-     *
-     * @throws DuplicateSubscriptionException if the subscription already exists
-     * @throws JMSException                   if the admin query fails
-     */
-    @Override
-    public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector,
-        boolean noLocal) throws JMSException {
-        DefaultMQAdminExt admin = AdminFactory.getAdmin(this.getConnection().getClientConfig().getNamesrvAddr());
-        try {
-            GroupList groupList = admin.queryTopicConsumeByWho(topic.getTopicName());
-            String consumerGroup = JMSUtils.getConsumerGroup(name, this.getConnection().getClientID(), false);
-            if (groupList.getGroupList().contains(consumerGroup)) {
-                throw new DuplicateSubscriptionException("The same subscription( join subscriptionName with "
-                    + "clientID) has existed, so couldn't create consumer on them again ");
-            }
-        }
-        catch (InterruptedException e) {
-            // Restore the interrupt status so callers up the stack can still observe it.
-            Thread.currentThread().interrupt();
-            JMSException failure = new JMSException(ExceptionUtils.getStackTrace(e));
-            failure.setLinkedException(e);
-            throw failure;
-        }
-        catch (MQBrokerException | RemotingException | MQClientException e) {
-            JMSException failure = new JMSException(ExceptionUtils.getStackTrace(e));
-            failure.setLinkedException(e);
-            throw failure;
-        }
-        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true, false);
-        this.consumerList.add(consumer);
-
-        return consumer;
-    }
-
-    @Override
-    public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
-        return createSharedDurableConsumer(topic, name, NO_MESSAGE_SELECTOR);
-    }
-
-    /** Creates a shared durable consumer on the topic and registers it with this session. */
-    @Override
-    public MessageConsumer createSharedDurableConsumer(Topic topic, String name,
-        String messageSelector) throws JMSException {
-        RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true, true);
-        this.consumerList.add(consumer);
-
-        return consumer;
-    }
-
-    @Override
-    public QueueBrowser createBrowser(Queue queue) throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public TemporaryQueue createTemporaryQueue() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public TemporaryTopic createTemporaryTopic() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    @Override
-    public void unsubscribe(String name) throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
-    }
-
-    public List<RocketMQProducer> getProducerList() {
-        return producerList;
-    }
-
-    public List<RocketMQConsumer> getConsumerList() {
-        return consumerList;
-    }
-
-    public RocketMQConnection getConnection() {
-        return connection;
-    }
-
-    public boolean isTransacted() {
-        return transacted;
-    }
-
-    public void addSyncConsumer(RocketMQConsumer consumer) {
-        this.syncConsumerSet.add(consumer);
-    }
-
-    public void addAsyncConsumer(RocketMQConsumer consumer) {
-        this.asyncConsumerSet.add(consumer);
-    }
-
-    /** @return {@code true} once at least one async consumer has been added */
-    public boolean isAsyncModel() {
-        return !this.asyncConsumerSet.isEmpty();
-    }
-
-    /** @return {@code true} once at least one sync consumer has been added */
-    public boolean isSyncModel() {
-        return !this.syncConsumerSet.isEmpty();
-    }
-
-    public ConsumeMessageService getConsumeMessageService() {
-        return consumeMessageService;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
deleted file mode 100644
index 51b732b..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import javax.jms.JMSException;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-/**
- * A {@link RocketMQConsumer} that also exposes the {@link TopicSubscriber}
- * view of the topic it was created for.
- */
-public class RocketMQTopicSubscriber extends RocketMQConsumer implements TopicSubscriber {
-
-    /** Topic handed to the constructor; returned by {@link #getTopic()}. */
-    private final Topic topic;
-
-    /**
-     * Passes every argument to the {@link RocketMQConsumer} constructor and
-     * remembers the topic.
-     */
-    public RocketMQTopicSubscriber(RocketMQSession session, Topic topic, String messageSelector,
-        String sharedSubscriptionName, boolean durable, boolean shared) {
-        super(session, topic, messageSelector, sharedSubscriptionName, durable, shared);
-        this.topic = topic;
-    }
-
-    /** @return the topic this subscriber was created with */
-    @Override
-    public Topic getTopic() throws JMSException {
-        return topic;
-    }
-
-    /** @return always {@code false} */
-    @Override
-    public boolean getNoLocal() throws JMSException {
-        //todo: not inhibit now
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java 
b/core/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java
deleted file mode 100644
index a99a607..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms;
-
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-
-import javax.jms.CompletionListener;
-
-/**
- * Adapts a JMS {@link CompletionListener} to RocketMQ's {@link SendCallback}
- * so that the outcome of an asynchronous send reaches the JMS listener.
- */
-public class SendCompletionListener implements SendCallback {
-
-    private CompletionListener completionListener;
-
-    /**
-     * @param completionListener the JMS listener to notify
-     */
-    public SendCompletionListener(CompletionListener completionListener) {
-        this.completionListener = completionListener;
-    }
-
-    /**
-     * Reports completion to the JMS listener. The message argument is
-     * currently always {@code null}.
-     */
-    @Override
-    public void onSuccess(SendResult sendResult) {
-        //todo: how to transmit message into
-        this.completionListener.onCompletion(null);
-    }
-
-    /**
-     * Reports the failure to the JMS listener. The message argument is
-     * currently always {@code null}.
-     *
-     * @param e the failure; passed through unchanged when it already is an
-     *          {@link Exception}, otherwise wrapped in one
-     */
-    @Override
-    public void onException(Throwable e) {
-        //todo: how to transmit message into
-        // Only wrap non-Exception throwables so the listener sees the original exception type.
-        Exception failure = e instanceof Exception ? (Exception) e : new Exception(e);
-        this.completionListener.onException(null, failure);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java 
b/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
deleted file mode 100644
index c551a22..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.jms.admin;
-
-import java.util.concurrent.ConcurrentHashMap;
-import javax.jms.JMSException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.jms.exception.JMSClientException;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-
-/**
- * Hands out started {@link DefaultMQAdminExt} instances, cached per name
- * server address.
- */
-public class AdminFactory {
-
-    private static final ConcurrentHashMap<String/*nameServerAddress*/, DefaultMQAdminExt> admins =
-        new ConcurrentHashMap<>();
-
-    /**
-     * Returns the cached admin client for the address, creating and starting
-     * one on first use.
-     *
-     * <p>When two callers race to create the client for the same address,
-     * the loser's freshly started client is shut down and the winner's
-     * client is returned to both.
-     *
-     * @param nameServerAddress key of the cache; must not be {@code null}
-     * @return a started admin client
-     * @throws IllegalArgumentException if {@code nameServerAddress} is null
-     * @throws JMSException             if the admin client cannot be started
-     */
-    public static DefaultMQAdminExt getAdmin(String nameServerAddress) throws JMSException {
-        if (nameServerAddress == null) {
-            throw new IllegalArgumentException("NameServerAddress could not be null");
-        }
-
-        DefaultMQAdminExt admin = admins.get(nameServerAddress);
-        if (admin != null) {
-            return admin;
-        }
-
-        // NOTE(review): the address is passed as the sole constructor argument -
-        // confirm that this constructor really interprets it as the name server address.
-        admin = new DefaultMQAdminExt(nameServerAddress);
-        try {
-            admin.start();
-        }
-        catch (MQClientException e) {
-            JMSClientException failure = new JMSClientException("Error during starting admin client");
-            // Keep the root cause available to callers instead of dropping it.
-            failure.initCause(e);
-            throw failure;
-        }
-        DefaultMQAdminExt old = admins.putIfAbsent(nameServerAddress, admin);
-        if (old != null) {
-            admin.shutdown();
-            return old;
-        }
-
-        return admin;
-    }
-}


Reply via email to