/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.jms.support;

import java.util.Date;
import javax.jms.JMSException;
import org.junit.Test;

import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Boolean;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Byte;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2ByteArray;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Char;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Double;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Float;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Int;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Long;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2Short;
import static org.apache.rocketmq.jms.support.PrimitiveTypeCast.cast2String;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
 * Unit tests for the static conversion helpers of {@link PrimitiveTypeCast}.
 *
 * <p>Every test first checks the conversions that are expected to succeed and then
 * verifies that an unsupported source value is rejected with an exception. A call
 * that unexpectedly returns is turned into a failure via {@code fail(...)}; the
 * resulting {@link AssertionError} is not caught by the {@code catch} clauses
 * because it is neither a {@link JMSException} nor a {@link RuntimeException}.
 */
public class PrimitiveTypeCastTest {

    /**
     * Boolean values pass through, {@code null} and non-"true" strings yield
     * {@code false}, and a {@link Date} is rejected.
     */
    @Test
    public void testConvert2Boolean() throws Exception {
        assertThat(cast2Boolean(Boolean.TRUE), is(true));
        assertThat(cast2Boolean(null), is(false));

        assertThat(cast2Boolean("true"), is(true));
        assertThat(cast2Boolean("hello"), is(false));

        try {
            cast2Boolean(new Date());
            fail("cast2Boolean must reject a java.util.Date");
        }
        catch (JMSException expected) {
            // expected: Date is not convertible to boolean
        }
    }

    /**
     * Bytes pass through and numeric strings are parsed; {@code null} and
     * non-numeric strings raise a runtime exception, a {@link Date} is rejected.
     */
    @Test
    public void testConvert2Byte() throws Exception {
        // binary 101 == 5
        final byte b = Byte.parseByte("101", 2);
        assertThat(cast2Byte(b), is(b));

        assertThat(cast2Byte("5"), is(b));

        try {
            cast2Byte(null);
            fail("cast2Byte must reject null");
        }
        catch (RuntimeException expected) {
            // expected: null cannot be parsed as a byte
        }

        try {
            cast2Byte("abc");
            fail("cast2Byte must reject a non-numeric string");
        }
        catch (RuntimeException expected) {
            // expected: "abc" is not a number
        }

        try {
            cast2Byte(new Date());
            fail("cast2Byte must reject a java.util.Date");
        }
        catch (JMSException expected) {
            // expected: Date is not convertible to byte
        }
    }

    /**
     * Shorts pass through, numeric strings are parsed, a {@link Date} is rejected.
     */
    @Test
    public void testConvert2Short() throws Exception {
        final Short s = Short.valueOf("12");
        assertThat(cast2Short(s), is(s));

        assertThat(cast2Short("3"), is(Short.valueOf("3")));

        try {
            cast2Short(new Date());
            fail("cast2Short must reject a java.util.Date");
        }
        catch (JMSException expected) {
            // expected: Date is not convertible to short
        }
    }

    /**
     * Chars pass through; a string, even of length one, is rejected.
     */
    @Test
    public void testConvert2Char() throws Exception {
        final char c = 'a';
        assertThat(cast2Char(c), is(c));

        try {
            cast2Char("a");
            fail("cast2Char must reject a String");
        }
        catch (JMSException expected) {
            // expected: String is not convertible to char
        }
    }

    /**
     * Ints pass through, numeric strings and bytes are widened, a {@link Date}
     * is rejected.
     */
    @Test
    public void testConvert2Int() throws Exception {
        assertThat(cast2Int(12), is(12));

        assertThat(cast2Int("12"), is(12));
        // binary 11 == 3
        assertThat(cast2Int(Byte.parseByte("11", 2)), is(3));

        try {
            cast2Int(new Date());
            fail("cast2Int must reject a java.util.Date");
        }
        catch (JMSException expected) {
            // expected: Date is not convertible to int
        }
    }

    /**
     * Ints are widened to long, numeric strings are parsed, a {@link Date} is
     * rejected.
     */
    @Test
    public void testConvert2Long() throws Exception {
        assertThat(cast2Long(12), is(12L));

        assertThat(cast2Long("12"), is(12L));

        try {
            // exercise cast2Long itself (the previous version called cast2Int here)
            cast2Long(new Date());
            fail("cast2Long must reject a java.util.Date");
        }
        catch (JMSException expected) {
            // expected: Date is not convertible to long
        }
    }

    /**
     * Floats pass through and numeric strings are parsed; an int is rejected.
     */
    @Test
    public void testConvert2Float() throws Exception {
        assertThat(cast2Float(12.00f), is(12f));

        assertThat(cast2Float("12.00"), is(12f));

        try {
            cast2Float(12);
            fail("cast2Float must reject an int");
        }
        catch (JMSException expected) {
            // expected: int is not convertible to float
        }
    }

    /**
     * Doubles pass through, numeric strings are parsed and floats are widened;
     * an int is rejected.
     */
    @Test
    public void testConvert2Double() throws Exception {
        assertThat(cast2Double(12.00d), is(12d));

        assertThat(cast2Double("12.00"), is(12d));
        assertThat(cast2Double(12.00f), is(12d));

        try {
            cast2Double(12);
            fail("cast2Double must reject an int");
        }
        catch (JMSException expected) {
            // expected: int is not convertible to double
        }
    }

    /**
     * Doubles, strings and booleans are rendered as strings; a {@link Date} is
     * rejected.
     */
    @Test
    public void testConvert2String() throws Exception {
        assertThat(cast2String(12.00d), is("12.0"));

        assertThat(cast2String("12.00"), is("12.00"));
        assertThat(cast2String(true), is("true"));

        try {
            cast2String(new Date());
            fail("cast2String must reject a java.util.Date");
        }
        catch (JMSException expected) {
            // expected: Date is not convertible to String
        }
    }

