http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/support/ObjectTypeCastTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/support/ObjectTypeCastTest.java b/src/test/java/org/apache/rocketmq/jms/support/ObjectTypeCastTest.java new file mode 100644 index 0000000..21fc50b --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/support/ObjectTypeCastTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.jms.support;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Checks that every {@code ObjectTypeCast.cast2*} helper returns a value equal to its
+ * argument when that argument already has the requested type.
+ */
+public class ObjectTypeCastTest {
+
+    /** A String passed to {@code cast2String} comes back equal to itself. */
+    @Test
+    public void testConvert2String() throws Exception {
+        final String text = "name";
+        assertThat(ObjectTypeCast.cast2String(text), is(text));
+    }
+
+    /** A long passed to {@code cast2Long} comes back equal to itself. */
+    @Test
+    public void testConvert2Long() throws Exception {
+        final long number = 100L;
+        assertThat(ObjectTypeCast.cast2Long(number), is(number));
+    }
+
+    /** An int passed to {@code cast2Integer} comes back equal to itself. */
+    @Test
+    public void testConvert2Integer() throws Exception {
+        final int number = 100;
+        assertThat(ObjectTypeCast.cast2Integer(number), is(number));
+    }
+
+    /** A boolean passed to {@code cast2Boolean} comes back equal to itself. */
+    @Test
+    public void testConvert2Boolean() throws Exception {
+        final boolean flag = true;
+        assertThat(ObjectTypeCast.cast2Boolean(flag), is(flag));
+    }
+
+    /** An instance cast to its own class via {@code cast2Object} comes back equal to itself. */
+    @Test
+    public void testConvert2Object() throws Exception {
+        final ObjectTypeCast instance = new ObjectTypeCast();
+        assertThat(ObjectTypeCast.cast2Object(instance, ObjectTypeCast.class), is(instance));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/support/PrimitiveTypeCastTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/support/PrimitiveTypeCastTest.java b/src/test/java/org/apache/rocketmq/jms/support/PrimitiveTypeCastTest.java new file mode 100644 index 0000000..53ae0da --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/support/PrimitiveTypeCastTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.jms.support;
+
+import java.util.Date;
+import javax.jms.JMSException;
+import org.junit.Test;
+
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Boolean;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Byte;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2ByteArray;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Char;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Double;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Float;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Int;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Long;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Short;
+import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2String;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for the static {@code PrimitiveTypeCast.cast2*} helpers.
+ *
+ * <p>Each test first checks the conversions that are expected to succeed and then feeds the
+ * helper one or more inputs that must be rejected with an exception. A rejection test fails
+ * through {@code fail(...)} when the helper returns normally.
+ */
+public class PrimitiveTypeCastTest {
+
+    /** Boolean objects, {@code null} and Strings are accepted; a Date must be rejected. */
+    @Test
+    public void testConvert2Boolean() throws Exception {
+        assertThat(cast2Boolean(Boolean.valueOf(true)), is(true));
+        assertThat(cast2Boolean(null), is(false));
+
+        assertThat(cast2Boolean("true"), is(true));
+        assertThat(cast2Boolean("hello"), is(false));
+
+        try {
+            cast2Boolean(new Date());
+            fail("cast2Boolean(Date) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** Bytes and numeric Strings are accepted; null, non-numeric Strings and Dates are rejected. */
+    @Test
+    public void testConvert2Byte() throws Exception {
+        // binary "101" == 5
+        final byte b = Byte.parseByte("101", 2);
+        assertThat(cast2Byte(b), is(b));
+
+        assertThat(cast2Byte("5"), is(b));
+
+        try {
+            cast2Byte(null);
+            fail("cast2Byte(null) should throw RuntimeException");
+        }
+        catch (RuntimeException expected) {
+            // expected: no value to convert
+        }
+
+        try {
+            cast2Byte("abc");
+            fail("cast2Byte(\"abc\") should throw RuntimeException");
+        }
+        catch (RuntimeException expected) {
+            // expected: not a parsable number
+        }
+
+        try {
+            cast2Byte(new Date());
+            fail("cast2Byte(Date) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** Short objects and numeric Strings are accepted; a Date must be rejected. */
+    @Test
+    public void testConvert2Short() throws Exception {
+        final Short s = Short.valueOf("12");
+        assertThat(cast2Short(s), is(s));
+
+        assertThat(cast2Short("3"), is(Short.valueOf("3")));
+
+        try {
+            cast2Short(new Date());
+            fail("cast2Short(Date) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** A char is accepted; a String, even of length one, must be rejected. */
+    @Test
+    public void testConvert2Char() throws Exception {
+        final char c = 'a';
+        assertThat(cast2Char(c), is(c));
+
+        try {
+            cast2Char("a");
+            fail("cast2Char(String) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** Ints, numeric Strings and bytes are accepted; a Date must be rejected. */
+    @Test
+    public void testConvert2Int() throws Exception {
+        assertThat(cast2Int(12), is(12));
+
+        assertThat(cast2Int("12"), is(12));
+        // binary "11" == 3
+        assertThat(cast2Int(Byte.parseByte("11", 2)), is(3));
+
+        try {
+            cast2Int(new Date());
+            fail("cast2Int(Date) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** Ints and numeric Strings are accepted; a Date is expected to be rejected. */
+    @Test
+    public void testConvert2Long() throws Exception {
+        assertThat(cast2Long(12), is(12L));
+
+        assertThat(cast2Long("12"), is(12L));
+
+        try {
+            // The rejection check must target cast2Long itself; it previously called cast2Int,
+            // leaving cast2Long's failure path untested. JMSException is assumed here by
+            // analogy with the other numeric casts in this class.
+            cast2Long(new Date());
+            fail("cast2Long(Date) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** Floats and numeric Strings are accepted; an int argument must be rejected. */
+    @Test
+    public void testConvert2Float() throws Exception {
+        assertThat(cast2Float(12.00f), is(12f));
+
+        assertThat(cast2Float("12.00"), is(12f));
+
+        try {
+            cast2Float(12);
+            fail("cast2Float(int) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** Doubles, numeric Strings and floats are accepted; an int argument must be rejected. */
+    @Test
+    public void testConvert2Double() throws Exception {
+        assertThat(cast2Double(12.00d), is(12d));
+
+        assertThat(cast2Double("12.00"), is(12d));
+        assertThat(cast2Double(12.00f), is(12d));
+
+        try {
+            cast2Double(12);
+            fail("cast2Double(int) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** Doubles, Strings and booleans are rendered as Strings; a Date must be rejected. */
+    @Test
+    public void testConvert2String() throws Exception {
+        assertThat(cast2String(12.00d), is("12.0"));
+
+        assertThat(cast2String("12.00"), is("12.00"));
+        assertThat(cast2String(true), is("true"));
+
+        try {
+            cast2String(new Date());
+            fail("cast2String(Date) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+
+    /** A byte array is accepted and compares equal to the input; a String must be rejected. */
+    @Test
+    public void testConvert2ByteArray() throws Exception {
+        final byte[] arr = new byte[] {Byte.parseByte("11", 2), Byte.parseByte("101", 2)};
+
+        assertThat(cast2ByteArray(arr), is(arr));
+
+        try {
+            cast2ByteArray("10");
+            fail("cast2ByteArray(String) should throw JMSException");
+        }
+        catch (JMSException expected) {
+            // expected: unsupported source type
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
deleted file mode 100644
index bd23f68..0000000
--- a/test/pom.xml
+++ /dev/null
@@ -1,70 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements. See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License. You may obtain a copy of the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>rocketmq-jms-all</artifactId> - <groupId>org.apache.rocketmq</groupId> - <version>1.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>rocketmq-jms-test</artifactId> - - <properties> - <spring.version>4.3.6.RELEASE</spring.version> - </properties> - - <dependencies> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>rocketmq-jms-core</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-jms</artifactId> - <version>${spring.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-test</artifactId> - <version>${spring.version}</version> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-namesrv</artifactId> - <version>${rocketmq.version}</version> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-broker</artifactId> - <version>${rocketmq.version}</version> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java b/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java deleted file mode 100644 index b7b2a43..0000000 --- 
a/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration; - -import javax.jms.ConnectionFactory; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.jms.annotation.EnableJms; -import org.springframework.jms.config.DefaultJmsListenerContainerFactory; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.support.converter.SimpleMessageConverter; - -@Configuration -@ComponentScan(basePackageClasses = {org.apache.rocketmq.jms.integration.RocketMQServer.class}) -@EnableJms -public class AppConfig { - - @Bean - public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { - DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); - factory.setConnectionFactory(connectionFactory()); - factory.setConcurrency("1"); - return factory; - } - - @Bean - public ConnectionFactory connectionFactory() { -// 
CachingConnectionFactory factory = new CachingConnectionFactory(); -// factory.setTargetConnectionFactory(new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS)); -// return factory; - //todo - return new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS); - } - - @Bean - public JmsTemplate jmsTemplate() { - JmsTemplate jmsTemplate = new JmsTemplate(); - jmsTemplate.setConnectionFactory(connectionFactory()); - jmsTemplate.setMessageConverter(new SimpleMessageConverter()); - return jmsTemplate; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java b/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java deleted file mode 100644 index fd53608..0000000 --- a/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -public class Constant { - - public static final String NAME_SERVER_IP = "127.0.0.1"; - - public static final int NAME_SERVER_PORT = 9153; - - public static final String NAME_SERVER_ADDRESS = NAME_SERVER_IP + ":" + NAME_SERVER_PORT; - - public static final String BROKER_IP = "127.0.0.1"; - - public static final int BROKER_PORT = 9055; - - public static final String BROKER_ADDRESS = BROKER_IP + ":" + BROKER_PORT; - - public static final int BROKER_HA_PORT = 9043; - - public static final String CLIENT_ID = "coffee"; - - public static final String CLIENT_ID_SECOND = "tea"; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java deleted file mode 100644 index 7ee2fbb..0000000 --- a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQAdmin.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -import com.google.common.collect.Sets; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import static org.apache.rocketmq.jms.integration.Constant.BROKER_ADDRESS; -import static org.apache.rocketmq.jms.integration.Constant.NAME_SERVER_ADDRESS; - -@Service -public class RocketMQAdmin { - - private static final Logger log = LoggerFactory.getLogger(RocketMQAdmin.class); - - @Autowired - // make sure RocketMQServer start ahead - private RocketMQServer rocketMQServer; - - //MQAdmin client - private DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); - - @PostConstruct - public void start() { - // reduce rebalance waiting time - System.setProperty("rocketmq.client.rebalance.waitInterval", "1000"); - - defaultMQAdminExt.setNamesrvAddr(NAME_SERVER_ADDRESS); - try { - defaultMQAdminExt.start(); - log.info("Start RocketMQAdmin Successfully"); - } - catch (MQClientException e) { - log.error("Failed to start MQAdmin", e); - System.exit(1); - } - } - - @PreDestroy - public void shutdown() { - defaultMQAdminExt.shutdown(); - } - - public void createTopic(String topic) { - createTopic(topic, 1); - } - - public void createTopic(String topic, int queueNum) { - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setTopicName(topic); - topicConfig.setReadQueueNums(queueNum); - topicConfig.setWriteQueueNums(queueNum); - try { - defaultMQAdminExt.createAndUpdateTopicConfig(BROKER_ADDRESS, topicConfig); - } - catch (Exception e) { - log.error("Create topic:{}, addr:{} failed:{}", topic, 
BROKER_ADDRESS, ExceptionUtils.getStackTrace(e)); - } - } - - public void deleteTopic(String topic) { - try { - defaultMQAdminExt.deleteTopicInBroker(Sets.newHashSet(BROKER_ADDRESS), topic); - } - catch (Exception e) { - log.error("Delete topic:{}, addr:{} failed:{}", topic, BROKER_ADDRESS, ExceptionUtils.getStackTrace(e)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java b/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java deleted file mode 100644 index c2d0f49..0000000 --- a/test/src/main/java/org/apache/rocketmq/jms/integration/RocketMQServer.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -import java.io.File; -import java.text.SimpleDateFormat; -import java.util.Date; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.namesrv.NamesrvConfig; -import org.apache.rocketmq.namesrv.NamesrvController; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import static java.io.File.separator; -import static org.apache.rocketmq.jms.integration.Constant.BROKER_HA_PORT; -import static org.apache.rocketmq.jms.integration.Constant.BROKER_PORT; -import static org.apache.rocketmq.jms.integration.Constant.NAME_SERVER_ADDRESS; - -@Service -public class RocketMQServer { - public static Logger log = LoggerFactory.getLogger(RocketMQServer.class); - private final SimpleDateFormat sf = new SimpleDateFormat("yyyyMMddHHmmss"); - private final String rootDir = System.getProperty("user.home") + separator + "rocketmq-jms" + separator; - // fixed location of config files which is updated after RMQ3.2.6 - private final String configDir = System.getProperty("user.home") + separator + "store/config"; - - private String serverDir; - private volatile boolean started = false; - - //name server - private NamesrvConfig namesrvConfig = new NamesrvConfig(); - private NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); - private NamesrvController namesrvController; - - //broker - private final String brokerName = "JmsTestBrokerName"; - private BrokerController brokerController; - private BrokerConfig brokerConfig = new BrokerConfig(); - private NettyServerConfig 
nettyServerConfig = new NettyServerConfig(); - private NettyClientConfig nettyClientConfig = new NettyClientConfig(); - private MessageStoreConfig storeConfig = new MessageStoreConfig(); - - public RocketMQServer() { - this.storeConfig.setDiskMaxUsedSpaceRatio(95); - } - - @PostConstruct - public void start() { - if (started) { - return; - } - - createServerDir(); - - startNameServer(); - - startBroker(); - - started = true; - - log.info("Start RocketServer Successfully"); - } - - private void createServerDir() { - for (int i = 0; i < 5; i++) { - serverDir = rootDir + sf.format(new Date()); - final File file = new File(serverDir); - if (!file.exists()) { - return; - } - } - log.error("Has retry 5 times to register base dir,but still failed."); - System.exit(1); - } - - private void startNameServer() { - namesrvConfig.setKvConfigPath(serverDir + separator + "namesrv" + separator + "kvConfig.json"); - nameServerNettyServerConfig.setListenPort(Constant.NAME_SERVER_PORT); - namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); - try { - namesrvController.initialize(); - log.info("Success to start Name Server:{}", NAME_SERVER_ADDRESS); - namesrvController.start(); - } - catch (Exception e) { - log.error("Failed to start Name Server", e); - System.exit(1); - } - System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, NAME_SERVER_ADDRESS); - } - - private void startBroker() { - System.setProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.98"); - brokerConfig.setBrokerName(brokerName); - brokerConfig.setBrokerIP1(Constant.BROKER_IP); - brokerConfig.setNamesrvAddr(NAME_SERVER_ADDRESS); - storeConfig.setStorePathRootDir(serverDir); - storeConfig.setStorePathCommitLog(serverDir + separator + "commitlog"); - storeConfig.setHaListenPort(BROKER_HA_PORT); - nettyServerConfig.setListenPort(BROKER_PORT); - brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); - - try { - 
brokerController.initialize(); - log.info("Broker Start name:{} address:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); - brokerController.start(); - - } - catch (Exception e) { - log.error("Failed to start Broker", e); - System.exit(1); - } - } - - @PreDestroy - private void shutdown() { - brokerController.shutdown(); - namesrvController.shutdown(); - deleteFile(new File(rootDir)); - deleteFile(new File(configDir)); - } - - public void deleteFile(File file) { - if (!file.exists()) { - return; - } - if (file.isFile()) { - file.delete(); - } - else if (file.isDirectory()) { - File[] files = file.listFiles(); - for (int i = 0; i < files.length; i++) { - deleteFile(files[i]); - } - file.delete(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/main/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListener.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListener.java b/test/src/main/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListener.java deleted file mode 100644 index 8a78027..0000000 --- a/test/src/main/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListener.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.listener; - -import java.util.ArrayList; -import java.util.List; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import org.apache.rocketmq.jms.integration.RocketMQAdmin; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jms.annotation.JmsListener; -import org.springframework.messaging.Message; -import org.springframework.stereotype.Component; - -@Component -public class SimpleTextListener { - - public static final String DESTINATION = "orderTest"; - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - private List<String> receivedMsgs = new ArrayList(); - - public SimpleTextListener() { - } - - @PostConstruct - public void init() { - this.rocketMQAdmin.createTopic(DESTINATION); - } - - @PreDestroy - public void destroy() { - this.rocketMQAdmin.deleteTopic(DESTINATION); - } - - @JmsListener(destination = DESTINATION) - public void processOrder(Message<String> message) { - receivedMsgs.add(message.getPayload()); - } - - public List<String> getReceivedMsg() { - return receivedMsgs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/main/resources/logback.xml ---------------------------------------------------------------------- diff --git a/test/src/main/resources/logback.xml b/test/src/main/resources/logback.xml deleted file mode 100644 index 6757b08..0000000 --- a/test/src/main/resources/logback.xml +++ /dev/null @@ -1,56 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ 
Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="DefaultAppender" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <file>${user.home}/logs/rocketmq/jms.log</file> - <append>true</append> - <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> - <fileNamePattern>${user.home}/logs/rocketmq/otherdays/jms.%i.log - </fileNamePattern> - <minIndex>1</minIndex> - <maxIndex>10</maxIndex> - </rollingPolicy> - <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> - <maxFileSize>100MB</maxFileSize> - </triggeringPolicy> - <encoder> - <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern> - <charset class="java.nio.charset.Charset">UTF-8</charset> - </encoder> - </appender> - - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <append>true</append> - <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern> - <charset class="java.nio.charset.Charset">UTF-8</charset> - </encoder> - </appender> - - <logger name="org.apache.rocketmq.jms"> - <level value="INFO"/> - </logger> - - <root> - <level value="ERROR"/> - <appender-ref ref="STDOUT"/> - </root> -</configuration> 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java deleted file mode 100644 index ea3fa60..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.apache.rocketmq.jms.integration.support.ConditionMatcher; -import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class ConsumeAsynchronousTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - @Test - public void testConsumeAsynchronous() throws Exception { - final String rmqTopicName = "coffee-async" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //producer - TextMessage message = session.createTextMessage("mocha coffee,please"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - //consumer - final List<Message> received = new ArrayList(); - MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); - consumer.setMessageListener(new MessageListener() { - 
@Override public void onMessage(Message message) { - received.add(message); - } - }); - - connection.start(); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return received.size() == 1; - } - }, 5); - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java deleted file mode 100644 index a7a7e5e..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsNull.notNullValue; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class ConsumeSynchronousTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - @Test - public void testConsumeSynchronous() throws Exception { - final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - connection.start(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //producer - TextMessage message = session.createTextMessage("a"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - //consumer - MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); - - connection.start(); - - Message msg = consumer.receive(); - - assertThat(msg, notNullValue()); - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - - } - -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java deleted file mode 100644 index d222f98..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.apache.rocketmq.jms.integration.support.ConditionMatcher; -import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class NonDurableConsumeTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - /** - * Test messages that producer after consumer inactive will not be delivered to consumer when it start again. - * - * <p>Test step: - * 1. Create a consumer and start the connection - * 2. Create a producer and send a message(msgA) to the topic subscribed by previous consumer - * 3. MsgA should be consumed successfully - * 4. Close the consumer and stop the connection - * 5. Producer sends a message(msgB) after the consumer closed - * 6. Create another consumer which is a non-durable one, and start the connection - * 7. 
Result: msgB should be consumed by the previous non-durable consumer - * - * @throws Exception - */ - @Test - public void testConsumeNotDurable() throws Exception { - final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - connection.start(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //consumer - final List<Message> received = new ArrayList(); - final MessageListener msgListener = new MessageListener() { - @Override public void onMessage(Message message) { - received.add(message); - } - }; - MessageConsumer consumer = session.createConsumer(topic); - consumer.setMessageListener(msgListener); - - connection.start(); - - Thread.sleep(1000 * 3); - - //producer - TextMessage message = session.createTextMessage("a"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return received.size() == 1; - } - }, 3); - - received.clear(); - - // close the consumer - connection.stop(); - consumer.close(); - - // send message - TextMessage lostMessage = session.createTextMessage("b"); - producer.send(lostMessage); - - Thread.sleep(1000 * 2); - - // start the non-durable consumer again - consumer = session.createConsumer(topic, "topic"); - consumer.setMessageListener(msgListener); - connection.start(); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return received.size() == 0; - } - }, 5); - - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java deleted file mode 100644 index a22872a..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.apache.rocketmq.jms.integration.support.ConditionMatcher; -import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class SharedDurableConsumeTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - /** - * Test messages will be deliver to every consumer if these consumers are in shared durable subscription. - * - * <p>Test step: - * 1. Create a share durable consumer(consumerA) via the first connection(connectionA) - * 2. Create a share durable consumer(consumerB) via another connection(connectionB) - * 3. The two consumer must subscribe the same topic with identical subscriptionName, - * and they also have the same clientID. - * 4. Send several(eg:10) messages to this topic - * 5. 
Result: all messages should be received by both consumerA and consumerB - * - * @throws Exception - */ - @Test - public void testConsumeAllMessages() throws Exception { - final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connectionA = null, connectionB = null; - final String subscriptionName = "MySubscription"; - final List<Message> receivedA = new ArrayList(), receivedB = new ArrayList(); - - try { - // consumerA - connectionA = factory.createConnection(); - Session sessionA = connectionA.createSession(); - connectionA.start(); - Topic topic = sessionA.createTopic(rmqTopicName); - MessageConsumer consumerA = sessionA.createSharedDurableConsumer(topic, subscriptionName); - consumerA.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { - receivedA.add(message); - } - }); - - // consumerB - connectionB = factory.createConnection(); - Session sessionB = connectionB.createSession(); - MessageConsumer consumerB = sessionB.createSharedDurableConsumer(topic, subscriptionName); - consumerB.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { - receivedB.add(message); - } - }); - connectionB.start(); - - //producer - TextMessage message = sessionA.createTextMessage("a"); - MessageProducer producer = sessionA.createProducer(topic); - for (int i = 0; i < 10; i++) { - producer.send(message); - } - - Thread.sleep(1000 * 2); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return receivedA.size()==10 && receivedB.size()==10; - } - },5); - } - finally { - connectionA.close(); - connectionB.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java deleted file mode 100644 index c0e4e79..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/UnsharedDurableConsumeTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.apache.rocketmq.jms.RocketMQSession; -import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException; -import org.apache.rocketmq.jms.integration.support.ConditionMatcher; -import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class UnsharedDurableConsumeTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - /** - * Test each message will be deliver to only one consumer if these consumers are in unshared durable subscription. - * - * <p>Test step: - * 1. Create a unshared durable consumer(consumerA) via the first connection(connectionA) - * 2. Create a unshared durable consumer(consumerB) via another connection(connectionB) - * 3. Result: - * a. The creating consumerB should throw a JMSException as consumerA and consumberB have the same subscription - * b. 
All messages should be received by consumerA - * - * @throws Exception - * @see {@link RocketMQSession} - */ - @Test - public void testEachMessageOnlyConsumeByOneConsumer() throws Exception { - final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName, 2); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connectionA = null, connectionB = null; - final String subscriptionName = "MySubscription"; - final List<Message> receivedA = new ArrayList(); - - try { - // consumerA - connectionA = factory.createConnection(); - Session sessionA = connectionA.createSession(); - connectionA.start(); - Topic topic = sessionA.createTopic(rmqTopicName); - MessageConsumer consumerA = sessionA.createDurableConsumer(topic, subscriptionName); - consumerA.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { - receivedA.add(message); - } - }); - - Thread.sleep(1000 * 2); - - // consumerB - try { - connectionB = factory.createConnection(); - Session sessionB = connectionB.createSession(); - sessionB.createDurableConsumer(topic, subscriptionName); - assertFalse("Doesn't get the expected " + DuplicateSubscriptionException.class.getSimpleName(), true); - } - catch (DuplicateSubscriptionException e) { - assertTrue(true); - } - - connectionA.start(); - - //producer - TextMessage message = sessionA.createTextMessage("a"); - MessageProducer producer = sessionA.createProducer(topic); - for (int i = 0; i < 10; i++) { - producer.send(message); - } - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return receivedA.size() == 10; - } - }, 5); - } - finally { - connectionA.close(); - connectionB.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java deleted file mode 100644 index 8518ab5..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/listener/SimpleTextListenerTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration.listener; - -import org.apache.commons.lang.time.StopWatch; -import org.apache.rocketmq.jms.integration.AppConfig; -import org.apache.rocketmq.jms.integration.support.ConditionMatcher; -import org.apache.rocketmq.jms.integration.support.TimeLimitAssert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringRunner; - -import static org.apache.rocketmq.jms.integration.listener.SimpleTextListener.DESTINATION; - -@RunWith(SpringRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class SimpleTextListenerTest { - - private static final Logger log = LoggerFactory.getLogger(SimpleTextListenerTest.class); - - @Autowired - private JmsTemplate jmsTemplate; - - @Autowired - private SimpleTextListener simpleTextListener; - - @Test - public void testListener() throws Exception { - jmsTemplate.convertAndSend(DESTINATION, "first"); - StopWatch watch = new StopWatch(); - watch.start(); - - TimeLimitAssert.doAssert(new ConditionMatcher() { - @Override public boolean match() { - return simpleTextListener.getReceivedMsg().size() == 1; - } - }, 60); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java b/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java deleted file mode 100644 index 22d7683..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/support/ConditionMatcher.java +++ /dev/null @@ 
-1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.support; - -public interface ConditionMatcher { - - boolean match(); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java b/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java deleted file mode 100644 index 6877cbd..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/support/TimeLimitAssert.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration.support; - -import org.apache.commons.lang.time.StopWatch; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class TimeLimitAssert { - - public static void doAssert(ConditionMatcher conditionMatcher, int timeLimit) throws InterruptedException { - StopWatch watch = new StopWatch(); - watch.start(); - - while (!conditionMatcher.match()) { - Thread.sleep(500); - if (watch.getTime() > timeLimit * 1000) { - assertFalse(String.format("Doesn't match assert condition in %s second", timeLimit), true); - } - } - - assertTrue(true); - } -}
