Release rocketmq-mysql 1.1.0 version
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/a0aeee62 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/a0aeee62 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/a0aeee62 Branch: refs/heads/release-rocketmq-mysql-1.1.0 Commit: a0aeee629f4dc3fbb86ccbf3408011092c719b1e Parents: Author: yukon <yu...@apache.org> Authored: Tue Aug 22 22:08:24 2017 +0800 Committer: yukon <yu...@apache.org> Committed: Tue Aug 22 22:08:24 2017 +0800 ---------------------------------------------------------------------- .gitignore | 1 + README.md | 26 ++ rocketmq-mysql/.gitignore | 14 + rocketmq-mysql/LICENSE | 201 +++++++++++++ rocketmq-mysql/LICENSE-BIN | 301 +++++++++++++++++++ rocketmq-mysql/NOTICE | 5 + rocketmq-mysql/NOTICE-BIN | 5 + rocketmq-mysql/README.md | 42 +++ rocketmq-mysql/doc/dataflow.png | Bin 0 -> 28277 bytes rocketmq-mysql/doc/overview.png | Bin 0 -> 35155 bytes rocketmq-mysql/pom.xml | 275 +++++++++++++++++ rocketmq-mysql/src/main/assembly/assembly.xml | 61 ++++ .../src/main/assembly/scripts/start.sh | 23 ++ .../src/main/assembly/scripts/stop.sh | 18 ++ .../java/org/apache/rocketmq/mysql/Config.java | 130 ++++++++ .../org/apache/rocketmq/mysql/Replicator.java | 129 ++++++++ .../apache/rocketmq/mysql/binlog/DataRow.java | 76 +++++ .../rocketmq/mysql/binlog/EventListener.java | 65 ++++ .../rocketmq/mysql/binlog/EventProcessor.java | 285 ++++++++++++++++++ .../rocketmq/mysql/binlog/Transaction.java | 88 ++++++ .../rocketmq/mysql/position/BinlogPosition.java | 47 +++ .../mysql/position/BinlogPositionLogThread.java | 47 +++ .../mysql/position/BinlogPositionManager.java | 149 +++++++++ .../mysql/productor/RocketMQProducer.java | 52 ++++ .../apache/rocketmq/mysql/schema/Database.java | 104 +++++++ .../apache/rocketmq/mysql/schema/Schema.java | 126 ++++++++ 
.../org/apache/rocketmq/mysql/schema/Table.java | 58 ++++ .../mysql/schema/column/BigIntColumnParser.java | 50 +++ .../mysql/schema/column/ColumnParser.java | 71 +++++ .../schema/column/DateTimeColumnParser.java | 53 ++++ .../schema/column/DefaultColumnParser.java | 37 +++ .../mysql/schema/column/EnumColumnParser.java | 46 +++ .../mysql/schema/column/IntColumnParser.java | 66 ++++ .../mysql/schema/column/SetColumnParser.java | 54 ++++ .../mysql/schema/column/StringColumnParser.java | 57 ++++ .../mysql/schema/column/TimeColumnParser.java | 39 +++ .../mysql/schema/column/YearColumnParser.java | 40 +++ rocketmq-mysql/src/main/resources/logback.xml | 79 +++++ .../src/main/resources/rocketmq_mysql.conf | 28 ++ .../rocketmq/mysql/BigIntColumnParserTest.java | 37 +++ .../rocketmq/mysql/EnumColumnParserTest.java | 37 +++ .../rocketmq/mysql/IntColumnParserTest.java | 54 ++++ .../rocketmq/mysql/SetColumnParserTest.java | 36 +++ rocketmq-mysql/style/copyright/Apache.xml | 23 ++ .../style/copyright/profiles_settings.xml | 64 ++++ rocketmq-mysql/style/rmq_checkstyle.xml | 134 +++++++++ rocketmq-mysql/style/rmq_codeStyle.xml | 143 +++++++++ 47 files changed, 3476 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md new file mode 100644 index 0000000..8cd617b --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ +# RocketMQ Externals + +There are many Apache RocketMQ external projects contributed and maintained by community. 
+ +## RocketMQ-Console +A newly designed RocketMQ's console using spring-boot. + +## RocketMQ-JMS +RocketMQ's JMS 1.1 spec. implementation. + +## RocketMQ-Flume +Flume RocketMQ source and sink implementation. + + +## RocketMQ-Spark + +Integration of Apache Spark-Streaming and Apache RocketMQ. Both push & pull consumers are provided. For more details please refer to [README](https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-spark). + +## RocketMQ-Docker +Apache RocketMQ Docker provides Dockerfile and bash scripts for building and running docker image. + +## RocketMQ-MySQL +This project is a data replicator between MySQL and other systems. For more details please refer to [README](https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-mysql). + +## Others +[RocketMQ-Druid](https://github.com/druid-io/druid/tree/master/extensions-contrib/druid-rocketmq), [RocketMQ-Ignite](https://github.com/apache/ignite/tree/master/modules/rocketmq) and [RocketMQ-Storm](https://github.com/apache/storm/tree/master/external/storm-rocketmq) integration can be found in those repositories. 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/.gitignore ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/.gitignore b/rocketmq-mysql/.gitignore new file mode 100644 index 0000000..3311eab --- /dev/null +++ b/rocketmq-mysql/.gitignore @@ -0,0 +1,14 @@ +*dependency-reduced-pom.xml +.classpath +.project +.settings/ +target/ +devenv +*.log* +*.iml +.idea/ +*.versionsBackup +*bin +!NOTICE-BIN +!LICENSE-BIN +.DS_Store \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/LICENSE ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/LICENSE b/rocketmq-mysql/LICENSE new file mode 100644 index 0000000..b67d909 --- /dev/null +++ b/rocketmq-mysql/LICENSE @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright {} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/LICENSE-BIN ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN new file mode 100644 index 0000000..22b0aa4 --- /dev/null +++ b/rocketmq-mysql/LICENSE-BIN @@ -0,0 +1,301 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright {} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + +------ +This product has a bundle logback, which is available under the EPL v1.0 License. +The source code of logback can be found at https://github.com/qos-ch/logback. + +Logback LICENSE +--------------- + +Logback: the reliable, generic, fast and flexible logging framework. +Copyright (C) 1999-2015, QOS.ch. All rights reserved. + +This program and the accompanying materials are dual-licensed under +either the terms of the Eclipse Public License v1.0 as published by +the Eclipse Foundation + + or (per the licensee's choosing) + +under the terms of the GNU Lesser General Public License version 2.1 +as published by the Free Software Foundation. + +------ +This product has a bundle slf4j, which is available under the MIT License. +The source code of slf4j can be found at https://github.com/qos-ch/slf4j. + + Copyright (c) 2004-2017 QOS.ch + All rights reserved. 
+ + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +------ +This product has a bundle fastjson, which is available under the ASL2 License. +The source code of fastjson can be found at https://github.com/alibaba/fastjson. + + Copyright 1999-2017 Alibaba Group Holding Ltd. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +------ + This product has a bundle druid, which is available under the ASL2 License. + The source code of druid can be found at https://github.com/alibaba/druid. 
+ + Copyright 1999-2017 Alibaba Group Holding Ltd. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +------ +This product has a bundle commons-codec, which is available under the ASL2 License. +The source code of commons-codec can be found at http://svn.apache.org/viewvc/commons/proper/codec/trunk/. + +Apache Commons Codec +Copyright 2002-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java +contains test data from http://aspell.net/test/orig/batch0.tab. +Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org) + +------ +This product has a bundle mysql-binlog-connector-java, which is available under the ASL2 License. +The source code of mysql-binlog-connector-java can be found at https://github.com/shyiko/mysql-binlog-connector-java. http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/NOTICE ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/NOTICE b/rocketmq-mysql/NOTICE new file mode 100644 index 0000000..5384857 --- /dev/null +++ b/rocketmq-mysql/NOTICE @@ -0,0 +1,5 @@ +Apache RocketMQ (incubating) +Copyright 2016-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/NOTICE-BIN ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/NOTICE-BIN b/rocketmq-mysql/NOTICE-BIN new file mode 100644 index 0000000..5384857 --- /dev/null +++ b/rocketmq-mysql/NOTICE-BIN @@ -0,0 +1,5 @@ +Apache RocketMQ (incubating) +Copyright 2016-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/README.md ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/README.md b/rocketmq-mysql/README.md new file mode 100644 index 0000000..65efb05 --- /dev/null +++ b/rocketmq-mysql/README.md @@ -0,0 +1,42 @@ +# RocketMQ-MySQL + + +## Overview +![overview](./doc/overview.png) + +RocketMQ-MySQL is a data replicator between MySQL and other systems. The replicator simulates a MySQL slave instance, parses binlog events, +and sends them to RocketMQ in JSON format. Besides MySQL, other systems can also consume data from RocketMQ. With the RocketMQ-MySQL Replicator, more systems can easily process data from MySQL binlog at a very low cost. + +## Dataflow +![dataflow](./doc/dataflow.png) + +* 1. First, get the last data from the queue and read the binlog position from this + data. If the data queue is empty, use the latest binlog position of MySQL. A user-specified binlog position is also supported. +* 2. Send a binlog dump request to MySQL. +* 3. MySQL pushes binlog events to the replicator. The replicator parses the data and accumulates it as a transaction-object. +* 4. Add the next-position of the transaction to the transaction-object and send it in JSON format to the queue. +* 5. 
Record the binlog position and the offset in the queue of the latest transaction every second. + + +## Quick Start + +* 1. Create a MySQL account with replication permission; the replicator uses it to simulate a MySQL slave and receive binlog events. Replication must be in row mode. +* 2. Create a topic in RocketMQ to store binlog events. To ensure that the downstream system consumes the data in order, make sure the topic has only one queue. +* 3. Configure the relevant information of MySQL and RocketMQ in the rocketmq_mysql.conf file. +* 4. Execute "mvn install", then start the replicator (by executing "nohup ./start.sh &"). +* 5. Subscribe and process the messages. + + +## Configuration Instruction +|key |nullable|default |description| +|------------------|--------|-----------|-----------| +|mysqlAddr |false | |MySQL address| +|mysqlPort |false | |MySQL port| +|mysqlUsername |false | |username of MySQL account| +|mysqlPassword |false | |password of MySQL account| +|mqNamesrvAddr |false | |RocketMQ name server address (e.g., 127.0.0.1:9876)| +|mqTopic |false | |RocketMQ topic name| +|startType |true |DEFAULT |The way that the replicator starts processing data; there are four options available:<br>- DEFAULT: try to start processing data in the "LAST_PROCESSED" way; if that fails, fall back to the "NEW_EVENT" way<br>- LAST_PROCESSED: starts processing data from the last processed event<br>- NEW_EVENT: starts processing data from the tail of binlog<br>- SPECIFIED: starts processing data from the position that the user specified; if you choose this option, binlogFilename and nextPosition must not be null| +|binlogFilename |true | |If "startType" is "SPECIFIED", the replicator will begin to replicate from this binlog file| +|nextPosition |true | |If "startType" is "SPECIFIED", the replicator will begin to replicate from this position| +|maxTransactionRows|true |100 |max rows of the transaction pushed to RocketMQ| \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/doc/dataflow.png ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/doc/dataflow.png b/rocketmq-mysql/doc/dataflow.png new file mode 100644 index 0000000..ed12b52 Binary files /dev/null and b/rocketmq-mysql/doc/dataflow.png differ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/doc/overview.png ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/doc/overview.png b/rocketmq-mysql/doc/overview.png new file mode 100644 index 0000000..0a3ec82 Binary files /dev/null and b/rocketmq-mysql/doc/overview.png differ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml new file mode 100644 index 0000000..23e7468 --- /dev/null +++ b/rocketmq-mysql/pom.xml @@ -0,0 +1,275 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache</groupId> + <artifactId>rocketmq-mysql-replicator</artifactId> + <version>1.1.0</version> + + <scm> + <url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url> + <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</connection> + <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git + </developerConnection> + <tag>HEAD</tag> + </scm> + + <mailingLists> + <mailingList> + <name>Development List</name> + <subscribe>dev-subscr...@rocketmq.incubator.apache.org</subscribe> + 
<unsubscribe>dev-unsubscr...@rocketmq.incubator.apache.org</unsubscribe> + <post>d...@rocketmq.incubator.apache.org</post> + </mailingList> + <mailingList> + <name>User List</name> + <subscribe>users-subscr...@rocketmq.incubator.apache.org</subscribe> + <unsubscribe>users-unsubscr...@rocketmq.incubator.apache.org</unsubscribe> + <post>us...@rocketmq.incubator.apache.org</post> + </mailingList> + <mailingList> + <name>Commits List</name> + <subscribe>commits-subscr...@rocketmq.incubator.apache.org</subscribe> + <unsubscribe>commits-unsubscr...@rocketmq.incubator.apache.org</unsubscribe> + <post>commits@rocketmq.incubator.apache.org</post> + </mailingList> + </mailingLists> + + <developers> + <developer> + <id>Apache RocketMQ</id> + <name>Apache RocketMQ of ASF</name> + <url>https://rocketmq.apache.org/</url> + </developer> + </developers> + + <licenses> + <license> + <name>Apache License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0</url> + <distribution>repo</distribution> + </license> + </licenses> + + <organization> + <name>Apache Software Foundation</name> + <url>http://www.apache.org</url> + </organization> + + <issueManagement> + <system>jira</system> + <url>https://issues.apache.org/jira/browse/RocketMQ</url> + </issueManagement> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + + <maven.test.skip>false</maven.test.skip> + <maven.javadoc.skip>true</maven.javadoc.skip> + <maven.compiler.source>1.7</maven.compiler.source> + <maven.compiler.target>1.7</maven.compiler.target> + <rocketmq.version>4.0.0-incubating</rocketmq.version> + </properties> + + + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>com.github.shyiko</groupId> + <artifactId>mysql-binlog-connector-java</artifactId> 
+ <version>0.12.1</version> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>6.0.6</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.5</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>druid</artifactId> + <version>1.0.31</version> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.9</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <finalName>rocketmq-mysql</finalName> + <sourceDirectory>${project.basedir}/src/main/java</sourceDirectory> + <outputDirectory>${project.basedir}/target/classes</outputDirectory> + <resources> + <resource> + <directory>${project.basedir}/src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.8</version> + </plugin> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.4.1</version> + <executions> + <execution> + <id>enforce-ban-circular-dependencies</id> + <goals> + <goal>enforce</goal> + </goals> + </execution> + </executions> + <configuration> + <rules> + <banCircularDependencies/> + </rules> + <fail>true</fail> + </configuration> + <dependencies> + <dependency> + <groupId>org.codehaus.mojo</groupId> + <artifactId>extra-enforcer-rules</artifactId> + <version>1.0-beta-6</version> + </dependency> + </dependencies> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.5.1</version> + <configuration> + 
<source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + </configuration> + <executions> + <execution> + <id>attach-javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <version>0.12</version> + <configuration> + <excludes> + <exclude>README.md</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <executions> + <execution> + <id>verify</id> + <phase>verify</phase> + <configuration> + <configLocation>style/rmq_checkstyle.xml</configLocation> + <encoding>UTF-8</encoding> + <consoleOutput>true</consoleOutput> + <failsOnError>true</failsOnError> + <includeTestSourceDirectory>false</includeTestSourceDirectory> + <includeTestResources>false</includeTestResources> + </configuration> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <configuration> + <archive> + <addMavenDescriptor>false</addMavenDescriptor> + <manifest> + <addClasspath>true</addClasspath> + <classpathPrefix>lib/</classpathPrefix> + </manifest> + </archive> + <excludes> + 
<exclude>rocketmq_mysql.conf</exclude> + <exclude>logback.xml</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4</version> + <configuration> + <descriptors> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </descriptors> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/assembly.xml ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/assembly/assembly.xml b/rocketmq-mysql/src/main/assembly/assembly.xml new file mode 100644 index 0000000..b280aa6 --- /dev/null +++ b/rocketmq-mysql/src/main/assembly/assembly.xml @@ -0,0 +1,61 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>pack</id> + <formats> + <format>tar.gz</format> + <format>dir</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <useProjectArtifact>true</useProjectArtifact> + <outputDirectory>lib</outputDirectory> + </dependencySet> + </dependencySets> + <fileSets> + <fileSet> + <directory>src/main/assembly/scripts</directory> + <outputDirectory>bin</outputDirectory> + <fileMode>0755</fileMode> + </fileSet> + <fileSet> + <directory>target/classes</directory> + <outputDirectory>conf</outputDirectory> + <fileMode>0755</fileMode> + <includes> + <include>*.conf</include> + <include>logback.xml</include> + </includes> + </fileSet> + </fileSets> + + <files> + <file> + <source>LICENSE-BIN</source> + <destName>LICENSE</destName> + </file> + <file> + <source>NOTICE-BIN</source> + <destName>NOTICE</destName> + </file> + </files> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/scripts/start.sh ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/assembly/scripts/start.sh b/rocketmq-mysql/src/main/assembly/scripts/start.sh new file mode 100644 index 0000000..e159f36 --- /dev/null +++ b/rocketmq-mysql/src/main/assembly/scripts/start.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +binPath=$(cd "$(dirname "$0")"; pwd); +cd $binPath +cd .. 
+parentPath=`pwd` +libPath=$parentPath/lib/ + + +function exportClassPath(){ + jarFileList=`find "$libPath" -name *.jar |awk -F'/' '{print $(NF)}' 2>>/dev/null` + CLASSPATH=".:$binPath"; + for jarItem in $jarFileList + do + CLASSPATH="$CLASSPATH:$libPath$jarItem" + done + CLASSPATH=$CLASSPATH:./conf + export CLASSPATH +} +ulimit -n 65535 +exportClassPath + +java -server -Xms512m -Xmx512m -Xss2m -XX:NewRatio=2 -XX:+UseGCOverheadLimit -XX:-UseParallelGC -XX:ParallelGCThreads=24 org.apache.rocketmq.mysql.Replicator http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/scripts/stop.sh ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/assembly/scripts/stop.sh b/rocketmq-mysql/src/main/assembly/scripts/stop.sh new file mode 100755 index 0000000..f0e3c0d --- /dev/null +++ b/rocketmq-mysql/src/main/assembly/scripts/stop.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +PROGRAM_NAME="org.apache.rocketmq.mysql.Replicator" +PIDS=`ps -ef | grep $PROGRAM_NAME | grep -v "grep" | awk '{print $2}'` + +if [ -z $PIDS ]; then + echo "No this process." +else + echo "Find process is $PIDS." +fi + +#####kill#### +echo -e "Stopping the $PROGRAM_NAME...\c" +for PID in $PIDS ; do + kill $PID +done + +echo "SUCCESS!" http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java new file mode 100644 index 0000000..6c14cb4 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; +import java.util.Properties; + + +public class Config { + + public String mysqlAddr; + public Integer mysqlPort; + public String mysqlUsername; + public String mysqlPassword; + + public String mqNamesrvAddr; + public String mqTopic; + + public String startType = "DEFAULT"; + public String binlogFilename; + public Long nextPosition; + public Integer maxTransactionRows = 100; + + public void load() throws IOException { + + InputStream in = Config.class.getClassLoader().getResourceAsStream("rocketmq_mysql.conf"); + Properties properties = new Properties(); + properties.load(in); + + properties2Object(properties, this); + + } + + private void properties2Object(final Properties p, final Object object) { + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getProperty(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + 
Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } + + public void setMysqlAddr(String mysqlAddr) { + this.mysqlAddr = mysqlAddr; + } + + public void setMysqlPort(Integer mysqlPort) { + this.mysqlPort = mysqlPort; + } + + public void setMysqlUsername(String mysqlUsername) { + this.mysqlUsername = mysqlUsername; + } + + public void setMysqlPassword(String mysqlPassword) { + this.mysqlPassword = mysqlPassword; + } + + public void setBinlogFilename(String binlogFilename) { + this.binlogFilename = binlogFilename; + } + + public void setNextPosition(Long nextPosition) { + this.nextPosition = nextPosition; + } + + public void setMaxTransactionRows(Integer maxTransactionRows) { + this.maxTransactionRows = maxTransactionRows; + } + + public void setMqNamesrvAddr(String mqNamesrvAddr) { + this.mqNamesrvAddr = mqNamesrvAddr; + } + + public void setMqTopic(String mqTopic) { + this.mqTopic = mqTopic; + } + + public void setStartType(String startType) { + this.startType = startType; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java new 
file mode 100644 index 0000000..ae3c984 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql; + +import org.apache.rocketmq.mysql.binlog.EventProcessor; +import org.apache.rocketmq.mysql.binlog.Transaction; +import org.apache.rocketmq.mysql.position.BinlogPositionLogThread; +import org.apache.rocketmq.mysql.productor.RocketMQProducer; +import org.apache.rocketmq.mysql.position.BinlogPosition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Replicator { + + private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); + + private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger"); + + private Config config; + + private EventProcessor eventProcessor; + + private RocketMQProducer rocketMQProducer; + + private Object lock = new Object(); + private BinlogPosition nextBinlogPosition; + private long nextQueueOffset; + private long xid; + + public static void main(String[] args) { + + Replicator replicator = new Replicator(); + replicator.start(); + } + + public void start() { + + try { + config = new Config(); + 
config.load(); + + rocketMQProducer = new RocketMQProducer(config); + rocketMQProducer.start(); + + BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this); + binlogPositionLogThread.start(); + + eventProcessor = new EventProcessor(this); + eventProcessor.start(); + + } catch (Exception e) { + LOGGER.error("Start error.", e); + System.exit(1); + } + } + + public void commit(Transaction transaction, boolean isComplete) { + + String json = transaction.toJson(); + + for (int i = 0; i < 3; i++) { + try { + if (isComplete) { + long offset = rocketMQProducer.push(json); + + synchronized (lock) { + xid = transaction.getXid(); + nextBinlogPosition = transaction.getNextBinlogPosition(); + nextQueueOffset = offset; + } + + } else { + rocketMQProducer.push(json); + } + break; + + } catch (Exception e) { + LOGGER.error("Push error,retry:" + (i + 1) + ",", e); + } + } + } + + public void logPosition() { + + String binlogFilename = null; + long xid = 0L; + long nextPosition = 0L; + long nextOffset = 0L; + + synchronized (lock) { + if (nextBinlogPosition != null) { + xid = this.xid; + binlogFilename = nextBinlogPosition.getBinlogFilename(); + nextPosition = nextBinlogPosition.getPosition(); + nextOffset = nextQueueOffset; + } + } + + if (binlogFilename != null) { + POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}", + xid, binlogFilename, nextPosition, nextOffset); + } + + } + + public Config getConfig() { + return config; + } + + public BinlogPosition getNextBinlogPosition() { + return nextBinlogPosition; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java new file mode 
100644 index 0000000..646c018 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.binlog; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.mysql.schema.Table; +import org.apache.rocketmq.mysql.schema.column.ColumnParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataRow { + private Logger logger = LoggerFactory.getLogger(DataRow.class); + + private String type; + private Table table; + private Serializable[] row; + + public DataRow(String type, Table table, Serializable[] row) { + this.type = type; + this.table = table; + this.row = row; + } + + public Map toMap() { + + try { + if (table.getColList().size() == row.length) { + Map<String, Object> dataMap = new HashMap<>(); + List<String> keyList = table.getColList(); + List<ColumnParser> parserList = table.getParserList(); + + for (int i = 0; i < keyList.size(); i++) { + Object value = row[i]; + ColumnParser parser = parserList.get(i); + dataMap.put(keyList.get(i), parser.getValue(value)); + } + + Map<String, 
Object> map = new HashMap<>(); + map.put("database", table.getDatabase()); + map.put("table", table.getName()); + map.put("type", type); + map.put("data", dataMap); + + return map; + } else { + logger.error("Table schema changed,discard data: {} - {}, {} {}", + table.getDatabase().toUpperCase(), table.getName().toUpperCase(), type, row.toString()); + + return null; + } + } catch (Exception e) { + logger.error("Row parse error,discard data: {} - {}, {} {}", + table.getDatabase().toUpperCase(), table.getName().toUpperCase(), type, row.toString()); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java new file mode 100644 index 0000000..b5005bc --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.mysql.binlog; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.Event; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +public class EventListener implements BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener { + + private BlockingQueue<Event> queue; + + public EventListener(BlockingQueue<Event> queue) { + this.queue = queue; + } + + @Override + public void onEvent(Event event) { + try { + while (true) { + if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) { + return; + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void onConnect(BinaryLogClient client) { + + } + + @Override + public void onCommunicationFailure(BinaryLogClient client, Exception e) { + + } + + @Override + public void onEventDeserializationFailure(BinaryLogClient client, Exception e) { + + } + + @Override + public void onDisconnect(BinaryLogClient client) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java new file mode 100644 index 0000000..a730403 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.mysql.binlog;

import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.XidEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.sql.DataSource;
import org.apache.rocketmq.mysql.Config;
import org.apache.rocketmq.mysql.Replicator;
import org.apache.rocketmq.mysql.position.BinlogPosition;
import org.apache.rocketmq.mysql.position.BinlogPositionManager;
import org.apache.rocketmq.mysql.schema.Schema;
import org.apache.rocketmq.mysql.schema.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Connects a {@link BinaryLogClient} to MySQL, drains the events it delivers
 * through an {@link EventListener} queue, groups row changes into
 * {@link Transaction} objects and hands them to {@link Replicator#commit}.
 */
public class EventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);

    /** Matches statements starting with CREATE TABLE / ALTER TABLE (case-insensitive). */
    private static final Pattern CREATE_TABLE_PATTERN =
        Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE);

    private Replicator replicator;
    private Config config;

    private DataSource dataSource;

    private BinlogPositionManager binlogPositionManager;

    /** Bounded hand-off queue filled by {@link EventListener} and drained by {@link #doProcess()}. */
    private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100);

    private BinaryLogClient binaryLogClient;

    private EventListener eventListener;

    private Schema schema;

    /** Table id (from TABLE_MAP events) to schema table; the value is whatever the schema lookup returned. */
    private Map<Long, Table> tableMap = new HashMap<>();

    /** Transaction currently being assembled; {@code null} until the first row or XID event. */
    private Transaction transaction;

    /**
     * @param replicator owner of this processor; supplies the configuration and
     *                   receives the assembled transactions
     */
    public EventProcessor(Replicator replicator) {

        this.replicator = replicator;
        this.config = replicator.getConfig();
    }

    /**
     * Initialises the data source, start position and schema, connects the
     * binlog client and then runs the processing loop on the calling thread.
     *
     * @throws Exception if any initialisation step or the initial connect fails
     */
    public void start() throws Exception {

        initDataSource();

        binlogPositionManager = new BinlogPositionManager(config, dataSource);
        binlogPositionManager.initBeginPosition();

        schema = new Schema(dataSource);
        schema.load();

        eventListener = new EventListener(queue);
        binaryLogClient = new BinaryLogClient(config.mysqlAddr,
            config.mysqlPort,
            config.mysqlUsername,
            config.mysqlPassword);
        binaryLogClient.setBlocking(true);
        binaryLogClient.setServerId(1001);

        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
        binaryLogClient.setEventDeserializer(eventDeserializer);
        binaryLogClient.registerEventListener(eventListener);
        binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename());
        binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition());

        binaryLogClient.connect(3000);

        LOGGER.info("Started.");

        doProcess();
    }

    /**
     * Processing loop: polls the queue, dispatches each event by type, and checks
     * the connection whenever no event arrived within one second.
     *
     * <p>Failures while handling an event are logged and the loop continues.
     * An interruption of the calling thread ends the loop with the interrupt
     * status restored.
     */
    private void doProcess() {

        while (true) {

            try {
                Event event = queue.poll(1000, TimeUnit.MILLISECONDS);
                if (event == null) {
                    checkConnection();
                    continue;
                }

                switch (event.getHeader().getEventType()) {
                    case TABLE_MAP:
                        processTableMapEvent(event);
                        break;

                    case WRITE_ROWS:
                    case EXT_WRITE_ROWS:
                        processWriteEvent(event);
                        break;

                    case UPDATE_ROWS:
                    case EXT_UPDATE_ROWS:
                        processUpdateEvent(event);
                        break;

                    case DELETE_ROWS:
                    case EXT_DELETE_ROWS:
                        processDeleteEvent(event);
                        break;

                    case QUERY:
                        processQueryEvent(event);
                        break;

                    case XID:
                        processXidEvent(event);
                        break;

                    default:
                        // Other event types are not handled.
                        break;
                }
            } catch (InterruptedException e) {
                // Treat interruption as a stop request rather than as a processing
                // error; otherwise this loop could never be terminated.
                Thread.currentThread().interrupt();
                LOGGER.warn("Binlog processing interrupted, stopping.", e);
                return;
            } catch (Exception e) {
                LOGGER.error("Binlog process error.", e);
            }

        }
    }

    /**
     * Reconnects the binlog client if it is no longer connected, resuming from the
     * replicator's next position when one is available.
     *
     * @throws Exception if reconnecting fails
     */
    private void checkConnection() throws Exception {

        if (!binaryLogClient.isConnected()) {
            BinlogPosition binlogPosition = replicator.getNextBinlogPosition();
            if (binlogPosition != null) {
                binaryLogClient.setBinlogFilename(binlogPosition.getBinlogFilename());
                binaryLogClient.setBinlogPosition(binlogPosition.getPosition());
            }

            binaryLogClient.connect(3000);
        }
    }

    /** Records which schema table the event's table id refers to. */
    private void processTableMapEvent(Event event) {
        TableMapEventData data = event.getData();
        String dbName = data.getDatabase();
        String tableName = data.getTable();
        Long tableId = data.getTableId();

        // The lookup result is stored as-is; addRow() skips ids that map to null.
        Table table = schema.getTable(dbName, tableName);

        tableMap.put(tableId, table);
    }

    /** Adds every inserted row of the event to the current transaction as "WRITE". */
    private void processWriteEvent(Event event) {
        WriteRowsEventData data = event.getData();
        Long tableId = data.getTableId();
        List<Serializable[]> list = data.getRows();

        for (Serializable[] row : list) {
            addRow("WRITE", tableId, row);
        }
    }

    /** Adds the value side of every before/after entry of the event as "UPDATE". */
    private void processUpdateEvent(Event event) {
        UpdateRowsEventData data = event.getData();
        Long tableId = data.getTableId();
        List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows();

        for (Map.Entry<Serializable[], Serializable[]> entry : list) {
            addRow("UPDATE", tableId, entry.getValue());
        }
    }

    /** Adds every deleted row of the event to the current transaction as "DELETE". */
    private void processDeleteEvent(Event event) {
        DeleteRowsEventData data = event.getData();
        Long tableId = data.getTableId();
        List<Serializable[]> list = data.getRows();

        for (Serializable[] row : list) {
            addRow("DELETE", tableId, row);
        }

    }

    /** Resets the schema when the statement starts with CREATE TABLE or ALTER TABLE. */
    private void processQueryEvent(Event event) {
        QueryEventData data = event.getData();
        String sql = data.getSql();

        if (CREATE_TABLE_PATTERN.matcher(sql).find()) {
            schema.reset();
        }
    }

    /**
     * Completes the current transaction with the XID and the position following
     * this event, commits it, and starts a fresh transaction.
     */
    private void processXidEvent(Event event) {
        EventHeaderV4 header = event.getHeader();
        XidEventData data = event.getData();

        String binlogFilename = binaryLogClient.getBinlogFilename();
        Long position = header.getNextPosition();
        Long xid = data.getXid();

        // Only addRow() creates the transaction lazily, so an XID that arrives before
        // any row event would otherwise hit a null transaction and the position
        // would never be committed.
        if (transaction == null) {
            transaction = new Transaction(config);
        }

        BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position);
        transaction.setNextBinlogPosition(binlogPosition);
        transaction.setXid(xid);

        replicator.commit(transaction, true);

        transaction = new Transaction(config);
    }

    /**
     * Appends a row to the current transaction. Rows whose table id has no known
     * table are dropped. When the transaction refuses the row, it is committed as
     * a partial transaction and the row is retried on a new one.
     */
    private void addRow(String type, Long tableId, Serializable[] row) {

        if (transaction == null) {
            transaction = new Transaction(config);
        }

        Table t = tableMap.get(tableId);
        if (t == null) {
            return;
        }

        while (!transaction.addRow(type, t, row)) {
            transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());
            replicator.commit(transaction, false);
            transaction = new Transaction(config);
        }
    }

    /**
     * Builds the Druid connection pool used for the position and schema lookups.
     *
     * @throws Exception if the pool cannot be created
     */
    private void initDataSource() throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("driverClassName", "com.mysql.jdbc.Driver");
        map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + config.mysqlPort + "?useSSL=true&verifyServerCertificate=false");
        map.put("username", config.mysqlUsername);
        map.put("password", config.mysqlPassword);
        map.put("initialSize", "2");
        map.put("maxActive", "2");
        map.put("maxWait", "60000");
        map.put("timeBetweenEvictionRunsMillis", "60000");
        map.put("minEvictableIdleTimeMillis", "300000");
        map.put("validationQuery", "SELECT 1 FROM DUAL");
        map.put("testWhileIdle", "true");

        dataSource = DruidDataSourceFactory.createDataSource(map);
    }

    /** @return the configuration obtained from the replicator */
    public Config getConfig() {
        return config;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java new file mode 100644 index 0000000..396815a --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.rocketmq.mysql.binlog;

import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.mysql.Config;
import org.apache.rocketmq.mysql.position.BinlogPosition;
import org.apache.rocketmq.mysql.schema.Table;

/**
 * A batch of row changes plus the XID and the binlog position that follows
 * it, serialisable to JSON via {@link #toJson()}.
 */
public class Transaction {
    private BinlogPosition nextBinlogPosition;
    private Long xid;

    private Config config;

    private List<DataRow> list = new LinkedList<>();

    /**
     * @param config configuration providing {@code maxTransactionRows}
     */
    public Transaction(Config config) {
        this.config = config;
    }

    /**
     * Appends a row unless the transaction already holds the configured maximum.
     *
     * @param type  change type label stored with the row
     * @param table table the row belongs to
     * @param row   column values of the row
     * @return {@code true} if the row was added, {@code false} if the
     *         transaction is full and the row was not added
     */
    public boolean addRow(String type, Table table, Serializable[] row) {

        // ">=" instead of "==" so the limit still holds if the size were ever
        // to exceed the configured maximum.
        if (list.size() >= config.maxTransactionRows) {
            return false;
        }

        list.add(new DataRow(type, table, row));
        return true;
    }

    /**
     * Serialises the transaction as a JSON object with the keys {@code xid},
     * {@code binlogFilename}, {@code nextPosition} and {@code rows}. Rows whose
     * {@code toMap()} returns {@code null} are left out.
     *
     * <p>When no next position has been set, {@code binlogFilename} and
     * {@code nextPosition} are passed as {@code null} instead of failing with
     * a {@link NullPointerException}.
     *
     * @return the JSON representation
     */
    public String toJson() {

        List<Map<?, ?>> rows = new LinkedList<>();
        for (DataRow dataRow : list) {
            Map<?, ?> rowMap = dataRow.toMap();
            if (rowMap != null) {
                rows.add(rowMap);
            }
        }

        // The position is set from outside and may legitimately be absent.
        String binlogFilename = null;
        Long nextPosition = null;
        if (nextBinlogPosition != null) {
            binlogFilename = nextBinlogPosition.getBinlogFilename();
            nextPosition = nextBinlogPosition.getPosition();
        }

        Map<String, Object> map = new HashMap<>();
        map.put("xid", xid);
        map.put("binlogFilename", binlogFilename);
        map.put("nextPosition", nextPosition);
        map.put("rows", rows);

        return JSONObject.toJSONString(map);
    }

    /** @return the binlog position following this transaction, or {@code null} if not set */
    public BinlogPosition getNextBinlogPosition() {
        return nextBinlogPosition;
    }

    /** @param nextBinlogPosition the binlog position following this transaction */
    public void setNextBinlogPosition(BinlogPosition nextBinlogPosition) {
        this.nextBinlogPosition = nextBinlogPosition;
    }

    /** @param xid the transaction id */
    public void setXid(Long xid) {
        this.xid = xid;
    }

    /** @return the transaction id, or {@code null} if not set */
    public Long getXid() {
        return xid;
    }
}
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java new file mode 100644 index 0000000..5ba436c --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.mysql.position; + +public class BinlogPosition { + + private String binlogFilename; + private Long position; + + public BinlogPosition(String binlogFilename, Long position) { + this.binlogFilename = binlogFilename; + this.position = position; + } + + public String getBinlogFilename() { + return binlogFilename; + } + + public void setBinlogFilename(String binlogFilename) { + this.binlogFilename = binlogFilename; + } + + public Long getPosition() { + return position; + } + + public void setPosition(Long position) { + this.position = position; + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java new file mode 100644 index 0000000..dedb08f --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.position; + +import org.apache.rocketmq.mysql.Replicator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BinlogPositionLogThread extends Thread { + private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class); + + private Replicator replicator; + + public BinlogPositionLogThread(Replicator replicator) { + this.replicator = replicator; + setDaemon(true); + } + + @Override + public void run() { + + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.error("Offset thread interrupted.", e); + } + + replicator.logPosition(); + } + } +}