    /**
     * Byte arrays pass through; a string is rejected.
     */
    @Test
    public void testConvert2ByteArray() throws Exception {
        byte[] arr = new byte[] {Byte.parseByte("11", 2), Byte.parseByte("101", 2)};

        assertThat(cast2ByteArray(arr), is(arr));

        try {
            cast2ByteArray("10");
            fail("cast2ByteArray must reject a String");
        }
        catch (JMSException expected) {
            // expected: String is not convertible to byte[]
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/style/checkstyle-suppressions.xml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/style/checkstyle-suppressions.xml b/rocketmq-jms/style/checkstyle-suppressions.xml new file mode 100644 index 0000000..0174c40 --- /dev/null +++ b/rocketmq-jms/style/checkstyle-suppressions.xml @@ -0,0 +1,10 @@ +<?xml version="1.0"?> + +<!DOCTYPE suppressions PUBLIC + "-//Puppy Crawl//DTD Suppressions 1.0//EN" + "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd"> + +<suppressions> + <suppress files="LICENSE"/> + <suppress files="NOTICE"/> +</suppressions> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/style/copyright/Apache.xml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/style/copyright/Apache.xml b/rocketmq-jms/style/copyright/Apache.xml new file mode 100644 index 0000000..2db86d0 --- /dev/null +++ b/rocketmq-jms/style/copyright/Apache.xml @@ -0,0 +1,24 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<component name="CopyrightManager"> + <copyright> + <option name="myName" value="Apache"/> + <option name="notice" + value="Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the &quot;License&quot;); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."/> + </copyright> +</component> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/style/copyright/profiles_settings.xml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/style/copyright/profiles_settings.xml b/rocketmq-jms/style/copyright/profiles_settings.xml new file mode 100644 index 0000000..4c0e521 --- /dev/null +++ b/rocketmq-jms/style/copyright/profiles_settings.xml @@ -0,0 +1,64 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<component name="CopyrightManager"> + <settings default="Apache"> + <module2copyright> + <element module="All" copyright="Apache"/> + </module2copyright> + <LanguageOptions name="GSP"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="HTML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="JAVA"> + <option name="fileTypeOverride" value="3"/> + <option name="addBlankAfter" value="false"/> + </LanguageOptions> + <LanguageOptions name="JSP"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="JSPX"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="MXML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="Properties"> + <option name="fileTypeOverride" value="3"/> + <option name="block" value="false"/> + </LanguageOptions> + <LanguageOptions name="SPI"> + <option name="fileTypeOverride" value="3"/> + <option name="block" value="false"/> + </LanguageOptions> + <LanguageOptions name="XML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="__TEMPLATE__"> + <option name="separateBefore" value="true"/> + <option name="lenBefore" value="1"/> + 
</LanguageOptions> + </settings> +</component> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/style/rmq_checkstyle.xml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/style/rmq_checkstyle.xml b/rocketmq-jms/style/rmq_checkstyle.xml new file mode 100644 index 0000000..b100601 --- /dev/null +++ b/rocketmq-jms/style/rmq_checkstyle.xml @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<!DOCTYPE module PUBLIC + "-//Puppy Crawl//DTD Check Configuration 1.3//EN" + "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> +<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding --> +<module name="Checker"> + + <property name="localeLanguage" value="en"/> + + <!--To configure the check to report on the first instance in each file--> + <module name="FileTabCharacter"/> + + <!-- header --> + <module name="RegexpHeader"> + <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="System\.out\.println"/> + <property name="message" value="Prohibit invoking System.out.println in source code !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="//FIXME"/> + <property name="message" value="Recommended fix FIXME task !"/> + </module> + + <!--<module name="RegexpSingleline">--> + <!--<property name="format" value="//TODO"/>--> + <!--<property name="message" value="Recommended fix TODO task !"/>--> + <!--</module>--> + + <module name="RegexpSingleline"> + <property name="format" value="@alibaba"/> + <property name="message" value="Recommended remove @alibaba keyword!"/> + </module> + <module name="RegexpSingleline"> + <property name="format" value="@taobao"/> + <property name="message" value="Recommended remove @taobao keyword!"/> + </module> + <module name="RegexpSingleline"> + <property name="format" value="@author"/> + <property name="message" value="Recommended remove @author tag in javadoc!"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" + value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/> + <property name="message" value="Not allow chinese character !"/> + </module> + + <module name="FileLength"> + <property name="max" value="3000"/> + </module> + + 
<module name="TreeWalker"> + + <module name="UnusedImports"> + <property name="processJavadoc" value="true"/> + </module> + <module name="RedundantImport"/> + + <!--<module name="IllegalImport" />--> + + <!--Checks that classes that override equals() also override hashCode()--> + <module name="EqualsHashCode"/> + <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.--> + <module name="SimplifyBooleanExpression"/> + <module name="OneStatementPerLine"/> + <module name="UnnecessaryParentheses"/> + <!--Checks for over-complicated boolean return statements. For example the following code--> + <module name="SimplifyBooleanReturn"/> + + <!--Check that the default is after all the cases in producerGroup switch statement--> + <module name="DefaultComesLast"/> + <!--Detects empty statements (standalone ";" semicolon)--> + <module name="EmptyStatement"/> + <!--Checks that long constants are defined with an upper ell--> + <module name="UpperEll"/> + <module name="ConstantName"> + <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/> + </module> + <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property--> + <module name="LocalVariableName"/> + <!--Validates identifiers for local, final variables, including catch parameters--> + <module name="LocalFinalVariableName"/> + <!--Validates identifiers for non-static fields--> + <module name="MemberName"/> + <!--Validates identifiers for class type parameters--> + <module name="ClassTypeParameterName"> + <property name="format" value="^[A-Z0-9]*$"/> + </module> + <!--Validates identifiers for method type parameters--> + <module name="MethodTypeParameterName"> + <property name="format" value="^[A-Z0-9]*$"/> + </module> + <module name="PackageName"/> + <module name="ParameterName"/> + <module name="StaticVariableName"/> + <module name="TypeName"/> + <!--Checks that there are no import 
statements that use the * notation--> + <module name="AvoidStarImport"/> + + <!--whitespace--> + <module name="GenericWhitespace"/> + <module name="NoWhitespaceBefore"/> + <module name="WhitespaceAfter"/> + <module name="NoWhitespaceAfter"/> + <module name="WhitespaceAround"> + <property name="allowEmptyConstructors" value="true"/> + <property name="allowEmptyMethods" value="true"/> + </module> + <module name="Indentation"/> + <module name="MethodParamPad"/> + <module name="ParenPad"/> + <module name="TypecastParenPad"/> + </module> +</module> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/rocketmq-jms/style/rmq_codeStyle.xml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/style/rmq_codeStyle.xml b/rocketmq-jms/style/rmq_codeStyle.xml new file mode 100644 index 0000000..cd95ee6 --- /dev/null +++ b/rocketmq-jms/style/rmq_codeStyle.xml @@ -0,0 +1,157 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<code_scheme name="rocketmq"> + <option name="USE_SAME_INDENTS" value="true"/> + <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/> + <option name="OTHER_INDENT_OPTIONS"> + <value> + <option name="INDENT_SIZE" value="4"/> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + <option name="TAB_SIZE" value="4"/> + <option name="USE_TAB_CHARACTER" value="false"/> + <option name="SMART_TABS" value="false"/> + <option name="LABEL_INDENT_SIZE" value="0"/> + <option name="LABEL_INDENT_ABSOLUTE" value="false"/> + <option name="USE_RELATIVE_INDENTS" value="false"/> + </value> + </option> + <option name="PREFER_LONGER_NAMES" value="false"/> + <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> + <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> + <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND"> + <value/> + </option> + <option name="IMPORT_LAYOUT_TABLE"> + <value> + <package name="" withSubpackages="true" static="false"/> + <emptyLine/> + <package name="" withSubpackages="true" static="true"/> + </value> + </option> + <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/> + <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/> + <option name="JD_P_AT_EMPTY_LINES" value="false"/> + <option name="JD_KEEP_INVALID_TAGS" value="false"/> + <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option 
name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/> + <option name="LABELED_STATEMENT_WRAP" value="1"/> + <option name="WRAP_COMMENTS" value="true"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <JavaCodeStyleSettings> + <option name="CLASS_NAMES_IN_JAVADOC" value="3"/> + </JavaCodeStyleSettings> + <XML> + <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/> + </XML> + <ADDITIONAL_INDENT_OPTIONS fileType="haml"> + <option name="INDENT_SIZE" value="2"/> + </ADDITIONAL_INDENT_OPTIONS> + <codeStyleSettings language="Groovy"> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="HOCON"> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + </codeStyleSettings> + <codeStyleSettings language="JAVA"> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" 
value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/> + <option name="LABELED_STATEMENT_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="JSON"> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + </codeStyleSettings> + <codeStyleSettings language="Scala"> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option 
name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="INDENT_SIZE" value="4"/> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + <option name="TAB_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="XML"> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> +</code_scheme> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/Constant.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/Constant.java b/src/main/java/org/apache/rocketmq/jms/Constant.java deleted file mode 100644 index 9519bea..0000000 --- a/src/main/java/org/apache/rocketmq/jms/Constant.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms; - -public interface Constant { - - String NO_MESSAGE_SELECTOR = "*"; - - boolean DEFAULT_NO_LOCAL = true; - - boolean DEFAULT_DURABLE = false; - - //-------------------------JMS defined properties constant---------------------------- - /** - * The identity of the user sending the Send message - */ - String JMS_XUSER_ID = "jmsXUserID"; - /** - * The identity of the application Send sending the message - */ - String JMS_XAPP_ID = "jmsXAppID"; - /** - * The number of message delivery Receive attempts - */ - String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount"; - /** - * The identity of the message group this message is part of - */ - String JMS_XGROUP_ID = "jmsXGroupID"; - /** - * The sequence number of this message within the group; the first message is 1, the second 2,... - */ - String JMS_XGROUP_SEQ = "jmsXGroupSeq"; - /** - * The transaction identifier of the Send transaction within which this message was produced - */ - String JMS_XPRODUCER_TXID = "jmsXProducerTXID"; - /** - * The transaction identifier of the Receive transaction within which this message was consumed - */ - String JMS_XCONSUMER_TXID = "jmsXConsumerTXID"; - - /** - * The time JMS delivered the Receive message to the consumer - */ - String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp"; - /** - * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and - * that these copies exist from the time the original message was sent. Each copy's state is one of: 1(waiting), - * 2(ready), 3(expired) or 4(retained) Since state is of no interest to producers and consumers it is not provided - * to either. It is only of relevance to messages looked up in a warehouse and JMS provides no API for this. 
- */ - String JMS_XSTATE = "jmsXState"; - - //---------------------------JMS Headers' value constant--------------------------- - /** - * Default time to live - */ - long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000; - - /** - * Default Jms Type - */ - String DEFAULT_JMS_TYPE = "rocketmq"; - - String MESSAGE_ID_PREFIX = "ID:"; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java b/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java deleted file mode 100644 index 6187ac2..0000000 --- a/src/main/java/org/apache/rocketmq/jms/ConsumeMessageService.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.JMSRuntimeException; -import org.apache.rocketmq.common.ServiceThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConsumeMessageService extends ServiceThread { - - private static final Logger log = LoggerFactory.getLogger(DeliverMessageService.class); - private static final AtomicLong COUNTER = new AtomicLong(0L); - - private BlockingQueue<MessageWrapper> queue = new ArrayBlockingQueue(1000); - private RocketMQSession session; - private final long index = COUNTER.incrementAndGet(); - - public ConsumeMessageService(RocketMQSession session) { - this.session = session; - } - - @Override public String getServiceName() { - return ConsumeMessageService.class.getSimpleName() + "-" + this.index; - } - - @Override public void run() { - while (!this.isStopped()) { - try { - MessageWrapper wrapper = queue.take(); - RocketMQConsumer consumer = wrapper.getConsumer(); - consumer.getMessageListener().onMessage(wrapper.getMessage()); - consumer.getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset()); - } - catch (Exception e) { - log.error(e.getMessage(), e); - } - } - } - - public void put(MessageWrapper wrapper) { - try { - this.queue.put(wrapper); - } - catch (InterruptedException e) { - throw new JMSRuntimeException(e.getMessage()); - } - } - - public RocketMQSession getSession() { - return session; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java b/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java deleted file mode 100644 index 356cbd7..0000000 --- 
a/src/main/java/org/apache/rocketmq/jms/ConsumeModel.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -public enum ConsumeModel { - SYNC, - ASYNC -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java deleted file mode 100644 index a534bd3..0000000 --- a/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.client.consumer.PullResult; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.jms.exception.MessageExpiredException; -import org.apache.rocketmq.jms.hook.ReceiveMessageHook; -import org.apache.rocketmq.jms.msg.convert.RMQ2JMSMessageConvert; -import org.apache.rocketmq.jms.support.JMSUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.String.format; - -/** - * Service deliver messages synchronous or asynchronous. 
- */ -public class DeliverMessageService extends ServiceThread { - - private static final Logger log = LoggerFactory.getLogger(DeliverMessageService.class); - private static final AtomicLong COUNTER = new AtomicLong(0L); - private static final int PULL_BATCH_SIZE = 100; - - private RocketMQConsumer consumer; - private DefaultMQPullConsumer rocketMQPullConsumer; - private Destination destination; - private String consumerGroup; - private String topicName; - private ConsumeModel consumeModel = ConsumeModel.SYNC; - - /** only support RMQ subExpression */ - private String messageSelector; - private ReceiveMessageHook hook = new ReceiveMessageHook(); - - /** - * If durable is true, consume message from the offset consumed last time. - * Otherwise consume from the max offset - */ - private boolean durable = false; - private boolean shared = false; - - private BlockingQueue<MessageWrapper> msgQueue = new ArrayBlockingQueue(PULL_BATCH_SIZE); - private volatile boolean pause = true; - private final long index = COUNTER.incrementAndGet(); - - private Map<MessageQueue, Long> offsetMap = new HashMap(); - - public DeliverMessageService(RocketMQConsumer consumer, Destination destination, String consumerGroup, - String messageSelector, boolean durable, boolean shared) { - this.consumer = consumer; - this.destination = destination; - this.consumerGroup = consumerGroup; - this.messageSelector = messageSelector; - this.durable = durable; - this.shared = shared; - - this.topicName = JMSUtils.getDestinationName(destination); - - createAndStartRocketMQPullConsumer(); - - if (this.consumer.getSession().getConnection().isStarted()) { - this.recover(); - } - else { - this.pause(); - } - } - - private void createAndStartRocketMQPullConsumer() { - final ClientConfig clientConfig = this.consumer.getSession().getConnection().getClientConfig(); - this.rocketMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); - this.rocketMQPullConsumer.setNamesrvAddr(clientConfig.getNamesrvAddr()); - 
this.rocketMQPullConsumer.setInstanceName(clientConfig.getInstanceName()); - this.rocketMQPullConsumer.setRegisterTopics(new HashSet(Arrays.asList(this.topicName))); - - try { - this.rocketMQPullConsumer.start(); - } - catch (MQClientException e) { - throw new JMSRuntimeException(format("Fail to start RocketMQ pull consumer, error msg:%s", ExceptionUtils.getStackTrace(e))); - } - } - - @Override - public String getServiceName() { - return DeliverMessageService.class.getSimpleName() + "-" + this.index; - } - - @Override - public void run() { - while (!isStopped()) { - if (pause) { - this.waitForRunning(1000); - continue; - } - - try { - pullMessage(); - } - catch (InterruptedException e) { - log.debug("Pulling messages service has been interrupted"); - } - catch (Exception e) { - log.error("Error during pulling messages", e); - } - } - } - - private void pullMessage() throws Exception { - Set<MessageQueue> mqs = getMessageQueues(); - - for (MessageQueue mq : mqs) { - Long offset = offsetMap.get(mq); - if (offset == null) { - offset = beginOffset(mq); - } - PullResult pullResult = this.rocketMQPullConsumer.pullBlockIfNotFound(mq, this.messageSelector, offset, PULL_BATCH_SIZE); - - switch (pullResult.getPullStatus()) { - case FOUND: - List<MessageExt> msgs = pullResult.getMsgFoundList(); - offsetMap.put(mq, pullResult.getMaxOffset()); - for (MessageExt msg : msgs) { - handleMessage(msg, mq); - } - log.debug("Pull {} messages from topic:{},broker:{},queueId:{}", msgs.size(), mq.getTopic(), mq.getBrokerName(), mq.getQueueId()); - break; - case NO_NEW_MSG: - case NO_MATCHED_MSG: - break; - case OFFSET_ILLEGAL: - throw new JMSException("Error during pull message[reason:OFFSET_ILLEGAL]"); - } - } - } - - private Set<MessageQueue> getMessageQueues() throws MQClientException { - Set<MessageQueue> mqs = this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName); - return mqs; - } - - /** - * Refer to {@link #durable}. 
- * - * @param mq message queue - * @return offset - * @throws MQClientException - */ - private Long beginOffset(MessageQueue mq) throws MQClientException { - return this.durable ? this.rocketMQPullConsumer.fetchConsumeOffset(mq, false) : this.rocketMQPullConsumer.maxOffset(mq); - } - - /** - * If {@link #consumeModel} is {@link ConsumeModel#ASYNC}, messages pulled from broker - * are handled in {@link ConsumeMessageService} owned by its session. - * - * If {@link #consumeModel} is {@link ConsumeModel#SYNC}, messages pulled from broker are put - * into a memory blocking queue, waiting for the {@link MessageConsumer#receive()} - * using {@link BlockingQueue#poll()} to handle messages synchronous. - * - * @param msg to handle message - * @throws InterruptedException - * @throws JMSException - */ - private void handleMessage(MessageExt msg, MessageQueue mq) throws InterruptedException, JMSException { - Message jmsMessage = RMQ2JMSMessageConvert.convert(msg); - - try { - hook.before(jmsMessage); - } - catch (MessageExpiredException e) { - log.debug(e.getMessage()); - } - - final MessageWrapper wrapper = new MessageWrapper(jmsMessage, this.consumer, mq, msg.getQueueOffset()); - - switch (this.consumeModel) { - case SYNC: - this.msgQueue.put(wrapper); - break; - case ASYNC: - this.consumer.getSession().getConsumeMessageService().put(wrapper); - break; - default: - throw new JMSException(format("Unsupported consume model[%s]", this.consumeModel)); - } - } - - public void ack(MessageQueue mq, Long offset) throws JMSException { - try { - this.rocketMQPullConsumer.updateConsumeOffset(mq, offset); - } - catch (MQClientException e) { - throw new JMSException(format("Fail to ack offset[mq:%s,offset:%s]", mq, offset)); - } - } - - public MessageWrapper poll() throws JMSException { - try { - return this.msgQueue.take(); - } - catch (InterruptedException e) { - throw new JMSException(e.getMessage()); - } - } - - public MessageWrapper poll(long timeout, TimeUnit timeUnit) throws 
JMSException { - try { - return this.msgQueue.poll(timeout, timeUnit); - } - catch (InterruptedException e) { - throw new JMSException(e.getMessage()); - } - } - - public void pause() { - this.pause = true; - } - - public void recover() { - this.pause = false; - } - - public void close() { - - this.stop(); - - this.rocketMQPullConsumer.shutdown(); - - this.shutdown(true); - - log.debug("Success to close message delivery service:{}", getServiceName()); - } - - public void setConsumeModel(ConsumeModel consumeModel) { - this.consumeModel = consumeModel; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java b/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java deleted file mode 100644 index e322e5b..0000000 --- a/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms; - -import javax.jms.Message; -import org.apache.rocketmq.common.message.MessageQueue; - -public class MessageWrapper { - - private Message message; - private RocketMQConsumer consumer; - private MessageQueue mq; - private long offset; - - public MessageWrapper(Message message, RocketMQConsumer consumer, MessageQueue mq, long offset) { - this.message = message; - this.consumer = consumer; - this.mq = mq; - this.offset = offset; - } - - public Message getMessage() { - return message; - } - - public RocketMQConsumer getConsumer() { - return consumer; - } - - public MessageQueue getMq() { - return mq; - } - - public long getOffset() { - return offset; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java b/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java deleted file mode 100644 index 1d34500..0000000 --- a/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.impl.MQClientManager; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.String.format; -import static javax.jms.Session.AUTO_ACKNOWLEDGE; -import static javax.jms.Session.SESSION_TRANSACTED; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace; - -public class RocketMQConnection implements Connection { - - private static final Logger log = LoggerFactory.getLogger(RocketMQConnection.class); - - private String clientID; - private ClientConfig clientConfig; - private MQClientInstance clientInstance; - private String userName; - private String password; - - private List<RocketMQSession> sessionList = new ArrayList(); - private AtomicBoolean started = new AtomicBoolean(false); - - public RocketMQConnection(String nameServerAddress, String clientID, String instanceName, String userName, - String password) { - this.clientID = clientID; - this.userName = userName; - this.password = password; - - this.clientConfig = new ClientConfig(); - this.clientConfig.setNamesrvAddr(nameServerAddress); - 
this.clientConfig.setInstanceName(instanceName); - - startClientInstance(); - } - - private void startClientInstance() { - try { - // create a tcp connection to broker and some other background thread - this.clientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.clientConfig); - clientInstance.start(); - } - catch (MQClientException e) { - throw new JMSRuntimeException(format("Fail to startClientInstance connection object[namesrvAddr:%s,instanceName:%s]. Error message:%s", - this.clientConfig.getNamesrvAddr(), this.clientConfig.getInstanceName(), getStackTrace(e))); - } - } - - @Override - public Session createSession() throws JMSException { - return createSession(false, AUTO_ACKNOWLEDGE); - } - - @Override - public Session createSession(int sessionMode) throws JMSException { - if (sessionMode == SESSION_TRANSACTED) { - return createSession(true, Session.AUTO_ACKNOWLEDGE); - } - else { - return createSession(false, sessionMode); - } - } - - @Override - public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { - //todo: support transacted and more acknowledge mode - if (transacted) { - throw new JMSException("Not support local transaction session"); - } - if (acknowledgeMode != AUTO_ACKNOWLEDGE) { - throw new JMSException("Only support AUTO_ACKNOWLEDGE mode now"); - } - - RocketMQSession session = new RocketMQSession(this, acknowledgeMode, transacted); - this.sessionList.add(session); - - return session; - } - - @Override - public ConnectionConsumer createConnectionConsumer(Destination destination, - String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException { - //todo - throw new JMSException("Not support 
yet"); - } - - @Override - public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public String getClientID() throws JMSException { - return this.clientID; - } - - @Override - public void setClientID(String clientID) throws JMSException { - if (isNotBlank(this.clientID)) { - throw new IllegalStateException("administratively client identifier has been configured."); - } - this.clientID = clientID; - } - - @Override - public ConnectionMetaData getMetaData() throws JMSException { - return RocketMQConnectionMetaData.instance(); - } - - @Override - public ExceptionListener getExceptionListener() throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void setExceptionListener(ExceptionListener listener) throws JMSException { - //todo - throw new JMSException("Not support yet"); - } - - @Override - public void start() throws JMSException { - if (this.started.compareAndSet(false, true)) { - for (RocketMQSession session : sessionList) { - for (RocketMQConsumer consumer : session.getConsumerList()) { - consumer.getDeliverMessageService().recover(); - } - } - log.debug("Start connection successfully:{}", toString()); - } - } - - @Override - public void stop() throws JMSException { - if (this.started.compareAndSet(true, false)) { - for (RocketMQSession session : sessionList) { - for (RocketMQConsumer consumer : session.getConsumerList()) { - consumer.getDeliverMessageService().pause(); - } - } - log.debug("Stop connection successfully:{}", toString()); - } 
- } - - @Override - public void close() throws JMSException { - - for (RocketMQSession session : sessionList) { - session.close(); - } - - this.clientInstance.shutdown(); - - log.info("Success to close connection:{}", toString()); - } - - public boolean isStarted() { - return started.get(); - } - - public ClientConfig getClientConfig() { - return clientConfig; - } - - public String getUserName() { - return userName; - } - - @Override public String toString() { - return new ToStringBuilder(this) - .append("nameServerAddress", this.clientConfig.getNamesrvAddr()) - .append("instanceName", this.clientConfig.getInstanceName()) - .append("clientIdentifier", this.clientID) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java deleted file mode 100644 index 16ce1e0..0000000 --- a/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSContext; -import javax.jms.JMSException; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.jms.support.JMSUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implement of {@link ConnectionFactory} using RocketMQ client. - * - * <P>In RocketMQ, all producers and consumers interactive with broker - * by an {@link MQClientInstance} object, which encapsulates tcp connection, - * schedule task and so on. The best way to control the behavior of producers/consumers - * derived from a connection is to manipulate the {@link MQClientInstance} directly. - * - * <P>However, this object is not easy to access as it is maintained within RocketMQ Client. - * Fortunately another equivalent identifier called "instanceName" is provided. - * The "instanceName" is a one-to-one conception with {@link MQClientInstance} object. - * Just like there is a hash map,"instanceName" is the key and a {@link MQClientInstance} - * object is the value. So the essential keyword passed through all objects created by a - * connection is "instanceName". 
- */ -public class RocketMQConnectionFactory implements ConnectionFactory { - - private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionFactory.class); - - private String nameServerAddress; - - private String clientId; - - public RocketMQConnectionFactory(String nameServerAddress) { - this.nameServerAddress = nameServerAddress; - this.clientId = JMSUtils.uuid(); - } - - public RocketMQConnectionFactory(String nameServerAddress, String clientId) { - this.nameServerAddress = nameServerAddress; - this.clientId = clientId; - } - - @Override - public Connection createConnection() throws JMSException { - return createConnection(null, null); - } - - /** - * Using userName and Password to register a connection. Now access RMQ broker - * is anonymous and any userName/password is legal. - * - * @param userName ignored - * @param password ignored - * @return the new JMS Connection - * @throws JMSException if create connection fail - */ - @Override - public Connection createConnection(String userName, String password) throws JMSException { - return createRocketMQConnection(userName, password); - } - - private Connection createRocketMQConnection(String userName, String password) throws JMSException { - final String instanceName = JMSUtils.uuid(); - RocketMQConnection connection = new RocketMQConnection(this.nameServerAddress, this.clientId, instanceName, userName, password); - - log.info("Create a connection successfully[instanceName:{},clientIdentifier:{},userName:{}", instanceName, clientId, userName); - return connection; - } - - @Override - public JMSContext createContext() { - //todo: - return null; - } - - @Override - public JMSContext createContext(String userName, String password) { - //todo: - return null; - } - - @Override - public JMSContext createContext(String userName, String password, int sessionMode) { - //todo: - return null; - } - - @Override - public JMSContext createContext(int sessionMode) { - //todo: - return null; - } - - public String 
getClientId() { - return clientId; - } - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - public String getNameServerAddress() { - return nameServerAddress; - } - - public void setNameServerAddress(String nameServerAddress) { - this.nameServerAddress = nameServerAddress; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java deleted file mode 100644 index e4353e1..0000000 --- a/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms; - -import java.util.Enumeration; -import java.util.Vector; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.jms.ConnectionMetaData; -import javax.jms.JMSException; -import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; -import org.apache.rocketmq.jms.support.ProviderVersion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocketMQConnectionMetaData implements ConnectionMetaData { - - private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionMetaData.class); - private static final String PROVIDER_NAME = "Apache RocketMQ"; - - private String jmsVersion; - private int jmsMajorVersion; - private int jmsMinorVersion; - - private String providerVersion; - private int providerMajorVersion; - private int providerMinorVersion; - - private static RocketMQConnectionMetaData metaData = new RocketMQConnectionMetaData(); - - private RocketMQConnectionMetaData() { - Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*"); - - String jmsVersion = null; - int jmsMajor = 0; - int jmsMinor = 0; - try { - Package p = Package.getPackage("javax.jms"); - if (p != null) { - jmsVersion = p.getImplementationVersion(); - Matcher m = pattern.matcher(jmsVersion); - if (m.matches()) { - jmsMajor = Integer.parseInt(m.group(1)); - jmsMinor = Integer.parseInt(m.group(2)); - } - } - } - catch (Throwable e) { - log.error("Error during getting jms version", e); - } - - this.jmsVersion = jmsVersion; - this.jmsMajorVersion = jmsMajor; - this.jmsMinorVersion = jmsMinor; - - this.providerVersion = ProviderVersion.CURRENT_VERSION.name(); - this.providerMinorVersion = ProviderVersion.CURRENT_VERSION.getValue(); - this.providerMajorVersion = ProviderVersion.CURRENT_VERSION.getValue(); - } - - public static RocketMQConnectionMetaData instance() { - return metaData; - } - - public String getJMSVersion() throws JMSException { - return jmsVersion; - } - - public int getJMSMajorVersion() 
throws JMSException { - return jmsMajorVersion; - } - - public int getJMSMinorVersion() throws JMSException { - return jmsMinorVersion; - } - - public String getJMSProviderName() throws JMSException { - return PROVIDER_NAME; - } - - public String getProviderVersion() throws JMSException { - return providerVersion; - } - - public int getProviderMajorVersion() throws JMSException { - return providerMajorVersion; - } - - public int getProviderMinorVersion() throws JMSException { - return providerMinorVersion; - } - - public Enumeration<?> getJMSXPropertyNames() throws JMSException { - Vector<String> result = new Vector<String>(); - for (JMSPropertiesEnum e : JMSPropertiesEnum.values()) { - result.add(e.name()); - } - return result.elements(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java b/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java deleted file mode 100644 index af147e0..0000000 --- a/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import org.apache.rocketmq.jms.support.JMSUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocketMQConsumer implements MessageConsumer { - - private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class); - private RocketMQSession session; - private Destination destination; - private String messageSelector; - private MessageListener messageListener; - private String subscriptionName; - private boolean durable; - private boolean shared; - - private DeliverMessageService deliverMessageService; - - public RocketMQConsumer(RocketMQSession session, Destination destination, - String messageSelector, - boolean durable, boolean shared) { - this(session, destination, messageSelector, UUID.randomUUID().toString(), durable, shared); - } - - public RocketMQConsumer(RocketMQSession session, Destination destination, - String messageSelector, - String subscriptionName, boolean durable, boolean shared) { - this.session = session; - this.destination = destination; - this.messageSelector = messageSelector; - this.subscriptionName = subscriptionName; - this.durable = durable; - this.shared = shared; - - String consumerGroup = JMSUtils.getConsumerGroup(this); - this.deliverMessageService = new DeliverMessageService(this, this.destination, consumerGroup, - this.messageSelector, this.durable, this.shared); - this.deliverMessageService.start(); - } - - @Override - public String getMessageSelector() throws JMSException { - return messageSelector; - } - - @Override - public MessageListener getMessageListener() throws JMSException { - return this.messageListener; - } - - 
@Override - public void setMessageListener(MessageListener listener) throws JMSException { - if (this.session.isSyncModel()) { - throw new JMSException("A asynchronously call is not permitted when a session is being used synchronously"); - } - - this.messageListener = listener; - this.deliverMessageService.setConsumeModel(ConsumeModel.ASYNC); - this.session.addAsyncConsumer(this); - } - - @Override - public Message receive() throws JMSException { - return this.receive(0); - } - - @Override - public Message receive(long timeout) throws JMSException { - if (this.session.isAsyncModel()) { - throw new JMSException("A synchronous call is not permitted when a session is being used asynchronously."); - } - - this.session.addSyncConsumer(this); - - if (timeout == 0) { - MessageWrapper wrapper = this.deliverMessageService.poll(); - wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset()); - return wrapper.getMessage(); - } - else { - MessageWrapper wrapper = this.deliverMessageService.poll(timeout, TimeUnit.MILLISECONDS); - if (wrapper == null) { - return null; - } - wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset()); - return wrapper.getMessage(); - } - } - - @Override - public Message receiveNoWait() throws JMSException { - return receive(1); - } - - @Override - public void close() throws JMSException { - this.deliverMessageService.close(); - } - - public void start() { - this.deliverMessageService.recover(); - } - - public void stop() { - this.deliverMessageService.pause(); - } - - public DeliverMessageService getDeliverMessageService() { - return deliverMessageService; - } - - public RocketMQSession getSession() { - return session; - } - - public String getSubscriptionName() { - return subscriptionName; - } - - public boolean isDurable() { - return durable; - } - - public boolean isShared() { - return shared; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/725026db/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java b/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java deleted file mode 100644 index 9c8e1d0..0000000 --- a/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms; - -import java.util.UUID; -import javax.jms.CompletionListener; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException; -import org.apache.rocketmq.jms.hook.SendMessageHook; -import org.apache.rocketmq.jms.msg.AbstractJMSMessage; -import org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.String.format; -import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE; -import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Object; - -public class RocketMQProducer implements MessageProducer { - - private static final Logger log = LoggerFactory.getLogger(RocketMQProducer.class); - private RocketMQSession session; - private DefaultMQProducer rocketMQProducer; - private Destination destination; - - private boolean disableMessageID; - private boolean disableMessageTimestamp; - private long timeToLive = JMS_TIME_TO_LIVE_DEFAULT_VALUE; - private int deliveryMode = JMS_DELIVERY_MODE_DEFAULT_VALUE; - private int priority = 
JMS_PRIORITY_DEFAULT_VALUE; - private long deliveryDelay = JMS_DELIVERY_TIME_DEFAULT_VALUE; - - private SendMessageHook sendMessageHook; - - public RocketMQProducer() { - } - - public RocketMQProducer(RocketMQSession session, Destination destination) { - this.session = session; - this.destination = destination; - - this.rocketMQProducer = new DefaultMQProducer(UUID.randomUUID().toString()); - ClientConfig clientConfig = this.session.getConnection().getClientConfig(); - this.rocketMQProducer.setNamesrvAddr(clientConfig.getNamesrvAddr()); - this.rocketMQProducer.setInstanceName(clientConfig.getInstanceName()); - try { - this.rocketMQProducer.start(); - } - catch (MQClientException e) { - throw new JMSRuntimeException(format("Fail to start producer, error msg:%s", getStackTrace(e))); - } - - this.sendMessageHook = new SendMessageHook(this); - } - - @Override - public void setDisableMessageID(boolean value) throws JMSException { - this.disableMessageID = value; - } - - @Override - public boolean getDisableMessageID() throws JMSException { - return this.disableMessageID; - } - - @Override - public void setDisableMessageTimestamp(boolean value) throws JMSException { - this.disableMessageTimestamp = value; - } - - @Override - public boolean getDisableMessageTimestamp() throws JMSException { - return this.disableMessageTimestamp; - } - - @Override - public void setDeliveryMode(int deliveryMode) throws JMSException { - throw new UnsupportDeliveryModelException(); - } - - @Override - public int getDeliveryMode() throws JMSException { - return this.deliveryMode; - } - - @Override - public void setPriority(int priority) throws JMSException { - this.priority = priority; - } - - @Override - public int getPriority() throws JMSException { - return this.priority; - } - - @Override - public void setTimeToLive(long timeToLive) throws JMSException { - this.timeToLive = timeToLive; - } - - @Override - public long getTimeToLive() throws JMSException { - return this.timeToLive; - } - - 
@Override - public void setDeliveryDelay(long deliveryDelay) throws JMSException { - this.deliveryDelay = deliveryDelay; - } - - @Override - public long getDeliveryDelay() throws JMSException { - return this.deliveryDelay; - } - - @Override - public Destination getDestination() throws JMSException { - return this.destination; - } - - @Override - public void close() throws JMSException { - this.rocketMQProducer.shutdown(); - } - - @Override - public void send(Message message) throws JMSException { - this.send(this.destination, message); - } - - @Override - public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - this.send(this.destination, message, deliveryMode, priority, timeToLive); - } - - @Override - public void send(Destination destination, Message message) throws JMSException { - this.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive()); - } - - @Override - public void send(Destination destination, Message message, int deliveryMode, int priority, - long timeToLive) throws JMSException { - - sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive); - - MessageExt rmqMsg = createRocketMQMessage(message); - - SendResult sendResult = sendSync(rmqMsg); - if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) { - log.debug("Success to send message[key={}]", rmqMsg.getKeys()); - return; - } - else { - throw new JMSException(format("Sending message error with result status:%s", sendResult.getSendStatus().name())); - } - } - - private SendResult sendSync(org.apache.rocketmq.common.message.Message rmqMsg) throws JMSException { - - try { - return rocketMQProducer.send(rmqMsg); - } - catch (Exception e) { - throw new JMSException(format("Fail to send message. 
Error: %s", getStackTrace(e))); - } - } - - private void sendAsync(org.apache.rocketmq.common.message.Message rmqMsg, - CompletionListener completionListener) throws JMSException { - try { - rocketMQProducer.send(rmqMsg, new SendCompletionListener(completionListener)); - } - catch (Exception e) { - throw new JMSException(format("Fail to send message. Error: %s", getStackTrace(e))); - } - } - - private MessageExt createRocketMQMessage(Message jmsMsg) throws JMSException { - AbstractJMSMessage abstractJMSMessage = cast2Object(jmsMsg, AbstractJMSMessage.class); - try { - return JMS2RMQMessageConvert.convert(abstractJMSMessage); - } - catch (Exception e) { - throw new JMSException(format("Fail to convert to RocketMQ jmsMsg. Error: %s", getStackTrace(e))); - } - } - - @Override - public void send(Message message, CompletionListener completionListener) throws JMSException { - this.send(this.destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), completionListener); - } - - @Override - public void send(Message message, int deliveryMode, int priority, long timeToLive, - CompletionListener completionListener) throws JMSException { - this.send(this.destination, message, deliveryMode, priority, timeToLive, completionListener); - } - - @Override - public void send(Destination destination, Message message, - CompletionListener completionListener) throws JMSException { - this.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), completionListener); - } - - @Override - public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, - CompletionListener completionListener) throws JMSException { - - sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive); - - MessageExt rmqMsg = createRocketMQMessage(message); - - sendAsync(rmqMsg, completionListener); - } - - public RocketMQSession getSession() { - return session; - } - - public void setSession(RocketMQSession session) { 
- this.session = session; - } - - public void setRocketMQProducer(DefaultMQProducer rocketMQProducer) { - this.rocketMQProducer = rocketMQProducer; - } - - public void setDestination(Destination destination) { - this.destination = destination; - } - - public void setSendMessageHook(SendMessageHook sendMessageHook) { - this.sendMessageHook = sendMessageHook; - } - - public String getUserName() { - return this.session.getConnection().getUserName(); - } -}
