Refactor multi-module to single-module
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/db8e0dd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/db8e0dd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/db8e0dd1 Branch: refs/heads/jms-dev-1.1.0 Commit: db8e0dd18f7916dcf293e27211b5aa09ed9fa132 Parents: 1ac8377 Author: zhangke <[email protected]> Authored: Wed Mar 1 22:53:45 2017 +0800 Committer: zhangke <[email protected]> Committed: Wed Mar 1 22:53:45 2017 +0800 ---------------------------------------------------------------------- .travis.yml | 2 +- core/pom.xml | 69 --- .../java/org/apache/rocketmq/jms/Constant.java | 82 ---- .../rocketmq/jms/ConsumeMessageService.java | 71 --- .../org/apache/rocketmq/jms/ConsumeModel.java | 23 - .../rocketmq/jms/DeliverMessageService.java | 275 ----------- .../org/apache/rocketmq/jms/MessageWrapper.java | 52 -- .../apache/rocketmq/jms/RocketMQConnection.java | 234 --------- .../rocketmq/jms/RocketMQConnectionFactory.java | 128 ----- .../jms/RocketMQConnectionMetaData.java | 116 ----- .../apache/rocketmq/jms/RocketMQConsumer.java | 152 ------ .../apache/rocketmq/jms/RocketMQProducer.java | 269 ---------- .../apache/rocketmq/jms/RocketMQSession.java | 367 -------------- .../rocketmq/jms/RocketMQTopicSubscriber.java | 44 -- .../rocketmq/jms/SendCompletionListener.java | 44 -- .../apache/rocketmq/jms/admin/AdminFactory.java | 55 --- .../rocketmq/jms/destination/RocketMQQueue.java | 39 -- .../rocketmq/jms/destination/RocketMQTopic.java | 43 -- .../DuplicateSubscriptionException.java | 31 -- .../jms/exception/JMSClientException.java | 31 -- .../jms/exception/MessageExpiredException.java | 31 -- .../UnsupportDeliveryModelException.java | 27 - .../rocketmq/jms/hook/ReceiveMessageHook.java | 44 -- .../rocketmq/jms/hook/SendMessageHook.java | 104 ---- .../rocketmq/jms/msg/AbstractJMSMessage.java | 400 --------------- .../rocketmq/jms/msg/JMSBytesMessage.java | 491 ------------------- .../apache/rocketmq/jms/msg/JMSMapMessage.java | 229 --------- .../rocketmq/jms/msg/JMSObjectMessage.java | 55 --- .../apache/rocketmq/jms/msg/JMSTextMessage.java | 67 --- .../jms/msg/convert/JMS2RMQMessageConvert.java | 66 --- .../jms/msg/convert/RMQ2JMSMessageConvert.java | 103 ---- .../rocketmq/jms/msg/enums/JMSHeaderEnum.java | 44 -- .../jms/msg/enums/JMSMessageModelEnum.java | 53 -- .../jms/msg/enums/JMSPropertiesEnum.java | 26 - .../jms/msg/serialize/MapSerialize.java | 43 -- .../jms/msg/serialize/ObjectSerialize.java | 69 --- .../rocketmq/jms/msg/serialize/Serialize.java | 27 - .../jms/msg/serialize/StringSerialize.java | 61 --- .../apache/rocketmq/jms/support/JMSUtils.java | 94 ---- .../rocketmq/jms/support/ObjectTypeCast.java | 75 --- .../rocketmq/jms/support/PrimitiveTypeCast.java | 220 --------- .../rocketmq/jms/support/ProviderVersion.java | 37 -- core/src/main/resources/logback.xml | 56 --- .../jms/RocketMQConnectionFactoryTest.java | 36 -- .../jms/destination/RocketMQQueueTest.java | 34 -- .../jms/destination/RocketMQTopicTest.java | 35 -- .../jms/hook/ReceiveMessageHookTest.java | 67 --- .../rocketmq/jms/hook/SendMessageHookTest.java | 102 ---- .../rocketmq/jms/msg/JMSBytesMessageTest.java | 106 ---- .../rocketmq/jms/msg/JMSMapMessageTest.java | 70 --- .../rocketmq/jms/msg/JMSObjectMessageTest.java | 73 --- .../rocketmq/jms/msg/JMSTextMessageTest.java | 41 -- .../msg/convert/JMS2RMQMessageConvertTest.java | 60 --- .../msg/convert/RMQ2JMSMessageConvertTest.java | 65 --- .../jms/msg/enums/JMSMessageModelEnumTest.java | 31 -- .../jms/msg/serialize/MapSerializeTest.java | 42 -- .../jms/msg/serialize/ObjectSerializeTest.java | 63 --- .../jms/msg/serialize/StringSerializeTest.java | 36 -- .../rocketmq/jms/support/JMSUtilsTest.java | 62 --- .../jms/support/ObjectTypeCastTest.java | 52 -- .../jms/support/PrimitiveTypeCastTest.java | 210 -------- pom.xml | 130 ++--- .../java/org/apache/rocketmq/jms/Constant.java | 82 ++++ .../rocketmq/jms/ConsumeMessageService.java | 71 +++ .../org/apache/rocketmq/jms/ConsumeModel.java | 23 + .../rocketmq/jms/DeliverMessageService.java | 275 +++++++++++ .../org/apache/rocketmq/jms/MessageWrapper.java | 52 ++ .../apache/rocketmq/jms/RocketMQConnection.java | 233 +++++++++ .../rocketmq/jms/RocketMQConnectionFactory.java | 128 +++++ .../jms/RocketMQConnectionMetaData.java | 116 +++++ .../apache/rocketmq/jms/RocketMQConsumer.java | 152 ++++++ .../apache/rocketmq/jms/RocketMQProducer.java | 269 ++++++++++ .../apache/rocketmq/jms/RocketMQSession.java | 367 ++++++++++++++ .../rocketmq/jms/RocketMQTopicSubscriber.java | 44 ++ .../rocketmq/jms/SendCompletionListener.java | 43 ++ .../apache/rocketmq/jms/admin/AdminFactory.java | 55 +++ .../rocketmq/jms/destination/RocketMQQueue.java | 39 ++ .../rocketmq/jms/destination/RocketMQTopic.java | 43 ++ .../DuplicateSubscriptionException.java | 31 ++ .../jms/exception/JMSClientException.java | 31 ++ .../jms/exception/MessageExpiredException.java | 31 ++ .../UnsupportDeliveryModelException.java | 27 + .../rocketmq/jms/hook/ReceiveMessageHook.java | 44 ++ .../rocketmq/jms/hook/SendMessageHook.java | 104 ++++ .../rocketmq/jms/msg/AbstractJMSMessage.java | 400 +++++++++++++++ .../rocketmq/jms/msg/JMSBytesMessage.java | 491 +++++++++++++++++++ .../apache/rocketmq/jms/msg/JMSMapMessage.java | 229 +++++++++ .../rocketmq/jms/msg/JMSObjectMessage.java | 55 +++ .../apache/rocketmq/jms/msg/JMSTextMessage.java | 67 +++ .../jms/msg/convert/JMS2RMQMessageConvert.java | 66 +++ .../jms/msg/convert/RMQ2JMSMessageConvert.java | 103 ++++ .../rocketmq/jms/msg/enums/JMSHeaderEnum.java | 44 ++ .../jms/msg/enums/JMSMessageModelEnum.java | 53 ++ .../jms/msg/enums/JMSPropertiesEnum.java | 26 + .../jms/msg/serialize/MapSerialize.java | 43 ++ .../jms/msg/serialize/ObjectSerialize.java | 69 +++ .../rocketmq/jms/msg/serialize/Serialize.java | 27 + .../jms/msg/serialize/StringSerialize.java | 61 +++ .../apache/rocketmq/jms/support/JMSUtils.java | 94 ++++ .../rocketmq/jms/support/ObjectTypeCast.java | 75 +++ .../rocketmq/jms/support/PrimitiveTypeCast.java | 220 +++++++++ .../rocketmq/jms/support/ProviderVersion.java | 37 ++ src/main/resources/logback.xml | 56 +++ .../jms/RocketMQConnectionFactoryTest.java | 36 ++ .../jms/destination/RocketMQQueueTest.java | 34 ++ .../jms/destination/RocketMQTopicTest.java | 35 ++ .../jms/hook/ReceiveMessageHookTest.java | 67 +++ .../rocketmq/jms/hook/SendMessageHookTest.java | 102 ++++ .../jms/integration/source/AppConfig.java | 59 +++ .../jms/integration/source/Constant.java | 39 ++ .../jms/integration/source/RocketMQAdmin.java | 93 ++++ .../jms/integration/source/RocketMQServer.java | 162 ++++++ .../integration/source/SimpleTextListener.java | 60 +++ .../source/support/ConditionMatcher.java | 23 + .../source/support/TimeLimitAssert.java | 40 ++ .../test/ConsumeAsynchronousTest.java | 90 ++++ .../test/ConsumeSynchronousTest.java | 82 ++++ .../integration/test/NonDurableConsumeTest.java | 132 +++++ .../test/SharedDurableConsumeTest.java | 120 +++++ .../test/UnsharedDurableConsumeTest.java | 127 +++++ .../test/listener/SimpleTextListenerTest.java | 60 +++ .../rocketmq/jms/msg/JMSBytesMessageTest.java | 106 ++++ .../rocketmq/jms/msg/JMSMapMessageTest.java | 70 +++ .../rocketmq/jms/msg/JMSObjectMessageTest.java | 73 +++ .../rocketmq/jms/msg/JMSTextMessageTest.java | 41 ++ .../msg/convert/JMS2RMQMessageConvertTest.java | 60 +++ .../msg/convert/RMQ2JMSMessageConvertTest.java | 65 +++ .../jms/msg/enums/JMSMessageModelEnumTest.java | 31 ++ .../jms/msg/serialize/MapSerializeTest.java | 42 ++ .../jms/msg/serialize/ObjectSerializeTest.java | 63 +++ .../jms/msg/serialize/StringSerializeTest.java | 36 ++ .../rocketmq/jms/support/JMSUtilsTest.java | 62 +++ .../jms/support/ObjectTypeCastTest.java | 52 ++ .../jms/support/PrimitiveTypeCastTest.java | 210 ++++++++ test/pom.xml | 70 --- .../rocketmq/jms/integration/AppConfig.java | 59 --- .../rocketmq/jms/integration/Constant.java | 39 -- .../rocketmq/jms/integration/RocketMQAdmin.java | 93 ---- .../jms/integration/RocketMQServer.java | 162 ------ .../listener/SimpleTextListener.java | 61 --- test/src/main/resources/logback.xml | 56 --- .../integration/ConsumeAsynchronousTest.java | 90 ---- .../jms/integration/ConsumeSynchronousTest.java | 79 --- .../jms/integration/NonDurableConsumeTest.java | 129 ----- .../integration/SharedDurableConsumeTest.java | 120 ----- .../integration/UnsharedDurableConsumeTest.java | 124 ----- .../listener/SimpleTextListenerTest.java | 59 --- .../integration/support/ConditionMatcher.java | 23 - .../integration/support/TimeLimitAssert.java | 40 -- 149 files changed, 6819 insertions(+), 6997 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index ac9e6b6..33841d0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -35,7 +35,7 @@ before_install: script: - - travis_retry mvn -B clean install coveralls:report + - travis_retry mvn -B clean install jacoco:report #after_success: # - mvn clean install http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml deleted file mode 100644 index c01d2e0..0000000 --- a/core/pom.xml +++ /dev/null @@ -1,69 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>rocketmq-jms-all</artifactId> - <groupId>org.apache.rocketmq</groupId> - <version>1.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>rocketmq-jms-core</artifactId> - - <dependencies> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-client</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-tools</artifactId> - </dependency> - <dependency> - <groupId>javax.jms</groupId> - <artifactId>javax.jms-api</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - </dependencies> - - <build> - <finalName>rocketmq-jms-${project.version}</finalName> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/Constant.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/Constant.java b/core/src/main/java/org/apache/rocketmq/jms/Constant.java deleted file mode 100644 index 9519bea..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/Constant.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -public interface Constant { - - String NO_MESSAGE_SELECTOR = "*"; - - boolean DEFAULT_NO_LOCAL = true; - - boolean DEFAULT_DURABLE = false; - - //-------------------------JMS defined properties constant---------------------------- - /** - * The identity of the user sending the Send message - */ - String JMS_XUSER_ID = "jmsXUserID"; - /** - * The identity of the application Send sending the message - */ - String JMS_XAPP_ID = "jmsXAppID"; - /** - * The number of message delivery Receive attempts - */ - String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount"; - /** - * The identity of the message group this message is part of - */ - String JMS_XGROUP_ID = "jmsXGroupID"; - /** - * The sequence number of this message within the group; the first message is 1, the second 2,... - */ - String JMS_XGROUP_SEQ = "jmsXGroupSeq"; - /** - * The transaction identifier of the Send transaction within which this message was produced - */ - String JMS_XPRODUCER_TXID = "jmsXProducerTXID"; - /** - * The transaction identifier of the Receive transaction within which this message was consumed - */ - String JMS_XCONSUMER_TXID = "jmsXConsumerTXID"; - - /** - * The time JMS delivered the Receive message to the consumer - */ - String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp"; - /** - * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and - * that these copies exist from the time the original message was sent. Each copyâs state is one of: 1(waiting), - * 2(ready), 3(expired) or 4(retained) Since state is of no interest to producers and consumers it is not provided - * to either. It is only of relevance to messages looked up in a warehouse and JMS provides no API for this. - */ - String JMS_XSTATE = "jmsXState"; - - //---------------------------JMS Headers' value constant--------------------------- - /** - * Default time to live - */ - long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000; - - /** - * Default Jms Type - */ - String DEFAULT_JMS_TYPE = "rocketmq"; - - String MESSAGE_ID_PREFIX = "ID:"; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java deleted file mode 100644 index 79efa6a..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import org.apache.rocketmq.common.ServiceThread; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.JMSRuntimeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConsumeMessageService extends ServiceThread { - - private static final Logger log = LoggerFactory.getLogger(DeliverMessageService.class); - private static final AtomicLong COUNTER = new AtomicLong(0L); - - private BlockingQueue<MessageWrapper> queue = new ArrayBlockingQueue(1000); - private RocketMQSession session; - private final long index = COUNTER.incrementAndGet(); - - public ConsumeMessageService(RocketMQSession session) { - this.session = session; - } - - @Override public String getServiceName() { - return ConsumeMessageService.class.getSimpleName() + "-" + this.index; - } - - @Override public void run() { - while (!this.isStopped()) { - try { - MessageWrapper wrapper = queue.take(); - RocketMQConsumer consumer = wrapper.getConsumer(); - consumer.getMessageListener().onMessage(wrapper.getMessage()); - consumer.getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset()); - } - catch (Exception e) { - log.error(e.getMessage(), e); - } - } - } - - public void put(MessageWrapper wrapper) { - try { - this.queue.put(wrapper); - } - catch (InterruptedException e) { - throw new JMSRuntimeException(e.getMessage()); - } - } - - public RocketMQSession getSession() { - return session; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java b/core/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java deleted file mode 100644 index 356cbd7..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -public enum ConsumeModel { - SYNC, - ASYNC -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java deleted file mode 100644 index 1043f5d..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.client.consumer.PullResult; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.jms.exception.MessageExpiredException; -import org.apache.rocketmq.jms.hook.ReceiveMessageHook; -import org.apache.rocketmq.jms.msg.convert.RMQ2JMSMessageConvert; -import org.apache.rocketmq.jms.support.JMSUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.String.format; - -/** - * Service deliver messages synchronous or asynchronous. - */ -public class DeliverMessageService extends ServiceThread { - - private static final Logger log = LoggerFactory.getLogger(DeliverMessageService.class); - private static final AtomicLong COUNTER = new AtomicLong(0L); - private static final int PULL_BATCH_SIZE = 100; - - private RocketMQConsumer consumer; - private DefaultMQPullConsumer rocketMQPullConsumer; - private Destination destination; - private String consumerGroup; - private String topicName; - private ConsumeModel consumeModel = ConsumeModel.SYNC; - - /** only support RMQ subExpression */ - private String messageSelector; - private ReceiveMessageHook hook = new ReceiveMessageHook(); - - /** - * If durable is true, consume message from the offset consumed last time. - * Otherwise consume from the max offset - */ - private boolean durable = false; - private boolean shared = false; - - private BlockingQueue<MessageWrapper> msgQueue = new ArrayBlockingQueue(PULL_BATCH_SIZE); - private volatile boolean pause = true; - private final long index = COUNTER.incrementAndGet(); - - private Map<MessageQueue, Long> offsetMap = new HashMap(); - - public DeliverMessageService(RocketMQConsumer consumer, Destination destination, String consumerGroup, - String messageSelector, boolean durable, boolean shared) { - this.consumer = consumer; - this.destination = destination; - this.consumerGroup = consumerGroup; - this.messageSelector = messageSelector; - this.durable = durable; - this.shared = shared; - - this.topicName = JMSUtils.getDestinationName(destination); - - createAndStartRocketMQPullConsumer(); - - if (this.consumer.getSession().getConnection().isStarted()) { - this.recover(); - } - else { - this.pause(); - } - } - - private void createAndStartRocketMQPullConsumer() { - final ClientConfig clientConfig = this.consumer.getSession().getConnection().getClientConfig(); - this.rocketMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); - this.rocketMQPullConsumer.setNamesrvAddr(clientConfig.getNamesrvAddr()); - this.rocketMQPullConsumer.setInstanceName(clientConfig.getInstanceName()); - this.rocketMQPullConsumer.setRegisterTopics(new HashSet(Arrays.asList(this.topicName))); - - try { - this.rocketMQPullConsumer.start(); - } - catch (MQClientException e) { - throw new JMSRuntimeException(format("Fail to start RocketMQ pull consumer, error msg:%s", ExceptionUtils.getStackTrace(e))); - } - } - - @Override - public String getServiceName() { - return DeliverMessageService.class.getSimpleName() + "-" + this.index; - } - - @Override - public void run() { - while (!isStopped()) { - if (pause) { - this.waitForRunning(1000); - continue; - } - - try { - pullMessage(); - } - catch (InterruptedException e) { - log.debug("Pulling messages service has been interrupted"); - } - catch (Exception e) { - log.error("Error during pulling messages", e); - } - } - } - - private void pullMessage() throws Exception { - Set<MessageQueue> mqs = getMessageQueues(); - - for (MessageQueue mq : mqs) { - Long offset = offsetMap.get(mq); - if (offset == null) { - offset = beginOffset(mq); - } - PullResult pullResult = this.rocketMQPullConsumer.pullBlockIfNotFound(mq, this.messageSelector, offset, PULL_BATCH_SIZE); - - switch (pullResult.getPullStatus()) { - case FOUND: - List<MessageExt> msgs = pullResult.getMsgFoundList(); - offsetMap.put(mq, pullResult.getMaxOffset()); - for (MessageExt msg : msgs) { - handleMessage(msg, mq); - } - log.debug("Pull {} messages from topic:{},broker:{},queueId:{}", msgs.size(), mq.getTopic(), mq.getBrokerName(), mq.getQueueId()); - break; - case NO_NEW_MSG: - case NO_MATCHED_MSG: - break; - case OFFSET_ILLEGAL: - throw new JMSException("Error during pull message[reason:OFFSET_ILLEGAL]"); - } - } - } - - private Set<MessageQueue> getMessageQueues() throws MQClientException { - Set<MessageQueue> mqs = this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName); - return mqs; - } - - /** - * Refer to {@link #durable}. - * - * @param mq message queue - * @return offset - * @throws MQClientException - */ - private Long beginOffset(MessageQueue mq) throws MQClientException { - return this.durable ? this.rocketMQPullConsumer.fetchConsumeOffset(mq, false) : this.rocketMQPullConsumer.maxOffset(mq); - } - - /** - * If {@link #consumeModel} is {@link ConsumeModel#ASYNC}, messages pulled from broker - * are handled in {@link ConsumeMessageService} owned by its session. - * - * If {@link #consumeModel} is {@link ConsumeModel#SYNC}, messages pulled from broker are put - * into a memory blocking queue, waiting for the {@link MessageConsumer#receive()} - * using {@link BlockingQueue#poll()} to handle messages synchronous. - * - * @param msg to handle message - * @throws InterruptedException - * @throws JMSException - */ - private void handleMessage(MessageExt msg, MessageQueue mq) throws InterruptedException, JMSException { - Message jmsMessage = RMQ2JMSMessageConvert.convert(msg); - - try { - hook.before(jmsMessage); - } - catch (MessageExpiredException e) { - log.debug(e.getMessage()); - } - - final MessageWrapper wrapper = new MessageWrapper(jmsMessage, this.consumer, mq, msg.getQueueOffset()); - - switch (this.consumeModel) { - case SYNC: - this.msgQueue.put(wrapper); - break; - case ASYNC: - this.consumer.getSession().getConsumeMessageService().put(wrapper); - break; - default: - throw new JMSException(format("Unsupported consume model[%s]", this.consumeModel)); - } - } - - public void ack(MessageQueue mq, Long offset) throws JMSException { - try { - this.rocketMQPullConsumer.updateConsumeOffset(mq, offset); - } - catch (MQClientException e) { - throw new JMSException(format("Fail to ack offset[mq:%s,offset:%s]", mq, offset)); - } - } - - public MessageWrapper poll() throws JMSException { - try { - return this.msgQueue.take(); - } - catch (InterruptedException e) { - throw new JMSException(e.getMessage()); - } - } - - public MessageWrapper poll(long timeout, TimeUnit timeUnit) throws JMSException { - try { - return this.msgQueue.poll(timeout, timeUnit); - } - catch (InterruptedException e) { - throw new JMSException(e.getMessage()); - } - } - - public void pause() { - this.pause = true; - } - - public void recover() { - this.pause = false; - } - - public void close() { - - this.stop(); - - this.rocketMQPullConsumer.shutdown(); - - this.shutdown(true); - - log.debug("Success to close message delivery service:{}", getServiceName()); - } - - public void setConsumeModel(ConsumeModel consumeModel) { - this.consumeModel = consumeModel; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java b/core/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java deleted file mode 100644 index 1bca541..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import org.apache.rocketmq.common.message.MessageQueue; -import javax.jms.Message; - -public class MessageWrapper { - - private Message message; - private RocketMQConsumer consumer; - private MessageQueue mq; - private long offset; - - public MessageWrapper(Message message, RocketMQConsumer consumer, MessageQueue mq, long offset) { - this.message = message; - this.consumer = consumer; - this.mq = mq; - this.offset = offset; - } - - public Message getMessage() { - return message; - } - - public RocketMQConsumer getConsumer() { - return consumer; - } - - public MessageQueue getMq() { - return mq; - } - - public long getOffset() { - return offset; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java deleted file mode 100644 index 727ebca..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.impl.MQClientManager; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.String.format; -import static javax.jms.Session.AUTO_ACKNOWLEDGE; -import static javax.jms.Session.SESSION_TRANSACTED; -import static org.apache.commons.lang.StringUtils.isNotBlank; -import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace; - -public class RocketMQConnection implements Connection { - - private static final Logger log = LoggerFactory.getLogger(RocketMQConnection.class); - - private String clientID; - private ClientConfig clientConfig; - private MQClientInstance clientInstance; - private String userName; - private String password; - - private List<RocketMQSession> sessionList = new ArrayList(); - private AtomicBoolean started = new AtomicBoolean(false); - - public RocketMQConnection(String nameServerAddress, String clientID, String instanceName, String userName, - String password) { - this.clientID = clientID; - this.userName = userName; - this.password = password; - - this.clientConfig = new ClientConfig(); - this.clientConfig.setNamesrvAddr(nameServerAddress); - this.clientConfig.setInstanceName(instanceName); - - startClientInstance(); - } - - private void startClientInstance() { - try { - // create a tcp connection to broker and some other background thread - this.clientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.clientConfig); - clientInstance.start(); - } - catch (MQClientException e) { - throw new JMSRuntimeException(format("Fail to startClientInstance connection object[namesrvAddr:%s,instanceName:%s]. Error message:%s", - this.clientConfig.getNamesrvAddr(), this.clientConfig.getInstanceName(), getStackTrace(e))); - } - } - - @Override - public Session createSession() throws JMSException { - return createSession(false, AUTO_ACKNOWLEDGE); - } - - @Override - public Session createSession(int sessionMode) throws JMSException { - if (sessionMode == SESSION_TRANSACTED) { - return createSession(true, Session.AUTO_ACKNOWLEDGE); - } - else { - return createSession(false, sessionMode); - } - } - - @Override - public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { - //todo: support transacted and more acknowledge mode - if (transacted) { - throw new JMSException("Not support local transaction session"); - } - if (acknowledgeMode != AUTO_ACKNOWLEDGE) { - throw new JMSException("Only support AUTO_ACKNOWLEDGE mode now"); - } - - RocketMQSession session = new RocketMQSession(this, acknowledgeMode, transacted); - this.sessionList.add(session); - - return session; - } - - @Override - public ConnectionConsumer createConnectionConsumer(Destination destination, - String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public String getClientID() throws JMSException { - return this.clientID; - } - - @Override - public void setClientID(String clientID) throws JMSException { - if (isNotBlank(this.clientID)) { - throw new IllegalStateException("administratively client identifier has been configured."); - } - this.clientID = clientID; - } - - @Override - public ConnectionMetaData getMetaData() throws JMSException { - return RocketMQConnectionMetaData.instance(); - } - - @Override - public ExceptionListener getExceptionListener() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void setExceptionListener(ExceptionListener listener) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void start() throws JMSException { - if (this.started.compareAndSet(false, true)) { - for (RocketMQSession session : sessionList) { - for (RocketMQConsumer consumer : session.getConsumerList()) { - consumer.getDeliverMessageService().recover(); - } - } - log.debug("Start connection successfully:{}", toString()); - } - } - - @Override - public void stop() throws JMSException { - if (this.started.compareAndSet(true, false)) { - for (RocketMQSession session : sessionList) { - for (RocketMQConsumer consumer : session.getConsumerList()) { - consumer.getDeliverMessageService().pause(); - } - } - log.debug("Stop connection successfully:{}", toString()); - } - } - - @Override - public void close() throws JMSException { - - for (RocketMQSession session : sessionList) { - session.close(); - } - - this.clientInstance.shutdown(); - - log.info("Success to close connection:{}", toString()); - } - - public boolean isStarted() { - return started.get(); - } - - public ClientConfig getClientConfig() { - return clientConfig; - } - - public String getUserName() { - return userName; - } - - @Override public String toString() { - return new ToStringBuilder(this) - .append("nameServerAddress", this.clientConfig.getNamesrvAddr()) - .append("instanceName", this.clientConfig.getInstanceName()) - .append("clientIdentifier", this.clientID) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java deleted file mode 100644 index c81e8b5..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSContext; -import javax.jms.JMSException; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.jms.support.JMSUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implement of {@link ConnectionFactory} using RocketMQ client. - * - * <P>In RocketMQ, all producers and consumers interactive with broker - * by an {@link MQClientInstance} object, which encapsulates tcp connection, - * schedule task and so on. The best way to control the behavior of producers/consumers - * derived from a connection is to manipulate the {@link MQClientInstance} directly. - * - * <P>However, this object is not easy to access as it is maintained within RocketMQ Client. - * Fortunately another equivalent identifier called "instanceName" is provided. - * The "instanceName" is a one-to-one conception with {@link MQClientInstance} object. - * Just like there is a hash map,"instanceName" is the key and a {@link MQClientInstance} - * object is the value. So the essential keyword passed through all objects created by a - * connection is "instanceName". - */ -public class RocketMQConnectionFactory implements ConnectionFactory { - - private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionFactory.class); - - private String nameServerAddress; - - private String clientId; - - public RocketMQConnectionFactory(String nameServerAddress) { - this.nameServerAddress = nameServerAddress; - this.clientId = JMSUtils.uuid(); - } - - public RocketMQConnectionFactory(String nameServerAddress, String clientId) { - this.nameServerAddress = nameServerAddress; - this.clientId = clientId; - } - - @Override - public Connection createConnection() throws JMSException { - return createConnection(null, null); - } - - /** - * Using userName and Password to register a connection. Now access RMQ broker - * is anonymous and any userName/password is legal. - * - * @param userName ignored - * @param password ignored - * @return the new JMS Connection - * @throws JMSException - */ - @Override - public Connection createConnection(String userName, String password) throws JMSException { - return createRocketMQConnection(userName, password); - } - - private Connection createRocketMQConnection(String userName, String password) throws JMSException { - final String instanceName = JMSUtils.uuid(); - RocketMQConnection connection = new RocketMQConnection(this.nameServerAddress, this.clientId, instanceName, userName, password); - - log.info("Create a connection successfully[instanceName:{},clientIdentifier:{},userName:{}", instanceName, clientId, userName); - return connection; - } - - @Override - public JMSContext createContext() { - //todo: - return null; - } - - @Override - public JMSContext createContext(String userName, String password) { - //todo: - return null; - } - - @Override - public JMSContext createContext(String userName, String password, int sessionMode) { - //todo: - return null; - } - - @Override - public JMSContext createContext(int sessionMode) { - //todo: - return null; - } - - public String getClientId() { - return clientId; - } - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - public String getNameServerAddress() { - return nameServerAddress; - } - - public void setNameServerAddress(String nameServerAddress) { - this.nameServerAddress = nameServerAddress; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java deleted file mode 100644 index e4353e1..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.util.Enumeration; -import java.util.Vector; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.jms.ConnectionMetaData; -import javax.jms.JMSException; -import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; -import org.apache.rocketmq.jms.support.ProviderVersion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocketMQConnectionMetaData implements ConnectionMetaData { - - private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionMetaData.class); - private static final String PROVIDER_NAME = "Apache RocketMQ"; - - private String jmsVersion; - private int jmsMajorVersion; - private int jmsMinorVersion; - - private String providerVersion; - private int providerMajorVersion; - private int providerMinorVersion; - - private static RocketMQConnectionMetaData metaData = new RocketMQConnectionMetaData(); - - private RocketMQConnectionMetaData() { - Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*"); - - String jmsVersion = null; - int jmsMajor = 0; - int jmsMinor = 0; - try { - Package p = Package.getPackage("javax.jms"); - if (p != null) { - jmsVersion = p.getImplementationVersion(); - Matcher m = pattern.matcher(jmsVersion); - if (m.matches()) { - jmsMajor = Integer.parseInt(m.group(1)); - jmsMinor = Integer.parseInt(m.group(2)); - } - } - } - catch (Throwable e) { - log.error("Error during getting jms version", e); - } - - this.jmsVersion = jmsVersion; - this.jmsMajorVersion = jmsMajor; - this.jmsMinorVersion = jmsMinor; - - this.providerVersion = ProviderVersion.CURRENT_VERSION.name(); - this.providerMinorVersion = ProviderVersion.CURRENT_VERSION.getValue(); - this.providerMajorVersion = ProviderVersion.CURRENT_VERSION.getValue(); - } - - public static RocketMQConnectionMetaData instance() { - return metaData; - } - - public String getJMSVersion() throws JMSException { - return jmsVersion; - } - - public int getJMSMajorVersion() throws JMSException { - return jmsMajorVersion; - } - - public int getJMSMinorVersion() throws JMSException { - return jmsMinorVersion; - } - - public String getJMSProviderName() throws JMSException { - return PROVIDER_NAME; - } - - public String getProviderVersion() throws JMSException { - return providerVersion; - } - - public int getProviderMajorVersion() throws JMSException { - return providerMajorVersion; - } - - public int getProviderMinorVersion() throws JMSException { - return providerMinorVersion; - } - - public Enumeration<?> getJMSXPropertyNames() throws JMSException { - Vector<String> result = new Vector<String>(); - for (JMSPropertiesEnum e : JMSPropertiesEnum.values()) { - result.add(e.name()); - } - return result.elements(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java deleted file mode 100644 index af147e0..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import org.apache.rocketmq.jms.support.JMSUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocketMQConsumer implements MessageConsumer { - - private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class); - private RocketMQSession session; - private Destination destination; - private String messageSelector; - private MessageListener messageListener; - private String subscriptionName; - private boolean durable; - private boolean shared; - - private DeliverMessageService deliverMessageService; - - public RocketMQConsumer(RocketMQSession session, Destination destination, - String messageSelector, - boolean durable, boolean shared) { - this(session, destination, messageSelector, UUID.randomUUID().toString(), durable, shared); - } - - public RocketMQConsumer(RocketMQSession session, Destination destination, - String messageSelector, - String subscriptionName, boolean durable, boolean shared) { - this.session = session; - this.destination = destination; - this.messageSelector = messageSelector; - this.subscriptionName = subscriptionName; - this.durable = durable; - this.shared = shared; - - String consumerGroup = JMSUtils.getConsumerGroup(this); - this.deliverMessageService = new DeliverMessageService(this, this.destination, consumerGroup, - this.messageSelector, this.durable, this.shared); - this.deliverMessageService.start(); - } - - @Override - public String getMessageSelector() throws JMSException { - return messageSelector; - } - - @Override - public MessageListener getMessageListener() throws JMSException { - return this.messageListener; - } - - @Override - public void setMessageListener(MessageListener listener) throws JMSException { - if (this.session.isSyncModel()) { - throw new JMSException("A asynchronously call is not permitted when a session is being used synchronously"); - } - - this.messageListener = listener; - this.deliverMessageService.setConsumeModel(ConsumeModel.ASYNC); - this.session.addAsyncConsumer(this); - } - - @Override - public Message receive() throws JMSException { - return this.receive(0); - } - - @Override - public Message receive(long timeout) throws JMSException { - if (this.session.isAsyncModel()) { - throw new JMSException("A synchronous call is not permitted when a session is being used asynchronously."); - } - - this.session.addSyncConsumer(this); - - if (timeout == 0) { - MessageWrapper wrapper = this.deliverMessageService.poll(); - wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset()); - return wrapper.getMessage(); - } - else { - MessageWrapper wrapper = this.deliverMessageService.poll(timeout, TimeUnit.MILLISECONDS); - if (wrapper == null) { - return null; - } - wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset()); - return wrapper.getMessage(); - } - } - - @Override - public Message receiveNoWait() throws JMSException { - return receive(1); - } - - @Override - public void close() throws JMSException { - this.deliverMessageService.close(); - } - - public void start() { - this.deliverMessageService.recover(); - } - - public void stop() { - this.deliverMessageService.pause(); - } - - public DeliverMessageService getDeliverMessageService() { - return deliverMessageService; - } - - public RocketMQSession getSession() { - return session; - } - - public String getSubscriptionName() { - return subscriptionName; - } - - public boolean isDurable() { - return durable; - } - - public boolean isShared() { - return shared; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java deleted file mode 100644 index ab1eb69..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.util.UUID; -import javax.jms.CompletionListener; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException; -import org.apache.rocketmq.jms.hook.SendMessageHook; -import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.String.format; -import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Object; - -public class RocketMQProducer implements MessageProducer { - - private static final Logger log = LoggerFactory.getLogger(RocketMQProducer.class); - private RocketMQSession session; - private DefaultMQProducer rocketMQProducer; - private Destination destination; - - private boolean disableMessageID; - private boolean disableMessageTimestamp; - private long timeToLive = JMS_TIME_TO_LIVE_DEFAULT_VALUE; - private int deliveryMode = JMS_DELIVERY_MODE_DEFAULT_VALUE; - private int priority = JMS_PRIORITY_DEFAULT_VALUE; - private long deliveryDelay = JMS_DELIVERY_TIME_DEFAULT_VALUE; - - private SendMessageHook sendMessageHook; - - public RocketMQProducer() { - } - - public RocketMQProducer(RocketMQSession session, Destination destination) { - this.session = session; - this.destination = destination; - - this.rocketMQProducer = new DefaultMQProducer(UUID.randomUUID().toString()); - ClientConfig clientConfig = this.session.getConnection().getClientConfig(); - this.rocketMQProducer.setNamesrvAddr(clientConfig.getNamesrvAddr()); - this.rocketMQProducer.setInstanceName(clientConfig.getInstanceName()); - try { - this.rocketMQProducer.start(); - } - catch (MQClientException e) { - throw new JMSRuntimeException(format("Fail to start producer, error msg:%s", getStackTrace(e))); - } - - this.sendMessageHook = new SendMessageHook(this); - } - - @Override - public void setDisableMessageID(boolean value) throws JMSException { - this.disableMessageID = value; - } - - @Override - public boolean getDisableMessageID() throws JMSException { - return this.disableMessageID; - } - - @Override - public void setDisableMessageTimestamp(boolean value) throws JMSException { - this.disableMessageTimestamp = value; - } - - @Override - public boolean getDisableMessageTimestamp() throws JMSException { - return this.disableMessageTimestamp; - } - - @Override - public void setDeliveryMode(int deliveryMode) throws JMSException { - throw new UnsupportDeliveryModelException(); - } - - @Override - public int getDeliveryMode() throws JMSException { - return this.deliveryMode; - } - - @Override - public void setPriority(int priority) throws JMSException { - this.priority = priority; - } - - @Override - public int getPriority() throws JMSException { - return this.priority; - } - - @Override - public void setTimeToLive(long timeToLive) throws JMSException { - this.timeToLive = timeToLive; - } - - @Override - public long getTimeToLive() throws JMSException { - return this.timeToLive; - } - - @Override - public void setDeliveryDelay(long deliveryDelay) throws JMSException { - this.deliveryDelay = deliveryDelay; - } - - @Override - public long getDeliveryDelay() throws JMSException { - return this.deliveryDelay; - } - - @Override - public Destination getDestination() throws JMSException { - return this.destination; - } - - @Override - public void close() throws JMSException { - this.rocketMQProducer.shutdown(); - } - - @Override - public void send(Message message) throws JMSException { - this.send(this.destination, message); - } - - @Override - public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - this.send(this.destination, message, deliveryMode, priority, timeToLive); - } - - @Override - public void send(Destination destination, Message message) throws JMSException { - this.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive()); - } - - @Override - public void send(Destination destination, Message message, int deliveryMode, int priority, - long timeToLive) throws JMSException { - - sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive); - - MessageExt rmqMsg = createRocketMQMessage(message); - - SendResult sendResult = sendSync(rmqMsg); - if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) { - log.debug("Success to send message[key={}]", rmqMsg.getKeys()); - return; - } - else { - throw new JMSException(format("Sending message error with result status:%s", sendResult.getSendStatus().name())); - } - } - - private SendResult sendSync(org.apache.rocketmq.common.message.Message rmqMsg) throws JMSException { - - try { - return rocketMQProducer.send(rmqMsg); - } - catch (Exception e) { - throw new JMSException(format("Fail to send message. Error: %s", getStackTrace(e))); - } - } - - private void sendAsync(org.apache.rocketmq.common.message.Message rmqMsg, - CompletionListener completionListener) throws JMSException { - try { - rocketMQProducer.send(rmqMsg, new SendCompletionListener(completionListener)); - } - catch (Exception e) { - throw new JMSException(format("Fail to send message. Error: %s", getStackTrace(e))); - } - } - - private MessageExt createRocketMQMessage(Message jmsMsg) throws JMSException { - AbstractJMSMessage abstractJMSMessage = cast2Object(jmsMsg, AbstractJMSMessage.class); - try { - return JMS2RMQMessageConvert.convert(abstractJMSMessage); - } - catch (Exception e) { - throw new JMSException(format("Fail to convert to RocketMQ jmsMsg. Error: %s", getStackTrace(e))); - } - } - - @Override - public void send(Message message, CompletionListener completionListener) throws JMSException { - this.send(this.destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), completionListener); - } - - @Override - public void send(Message message, int deliveryMode, int priority, long timeToLive, - CompletionListener completionListener) throws JMSException { - this.send(this.destination, message, deliveryMode, priority, timeToLive, completionListener); - } - - @Override - public void send(Destination destination, Message message, - CompletionListener completionListener) throws JMSException { - this.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), completionListener); - } - - @Override - public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, - CompletionListener completionListener) throws JMSException { - - sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive); - - MessageExt rmqMsg = createRocketMQMessage(message); - - sendAsync(rmqMsg, completionListener); - } - - public RocketMQSession getSession() { - return session; - } - - public void setSession(RocketMQSession session) { - this.session = session; - } - - public void setRocketMQProducer(DefaultMQProducer rocketMQProducer) { - this.rocketMQProducer = rocketMQProducer; - } - - public void setDestination(Destination destination) { - this.destination = destination; - } - - public void setSendMessageHook(SendMessageHook sendMessageHook) { - this.sendMessageHook = sendMessageHook; - } - - public String getUserName() { - return this.session.getConnection().getUserName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java deleted file mode 100644 index 0094c47..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import com.google.common.base.Preconditions; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.jms.admin.AdminFactory; -import org.apache.rocketmq.jms.destination.RocketMQQueue; -import org.apache.rocketmq.jms.destination.RocketMQTopic; -import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException; -import org.apache.rocketmq.jms.msg.JMSBytesMessage; -import org.apache.rocketmq.jms.msg.JMSMapMessage; -import org.apache.rocketmq.jms.msg.JMSObjectMessage; -import org.apache.rocketmq.jms.msg.JMSTextMessage; -import org.apache.rocketmq.jms.support.JMSUtils; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.rocketmq.jms.Constant.DEFAULT_DURABLE; -import static org.apache.rocketmq.jms.Constant.DEFAULT_NO_LOCAL; -import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR; - -/** - * Implement of {@link Session}. - */ -public class RocketMQSession implements Session { - - private static final Logger log = LoggerFactory.getLogger(RocketMQSession.class); - - private RocketMQConnection connection; - - private int acknowledgeMode; - - private boolean transacted; - - private ConsumeMessageService consumeMessageService; - - private final List<RocketMQProducer> producerList = new ArrayList(); - - private final List<RocketMQConsumer> consumerList = new ArrayList(); - - private final Set<RocketMQConsumer> asyncConsumerSet = new HashSet(); - - private final Set<RocketMQConsumer> syncConsumerSet = new HashSet(); - - public RocketMQSession(RocketMQConnection connection, int acknowledgeMode, boolean transacted) { - this.connection = connection; - this.acknowledgeMode = acknowledgeMode; - this.transacted = transacted; - - this.consumeMessageService = new ConsumeMessageService(this); - this.consumeMessageService.start(); - } - - @Override - public BytesMessage createBytesMessage() throws JMSException { - return new JMSBytesMessage(); - } - - @Override - public MapMessage createMapMessage() throws JMSException { - return new JMSMapMessage(); - } - - @Override - public Message createMessage() throws JMSException { - return new JMSBytesMessage(); - } - - @Override - public ObjectMessage createObjectMessage() throws JMSException { - return new JMSObjectMessage(); - } - - @Override - public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { - return new JMSObjectMessage(serializable); - } - - @Override - public StreamMessage createStreamMessage() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public TextMessage createTextMessage() throws JMSException { - return new JMSTextMessage(); - } - - @Override - public TextMessage createTextMessage(String text) throws JMSException { - return new JMSTextMessage(text); - } - - @Override - public boolean getTransacted() throws JMSException { - return this.transacted; - } - - @Override - public int getAcknowledgeMode() throws JMSException { - return this.acknowledgeMode; - } - - @Override - public void commit() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void rollback() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void close() throws JMSException { - for (RocketMQProducer producer : this.producerList) { - producer.close(); - } - for (RocketMQConsumer consumer : this.consumerList) { - consumer.close(); - } - } - - @Override - public void recover() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public MessageListener getMessageListener() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void setMessageListener(MessageListener listener) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void run() { - //todo - throw new JMSRuntimeException("Not support yet"); - } - - @Override - public MessageProducer createProducer(Destination destination) throws JMSException { - RocketMQProducer producer = new RocketMQProducer(this, destination); - this.producerList.add(producer); - return producer; - } - - @Override - public MessageConsumer createConsumer(Destination destination) throws JMSException { - return createConsumer(destination, NO_MESSAGE_SELECTOR); - } - - @Override - public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - return createConsumer(destination, messageSelector, DEFAULT_NO_LOCAL); - } - - @Override - public MessageConsumer createConsumer(Destination destination, String messageSelector, - boolean noLocal) throws JMSException { - - // ignore noLocal param as RMQ not support - RocketMQConsumer consumer = new RocketMQConsumer(this, destination, messageSelector, DEFAULT_DURABLE, false); - this.consumerList.add(consumer); - - return consumer; - } - - @Override - public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException { - return createSharedConsumer(topic, sharedSubscriptionName, NO_MESSAGE_SELECTOR); - } - - @Override - public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, - String messageSelector) throws JMSException { - RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, sharedSubscriptionName, DEFAULT_DURABLE, true); - this.consumerList.add(consumer); - - return consumer; - } - - @Override - public Queue createQueue(String queueName) throws JMSException { - return new RocketMQQueue(queueName); - } - - @Override - public Topic createTopic(String topicName) throws JMSException { - Preconditions.checkNotNull(topicName); - - return new RocketMQTopic(topicName); - } - - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { - return createDurableSubscriber(topic, name, NO_MESSAGE_SELECTOR, DEFAULT_NO_LOCAL); - } - - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, - boolean noLocal) throws JMSException { - RocketMQTopicSubscriber subscriber = new RocketMQTopicSubscriber(this, topic, messageSelector, name, true, true); - this.consumerList.add(subscriber); - - return subscriber; - } - - @Override - public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException { - return createDurableConsumer(topic, name, NO_MESSAGE_SELECTOR, true); - } - - @Override - public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, - boolean noLocal) throws JMSException { - DefaultMQAdminExt admin = AdminFactory.getAdmin(this.getConnection().getClientConfig().getNamesrvAddr()); - try { - GroupList groupList = admin.queryTopicConsumeByWho(topic.getTopicName()); - if (groupList.getGroupList().contains(JMSUtils.getConsumerGroup(name, this.getConnection().getClientID(), false))) { - throw new DuplicateSubscriptionException("The same subscription( join subscriptionName with clientID) has existed, so couldn't create consumer on them again "); - } - } - catch (InterruptedException | MQBrokerException | RemotingException | MQClientException e) { - throw new JMSException(ExceptionUtils.getStackTrace(e)); - } - RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true, false); - this.consumerList.add(consumer); - - return consumer; - } - - @Override - public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException { - return createSharedDurableConsumer(topic, name, NO_MESSAGE_SELECTOR); - } - - @Override - public MessageConsumer createSharedDurableConsumer(Topic topic, String name, - String messageSelector) throws JMSException { - RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true, true); - this.consumerList.add(consumer); - - return consumer; - } - - @Override - public QueueBrowser createBrowser(Queue queue) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public TemporaryQueue createTemporaryQueue() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public TemporaryTopic createTemporaryTopic() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void unsubscribe(String name) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - public List<RocketMQProducer> getProducerList() { - return producerList; - } - - public List<RocketMQConsumer> getConsumerList() { - return consumerList; - } - - public RocketMQConnection getConnection() { - return connection; - } - - public boolean isTransacted() { - return transacted; - } - - public void addSyncConsumer(RocketMQConsumer consumer) { - this.syncConsumerSet.add(consumer); - } - - public void addAsyncConsumer(RocketMQConsumer consumer) { - this.asyncConsumerSet.add(consumer); - } - - public boolean isAsyncModel() { - return !this.asyncConsumerSet.isEmpty(); - } - - public boolean isSyncModel() { - return !this.syncConsumerSet.isEmpty(); - } - - public ConsumeMessageService getConsumeMessageService() { - return consumeMessageService; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java deleted file mode 100644 index 51b732b..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import javax.jms.JMSException; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -public class RocketMQTopicSubscriber extends RocketMQConsumer implements TopicSubscriber { - - private Topic topic; - - public RocketMQTopicSubscriber(RocketMQSession session, Topic topic, String messageSelector, - String sharedSubscriptionName, boolean durable, boolean shared) { - super(session, topic, messageSelector, sharedSubscriptionName, durable, shared); - this.topic = topic; - } - - @Override - public Topic getTopic() throws JMSException { - return this.topic; - } - - @Override - public boolean getNoLocal() throws JMSException { - //todo: not inhibit now - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java b/core/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java deleted file mode 100644 index a99a607..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; - -import javax.jms.CompletionListener; - -public class SendCompletionListener implements SendCallback { - - private CompletionListener completionListener; - - public SendCompletionListener(CompletionListener completionListener) { - this.completionListener = completionListener; - } - - @Override - public void onSuccess(SendResult sendResult) { - //todo: how to transmit message into - this.completionListener.onCompletion(null); - } - - @Override - public void onException(Throwable e) { - //todo: how to transmit message into - this.completionListener.onException(null, new Exception(e)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java b/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java deleted file mode 100644 index c551a22..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.admin; - -import java.util.concurrent.ConcurrentHashMap; -import javax.jms.JMSException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.jms.exception.JMSClientException; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; - -public class AdminFactory { - - private static ConcurrentHashMap<String/*nameServerAddress*/, DefaultMQAdminExt> admins = new ConcurrentHashMap(); - - public static DefaultMQAdminExt getAdmin(String nameServerAddress) throws JMSException { - if (nameServerAddress == null) { - throw new IllegalArgumentException("NameServerAddress could be null"); - } - - DefaultMQAdminExt admin = admins.get(nameServerAddress); - if (admin != null) { - return admin; - } - - admin = new DefaultMQAdminExt(nameServerAddress); - try { - admin.start(); - } - catch (MQClientException e) { - throw new JMSClientException("Error during starting admin client"); - } - DefaultMQAdminExt old = admins.putIfAbsent(nameServerAddress, admin); - if (old != null) { - admin.shutdown(); - return old; - } - - return admin; - } -}
