Release rocketmq-mysql 1.1.0 version

Project: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/a0aeee62
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/a0aeee62
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/a0aeee62

Branch: refs/heads/release-rocketmq-mysql-1.1.0
Commit: a0aeee629f4dc3fbb86ccbf3408011092c719b1e
Parents: 
Author: yukon <yu...@apache.org>
Authored: Tue Aug 22 22:08:24 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Aug 22 22:08:24 2017 +0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 README.md                                       |  26 ++
 rocketmq-mysql/.gitignore                       |  14 +
 rocketmq-mysql/LICENSE                          | 201 +++++++++++++
 rocketmq-mysql/LICENSE-BIN                      | 301 +++++++++++++++++++
 rocketmq-mysql/NOTICE                           |   5 +
 rocketmq-mysql/NOTICE-BIN                       |   5 +
 rocketmq-mysql/README.md                        |  42 +++
 rocketmq-mysql/doc/dataflow.png                 | Bin 0 -> 28277 bytes
 rocketmq-mysql/doc/overview.png                 | Bin 0 -> 35155 bytes
 rocketmq-mysql/pom.xml                          | 275 +++++++++++++++++
 rocketmq-mysql/src/main/assembly/assembly.xml   |  61 ++++
 .../src/main/assembly/scripts/start.sh          |  23 ++
 .../src/main/assembly/scripts/stop.sh           |  18 ++
 .../java/org/apache/rocketmq/mysql/Config.java  | 130 ++++++++
 .../org/apache/rocketmq/mysql/Replicator.java   | 129 ++++++++
 .../apache/rocketmq/mysql/binlog/DataRow.java   |  76 +++++
 .../rocketmq/mysql/binlog/EventListener.java    |  65 ++++
 .../rocketmq/mysql/binlog/EventProcessor.java   | 285 ++++++++++++++++++
 .../rocketmq/mysql/binlog/Transaction.java      |  88 ++++++
 .../rocketmq/mysql/position/BinlogPosition.java |  47 +++
 .../mysql/position/BinlogPositionLogThread.java |  47 +++
 .../mysql/position/BinlogPositionManager.java   | 149 +++++++++
 .../mysql/productor/RocketMQProducer.java       |  52 ++++
 .../apache/rocketmq/mysql/schema/Database.java  | 104 +++++++
 .../apache/rocketmq/mysql/schema/Schema.java    | 126 ++++++++
 .../org/apache/rocketmq/mysql/schema/Table.java |  58 ++++
 .../mysql/schema/column/BigIntColumnParser.java |  50 +++
 .../mysql/schema/column/ColumnParser.java       |  71 +++++
 .../schema/column/DateTimeColumnParser.java     |  53 ++++
 .../schema/column/DefaultColumnParser.java      |  37 +++
 .../mysql/schema/column/EnumColumnParser.java   |  46 +++
 .../mysql/schema/column/IntColumnParser.java    |  66 ++++
 .../mysql/schema/column/SetColumnParser.java    |  54 ++++
 .../mysql/schema/column/StringColumnParser.java |  57 ++++
 .../mysql/schema/column/TimeColumnParser.java   |  39 +++
 .../mysql/schema/column/YearColumnParser.java   |  40 +++
 rocketmq-mysql/src/main/resources/logback.xml   |  79 +++++
 .../src/main/resources/rocketmq_mysql.conf      |  28 ++
 .../rocketmq/mysql/BigIntColumnParserTest.java  |  37 +++
 .../rocketmq/mysql/EnumColumnParserTest.java    |  37 +++
 .../rocketmq/mysql/IntColumnParserTest.java     |  54 ++++
 .../rocketmq/mysql/SetColumnParserTest.java     |  36 +++
 rocketmq-mysql/style/copyright/Apache.xml       |  23 ++
 .../style/copyright/profiles_settings.xml       |  64 ++++
 rocketmq-mysql/style/rmq_checkstyle.xml         | 134 +++++++++
 rocketmq-mysql/style/rmq_codeStyle.xml          | 143 +++++++++
 47 files changed, 3476 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8cd617b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,26 @@
+# RocketMQ Externals
+
+There are many Apache RocketMQ external projects contributed and maintained by 
community.
+
+## RocketMQ-Console
+A newly designed RocketMQ's console using spring-boot.
+
+## RocketMQ-JMS
+RocketMQ's JMS 1.1 spec. implementation.
+
+## RocketMQ-Flume
+Flume RocketMQ source and sink implementation.
+
+
+## RocketMQ-Spark
+
+Integration of Apache Spark-Streaming and Apache RocketMQ. Both push & pull 
consumers are provided. For more details please refer to 
[README](https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-spark).
+
+## RocketMQ-Docker
+Apache RocketMQ Docker provides Dockerfile and bash scripts for building and 
running docker image.
+
+## RocketMQ-MySQL
+This project is a data replicator between MySQL and other systems.For more 
details please refer to 
[README](https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-mysql).
+
+## Others
+[RocketMQ-Druid](https://github.com/druid-io/druid/tree/master/extensions-contrib/druid-rocketmq),
 
[RocketMQ-Ignite](https://github.com/apache/ignite/tree/master/modules/rocketmq)
 and 
[RocketMQ-Storm](https://github.com/apache/storm/tree/master/external/storm-rocketmq)
 integration can be found in those repositories.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/.gitignore
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/.gitignore b/rocketmq-mysql/.gitignore
new file mode 100644
index 0000000..3311eab
--- /dev/null
+++ b/rocketmq-mysql/.gitignore
@@ -0,0 +1,14 @@
+*dependency-reduced-pom.xml
+.classpath
+.project
+.settings/
+target/
+devenv
+*.log*
+*.iml
+.idea/
+*.versionsBackup
+*bin
+!NOTICE-BIN
+!LICENSE-BIN
+.DS_Store
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/LICENSE
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE b/rocketmq-mysql/LICENSE
new file mode 100644
index 0000000..b67d909
--- /dev/null
+++ b/rocketmq-mysql/LICENSE
@@ -0,0 +1,201 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN
new file mode 100644
index 0000000..22b0aa4
--- /dev/null
+++ b/rocketmq-mysql/LICENSE-BIN
@@ -0,0 +1,301 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+------
+This product has a bundle logback, which is available under the EPL v1.0 
License.
+The source code of logback can be found at https://github.com/qos-ch/logback.
+
+Logback LICENSE
+---------------
+
+Logback: the reliable, generic, fast and flexible logging framework.
+Copyright (C) 1999-2015, QOS.ch. All rights reserved.
+
+This program and the accompanying materials are dual-licensed under
+either the terms of the Eclipse Public License v1.0 as published by
+the Eclipse Foundation
+
+  or (per the licensee's choosing)
+
+under the terms of the GNU Lesser General Public License version 2.1
+as published by the Free Software Foundation.
+
+------
+This product has a bundle slf4j, which is available under the MIT License.
+The source code of slf4j can be found at https://github.com/qos-ch/slf4j.
+
+ Copyright (c) 2004-2017 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free  of charge, to any person obtaining
+ a  copy  of this  software  and  associated  documentation files  (the
+ "Software"), to  deal in  the Software without  restriction, including
+ without limitation  the rights to  use, copy, modify,  merge, publish,
+ distribute,  sublicense, and/or sell  copies of  the Software,  and to
+ permit persons to whom the Software  is furnished to do so, subject to
+ the following conditions:
+
+ The  above  copyright  notice  and  this permission  notice  shall  be
+ included in all copies or substantial portions of the Software.
+
+ THE  SOFTWARE IS  PROVIDED  "AS  IS", WITHOUT  WARRANTY  OF ANY  KIND,
+ EXPRESS OR  IMPLIED, INCLUDING  BUT NOT LIMITED  TO THE  WARRANTIES OF
+ MERCHANTABILITY,    FITNESS    FOR    A   PARTICULAR    PURPOSE    AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE,  ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+------
+This product has a bundle fastjson, which is available under the ASL2 License.
+The source code of fastjson can be found at 
https://github.com/alibaba/fastjson.
+
+ Copyright 1999-2017 Alibaba Group Holding Ltd.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+------
+ This product has a bundle druid, which is available under the ASL2 License.
+ The source code of druid can be found at https://github.com/alibaba/druid.
+
+  Copyright 1999-2017 Alibaba Group Holding Ltd.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+------
+This product has a bundle commons-codec, which is available under the ASL2 
License.
+The source code of commons-codec can be found at 
http://svn.apache.org/viewvc/commons/proper/codec/trunk/.
+
+Apache Commons Codec
+Copyright 2002-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+contains test data from http://aspell.net/test/orig/batch0.tab.
+Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org)
+
+------
+This product has a bundle mysql-binlog-connector-java, which is available 
under the ASL2 License.
+The source code of mysql-binlog-connector-java can be found at 
https://github.com/shyiko/mysql-binlog-connector-java.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/NOTICE
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/NOTICE b/rocketmq-mysql/NOTICE
new file mode 100644
index 0000000..5384857
--- /dev/null
+++ b/rocketmq-mysql/NOTICE
@@ -0,0 +1,5 @@
+Apache RocketMQ (incubating)
+Copyright 2016-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/NOTICE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/NOTICE-BIN b/rocketmq-mysql/NOTICE-BIN
new file mode 100644
index 0000000..5384857
--- /dev/null
+++ b/rocketmq-mysql/NOTICE-BIN
@@ -0,0 +1,5 @@
+Apache RocketMQ (incubating)
+Copyright 2016-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/README.md b/rocketmq-mysql/README.md
new file mode 100644
index 0000000..65efb05
--- /dev/null
+++ b/rocketmq-mysql/README.md
@@ -0,0 +1,42 @@
+# RocketMQ-MySQL
+
+
+## Overview
+![overview](./doc/overview.png)
+
+The RocketMQ-MySQL is a data replicator between MySQL and other systems. The 
replicator simulates a MySQL slave instance, parses the binlog event 
+and sends it to RocketMQ in json format. Besides MySQL, other systems can also 
consume data from RocketMQ. With the RocketMQ-MySQL Replicator, more systems 
can easily process data from MySQL binlog at a very low cost.
+
+## Dataflow
+![dataflow](./doc/dataflow.png)
+
+* 1. Firstly, get the last data from the queue, and get the binlog position 
from this
+ data. If the data queue is empty, use the latest binlog position of MySQL. 
Besides that, customized setting of position of the wanted binlog is also 
supported.
+* 2. Send a binlog dump request to MySQL.
+* 3. MySQL pushes binlog event to the replicator. The replicator parses the 
data and accumulates it as a transaction-object.
+* 4. Add the next-position of the transaction to the transaction-object and 
send it in json format to the queue.
+* 5. Record the binlog position and the offset in the queue of the latest 
transaction every second.
+
+
+## Quick Start
+
+* 1. Create an account with MySQL replication permission, which is used to 
simulate the MySQL slave to get the binlog event, and the replication must be 
in row mode.
+* 2. Create a topic in the RocketMQ to store binlog events to ensure that the 
downstream system consumes the data in order. Make sure the topic must have 
only one queue.
+* 3. Configure the relevant information of MySQL and RocketMQ in the 
RocketMQ-MySQL.conf file.
+* 4. Execute "mvn install", then start the replicator (via execute "nohup 
./start.sh &").
+* 5. Subscribe and process the messages.
+
+
+## Configuration Instruction
+|key               |nullable|default    |description|
+|------------------|--------|-----------|-----------|
+|mysqlAddr         |false   |           |MySQL address|
+|mysqlPort         |false   |           |MySQL port|
+|mysqlUsername     |false   |           |username of MySQL account|
+|mysqlPassword     |false   |           |password of MySQL account|
+|mqNamesrvAddr     |false   |           |RocketMQ name server address 
(e.g.,127.0.0.1:9876)|
+|mqTopic           |false   |           |RocketMQ topic name|
+|startType         |true    |DEFAULT    |The way that the replicator starts 
processing data,there are four options available:<br>- DEFAULT: try to start 
processing data in the "LAST_PROCESSED" way,if failed, then in the "NEW_EVENT" 
way<br>- LAST_PROCESSED: starts processing data from the last processed 
event<br>- NEW_EVENT: starts processing data from the tail of binlog<br>- 
SPECIFIED: starts processing data from the position that user specified,if you 
choose this option,the binlogFilename and nextPosition must not be null|
+|binlogFilename    |true    |           |If "startType" is "SPECIFIED",the 
replicator will begin to replicate from this binlog file|
+|nextPosition      |true    |           |If "startType" is "SPECIFIED",the 
replicator will begin to replicate from this position|
+|maxTransactionRows|true    |100        |max rows of the transaction pushed to 
RocketMQ|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/doc/dataflow.png
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/doc/dataflow.png b/rocketmq-mysql/doc/dataflow.png
new file mode 100644
index 0000000..ed12b52
Binary files /dev/null and b/rocketmq-mysql/doc/dataflow.png differ

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/doc/overview.png
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/doc/overview.png b/rocketmq-mysql/doc/overview.png
new file mode 100644
index 0000000..0a3ec82
Binary files /dev/null and b/rocketmq-mysql/doc/overview.png differ

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml
new file mode 100644
index 0000000..23e7468
--- /dev/null
+++ b/rocketmq-mysql/pom.xml
@@ -0,0 +1,275 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache</groupId>
+    <artifactId>rocketmq-mysql-replicator</artifactId>
+    <version>1.1.0</version>
+
+    <scm>
+        
<url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url>
+        
<connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</connection>
+        
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git
+        </developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+    <mailingLists>
+        <mailingList>
+            <name>Development List</name>
+            <subscribe>dev-subscr...@rocketmq.incubator.apache.org</subscribe>
+            
<unsubscribe>dev-unsubscr...@rocketmq.incubator.apache.org</unsubscribe>
+            <post>d...@rocketmq.incubator.apache.org</post>
+        </mailingList>
+        <mailingList>
+            <name>User List</name>
+            
<subscribe>users-subscr...@rocketmq.incubator.apache.org</subscribe>
+            
<unsubscribe>users-unsubscr...@rocketmq.incubator.apache.org</unsubscribe>
+            <post>us...@rocketmq.incubator.apache.org</post>
+        </mailingList>
+        <mailingList>
+            <name>Commits List</name>
+            
<subscribe>commits-subscr...@rocketmq.incubator.apache.org</subscribe>
+            
<unsubscribe>commits-unsubscr...@rocketmq.incubator.apache.org</unsubscribe>
+            <post>commits@rocketmq.incubator.apache.org</post>
+        </mailingList>
+    </mailingLists>
+
+    <developers>
+        <developer>
+            <id>Apache RocketMQ</id>
+            <name>Apache RocketMQ of ASF</name>
+            <url>https://rocketmq.apache.org/</url>
+        </developer>
+    </developers>
+
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <organization>
+        <name>Apache Software Foundation</name>
+        <url>http://www.apache.org</url>
+    </organization>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <maven.test.skip>false</maven.test.skip>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <maven.compiler.source>1.7</maven.compiler.source>
+        <maven.compiler.target>1.7</maven.compiler.target>
+        <rocketmq.version>4.0.0-incubating</rocketmq.version>
+    </properties>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.shyiko</groupId>
+            <artifactId>mysql-binlog-connector-java</artifactId>
+            <version>0.12.1</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>6.0.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.31</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.9</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>rocketmq-mysql</finalName>
+        <sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
+        <outputDirectory>${project.basedir}/target/classes</outputDirectory>
+        <resources>
+            <resource>
+                <directory>${project.basedir}/src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.8</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.4.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce-ban-circular-dependencies</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <rules>
+                        <banCircularDependencies/>
+                    </rules>
+                    <fail>true</fail>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>extra-enforcer-rules</artifactId>
+                        <version>1.0-beta-6</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>attach-javadocs</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            
<configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            
<includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>false</addMavenDescriptor>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>lib/</classpathPrefix>
+                        </manifest>
+                    </archive>
+                    <excludes>
+                        <exclude>rocketmq_mysql.conf</exclude>
+                        <exclude>logback.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/assembly.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/assembly.xml 
b/rocketmq-mysql/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..b280aa6
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/assembly.xml
@@ -0,0 +1,61 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly
+       
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
 http://maven.apache.org/xsd/assembly-1.1.0.xsd";>
+       <id>pack</id>
+       <formats>
+               <format>tar.gz</format>
+               <format>dir</format>
+       </formats>
+       <includeBaseDirectory>false</includeBaseDirectory>
+       <dependencySets>
+               <dependencySet>
+                       <useProjectArtifact>true</useProjectArtifact>
+                       <outputDirectory>lib</outputDirectory>
+               </dependencySet>
+       </dependencySets>
+       <fileSets>
+               <fileSet>
+                       <directory>src/main/assembly/scripts</directory>
+                       <outputDirectory>bin</outputDirectory>
+                       <fileMode>0755</fileMode>
+               </fileSet>
+               <fileSet>
+                       <directory>target/classes</directory>
+                       <outputDirectory>conf</outputDirectory>
+                       <fileMode>0755</fileMode>
+                       <includes>
+                               <include>*.conf</include>
+                               <include>logback.xml</include>
+                       </includes>
+               </fileSet>
+       </fileSets>
+
+       <files>
+               <file>
+                       <source>LICENSE-BIN</source>
+                       <destName>LICENSE</destName>
+               </file>
+               <file>
+                       <source>NOTICE-BIN</source>
+                       <destName>NOTICE</destName>
+               </file>
+       </files>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/scripts/start.sh
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/scripts/start.sh 
b/rocketmq-mysql/src/main/assembly/scripts/start.sh
new file mode 100644
index 0000000..e159f36
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/scripts/start.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+
+binPath=$(cd "$(dirname "$0")"; pwd);
+cd $binPath
+cd ..
+parentPath=`pwd`
+libPath=$parentPath/lib/
+
+
+function exportClassPath(){
+    jarFileList=`find "$libPath" -name *.jar |awk -F'/' '{print $(NF)}' 
2>>/dev/null`
+    CLASSPATH=".:$binPath";
+    for jarItem in $jarFileList
+      do
+        CLASSPATH="$CLASSPATH:$libPath$jarItem"
+    done
+    CLASSPATH=$CLASSPATH:./conf
+    export CLASSPATH
+}
+ulimit -n 65535
+exportClassPath
+
+java -server -Xms512m -Xmx512m -Xss2m -XX:NewRatio=2  -XX:+UseGCOverheadLimit 
-XX:-UseParallelGC -XX:ParallelGCThreads=24 org.apache.rocketmq.mysql.Replicator

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/assembly/scripts/stop.sh
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/scripts/stop.sh 
b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
new file mode 100755
index 0000000..f0e3c0d
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+PROGRAM_NAME="org.apache.rocketmq.mysql.Replicator"
+PIDS=`ps -ef | grep $PROGRAM_NAME | grep -v "grep" | awk '{print $2}'`
+
+if [ -z $PIDS ]; then
+    echo "No this process."
+else
+    echo "Find process is $PIDS."
+fi
+
+#####kill####
+echo -e "Stopping the $PROGRAM_NAME...\c"
+for PID in $PIDS ; do
+    kill  $PID
+done
+
+echo "SUCCESS!"

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
new file mode 100644
index 0000000..6c14cb4
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+
+public class Config {
+
+    public String mysqlAddr;
+    public Integer mysqlPort;
+    public String mysqlUsername;
+    public String mysqlPassword;
+
+    public String mqNamesrvAddr;
+    public String mqTopic;
+
+    public String startType = "DEFAULT";
+    public String binlogFilename;
+    public Long nextPosition;
+    public Integer maxTransactionRows = 100;
+
+    public void load() throws IOException {
+
+        InputStream in = 
Config.class.getClassLoader().getResourceAsStream("rocketmq_mysql.conf");
+        Properties properties = new Properties();
+        properties.load(in);
+
+        properties2Object(properties, this);
+
+    }
+
+    private void properties2Object(final Properties p, final Object object) {
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getProperty(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) 
{
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || 
cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || 
cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || 
cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    public void setMysqlAddr(String mysqlAddr) {
+        this.mysqlAddr = mysqlAddr;
+    }
+
+    public void setMysqlPort(Integer mysqlPort) {
+        this.mysqlPort = mysqlPort;
+    }
+
+    public void setMysqlUsername(String mysqlUsername) {
+        this.mysqlUsername = mysqlUsername;
+    }
+
+    public void setMysqlPassword(String mysqlPassword) {
+        this.mysqlPassword = mysqlPassword;
+    }
+
+    public void setBinlogFilename(String binlogFilename) {
+        this.binlogFilename = binlogFilename;
+    }
+
+    public void setNextPosition(Long nextPosition) {
+        this.nextPosition = nextPosition;
+    }
+
+    public void setMaxTransactionRows(Integer maxTransactionRows) {
+        this.maxTransactionRows = maxTransactionRows;
+    }
+
+    public void setMqNamesrvAddr(String mqNamesrvAddr) {
+        this.mqNamesrvAddr = mqNamesrvAddr;
+    }
+
+    public void setMqTopic(String mqTopic) {
+        this.mqTopic = mqTopic;
+    }
+
+    public void setStartType(String startType) {
+        this.startType = startType;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
new file mode 100644
index 0000000..ae3c984
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.mysql.binlog.Transaction;
+import org.apache.rocketmq.mysql.position.BinlogPositionLogThread;
+import org.apache.rocketmq.mysql.productor.RocketMQProducer;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Replicator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Replicator.class);
+
+    private static final Logger POSITION_LOGGER = 
LoggerFactory.getLogger("PositionLogger");
+
+    private Config config;
+
+    private EventProcessor eventProcessor;
+
+    private RocketMQProducer rocketMQProducer;
+
+    private Object lock = new Object();
+    private BinlogPosition nextBinlogPosition;
+    private long nextQueueOffset;
+    private long xid;
+
+    public static void main(String[] args) {
+
+        Replicator replicator = new Replicator();
+        replicator.start();
+    }
+
+    public void start() {
+
+        try {
+            config = new Config();
+            config.load();
+
+            rocketMQProducer = new RocketMQProducer(config);
+            rocketMQProducer.start();
+
+            BinlogPositionLogThread binlogPositionLogThread = new 
BinlogPositionLogThread(this);
+            binlogPositionLogThread.start();
+
+            eventProcessor = new EventProcessor(this);
+            eventProcessor.start();
+
+        } catch (Exception e) {
+            LOGGER.error("Start error.", e);
+            System.exit(1);
+        }
+    }
+
+    public void commit(Transaction transaction, boolean isComplete) {
+
+        String json = transaction.toJson();
+
+        for (int i = 0; i < 3; i++) {
+            try {
+                if (isComplete) {
+                    long offset = rocketMQProducer.push(json);
+
+                    synchronized (lock) {
+                        xid = transaction.getXid();
+                        nextBinlogPosition = 
transaction.getNextBinlogPosition();
+                        nextQueueOffset = offset;
+                    }
+
+                } else {
+                    rocketMQProducer.push(json);
+                }
+                break;
+
+            } catch (Exception e) {
+                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
+            }
+        }
+    }
+
+    public void logPosition() {
+
+        String binlogFilename = null;
+        long xid = 0L;
+        long nextPosition = 0L;
+        long nextOffset = 0L;
+
+        synchronized (lock) {
+            if (nextBinlogPosition != null) {
+                xid = this.xid;
+                binlogFilename = nextBinlogPosition.getBinlogFilename();
+                nextPosition = nextBinlogPosition.getPosition();
+                nextOffset = nextQueueOffset;
+            }
+        }
+
+        if (binlogFilename != null) {
+            POSITION_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: 
{},   NEXT_OFFSET: {}",
+                xid, binlogFilename, nextPosition, nextOffset);
+        }
+
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public BinlogPosition getNextBinlogPosition() {
+        return nextBinlogPosition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
new file mode 100644
index 0000000..646c018
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.mysql.schema.Table;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataRow {
+    private Logger logger = LoggerFactory.getLogger(DataRow.class);
+
+    private String type;
+    private Table table;
+    private Serializable[] row;
+
+    public DataRow(String type, Table table, Serializable[] row) {
+        this.type = type;
+        this.table = table;
+        this.row = row;
+    }
+
+    public Map toMap() {
+
+        try {
+            if (table.getColList().size() == row.length) {
+                Map<String, Object> dataMap = new HashMap<>();
+                List<String> keyList = table.getColList();
+                List<ColumnParser> parserList = table.getParserList();
+
+                for (int i = 0; i < keyList.size(); i++) {
+                    Object value = row[i];
+                    ColumnParser parser = parserList.get(i);
+                    dataMap.put(keyList.get(i), parser.getValue(value));
+                }
+
+                Map<String, Object> map = new HashMap<>();
+                map.put("database", table.getDatabase());
+                map.put("table", table.getName());
+                map.put("type", type);
+                map.put("data", dataMap);
+
+                return map;
+            } else {
+                logger.error("Table schema changed,discard data: {} - {}, {}  
{}",
+                    table.getDatabase().toUpperCase(), 
table.getName().toUpperCase(), type, row.toString());
+
+                return null;
+            }
+        } catch (Exception e) {
+            logger.error("Row parse error,discard data: {} - {}, {}  {}",
+                table.getDatabase().toUpperCase(), 
table.getName().toUpperCase(), type, row.toString());
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
new file mode 100644
index 0000000..b5005bc
--- /dev/null
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class EventListener implements BinaryLogClient.EventListener, 
BinaryLogClient.LifecycleListener {
+
+    private BlockingQueue<Event> queue;
+
+    public EventListener(BlockingQueue<Event> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void onEvent(Event event) {
+        try {
+            while (true) {
+                if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) {
+                    return;
+                }
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void onConnect(BinaryLogClient client) {
+
+    }
+
+    @Override
+    public void onCommunicationFailure(BinaryLogClient client, Exception e) {
+
+    }
+
+    @Override
+    public void onEventDeserializationFailure(BinaryLogClient client, 
Exception e) {
+
+    }
+
+    @Override
+    public void onDisconnect(BinaryLogClient client) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
new file mode 100644
index 0000000..a730403
--- /dev/null
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.XidEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.mysql.Replicator;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.apache.rocketmq.mysql.position.BinlogPositionManager;
+import org.apache.rocketmq.mysql.schema.Schema;
+import org.apache.rocketmq.mysql.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventProcessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EventProcessor.class);
+
+    private Replicator replicator;
+    private Config config;
+
+    private DataSource dataSource;
+
+    private BinlogPositionManager binlogPositionManager;
+
+    private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100);
+
+    private BinaryLogClient binaryLogClient;
+
+    private EventListener eventListener;
+
+    private Schema schema;
+
+    private Map<Long, Table> tableMap = new HashMap<>();
+
+    private Transaction transaction;
+
+    public EventProcessor(Replicator replicator) {
+
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    public void start() throws Exception {
+
+        initDataSource();
+
+        binlogPositionManager = new BinlogPositionManager(config, dataSource);
+        binlogPositionManager.initBeginPosition();
+
+        schema = new Schema(dataSource);
+        schema.load();
+
+        eventListener = new EventListener(queue);
+        binaryLogClient = new BinaryLogClient(config.mysqlAddr,
+            config.mysqlPort,
+            config.mysqlUsername,
+            config.mysqlPassword);
+        binaryLogClient.setBlocking(true);
+        binaryLogClient.setServerId(1001);
+
+        EventDeserializer eventDeserializer = new EventDeserializer();
+        
eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
+            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
+        binaryLogClient.setEventDeserializer(eventDeserializer);
+        binaryLogClient.registerEventListener(eventListener);
+        
binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename());
+        binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition());
+
+        binaryLogClient.connect(3000);
+
+        LOGGER.info("Started.");
+
+        doProcess();
+    }
+
+    private void doProcess() {
+
+        while (true) {
+
+            try {
+                Event event = queue.poll(1000, TimeUnit.MILLISECONDS);
+                if (event == null) {
+                    checkConnection();
+                    continue;
+                }
+
+                switch (event.getHeader().getEventType()) {
+                    case TABLE_MAP:
+                        processTableMapEvent(event);
+                        break;
+
+                    case WRITE_ROWS:
+                    case EXT_WRITE_ROWS:
+                        processWriteEvent(event);
+                        break;
+
+                    case UPDATE_ROWS:
+                    case EXT_UPDATE_ROWS:
+                        processUpdateEvent(event);
+                        break;
+
+                    case DELETE_ROWS:
+                    case EXT_DELETE_ROWS:
+                        processDeleteEvent(event);
+                        break;
+
+                    case QUERY:
+                        processQueryEvent(event);
+                        break;
+
+                    case XID:
+                        processXidEvent(event);
+                        break;
+
+                }
+            } catch (Exception e) {
+                LOGGER.error("Binlog process error.", e);
+            }
+
+        }
+    }
+
+    private void checkConnection() throws Exception {
+
+        if (!binaryLogClient.isConnected()) {
+            BinlogPosition binlogPosition = replicator.getNextBinlogPosition();
+            if (binlogPosition != null) {
+                
binaryLogClient.setBinlogFilename(binlogPosition.getBinlogFilename());
+                
binaryLogClient.setBinlogPosition(binlogPosition.getPosition());
+            }
+
+            binaryLogClient.connect(3000);
+        }
+    }
+
+    private void processTableMapEvent(Event event) {
+        TableMapEventData data = event.getData();
+        String dbName = data.getDatabase();
+        String tableName = data.getTable();
+        Long tableId = data.getTableId();
+
+        Table table = schema.getTable(dbName, tableName);
+
+        tableMap.put(tableId, table);
+    }
+
+    private void processWriteEvent(Event event) {
+        WriteRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Serializable[]> list = data.getRows();
+
+        for (Serializable[] row : list) {
+            addRow("WRITE", tableId, row);
+        }
+    }
+
+    private void processUpdateEvent(Event event) {
+        UpdateRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows();
+
+        for (Map.Entry<Serializable[], Serializable[]> entry : list) {
+            addRow("UPDATE", tableId, entry.getValue());
+        }
+    }
+
+    private void processDeleteEvent(Event event) {
+        DeleteRowsEventData data = event.getData();
+        Long tableId = data.getTableId();
+        List<Serializable[]> list = data.getRows();
+
+        for (Serializable[] row : list) {
+            addRow("DELETE", tableId, row);
+        }
+
+    }
+
+    private static Pattern createTablePattern =
+        Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE);
+
+    private void processQueryEvent(Event event) {
+        QueryEventData data = event.getData();
+        String sql = data.getSql();
+
+        if (createTablePattern.matcher(sql).find()) {
+            schema.reset();
+        }
+    }
+
+    private void processXidEvent(Event event) {
+        EventHeaderV4 header = event.getHeader();
+        XidEventData data = event.getData();
+
+        String binlogFilename = binaryLogClient.getBinlogFilename();
+        Long position = header.getNextPosition();
+        Long xid = data.getXid();
+
+        BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, 
position);
+        transaction.setNextBinlogPosition(binlogPosition);
+        transaction.setXid(xid);
+
+        replicator.commit(transaction, true);
+
+        transaction = new Transaction(config);
+    }
+
+    private void addRow(String type, Long tableId, Serializable[] row) {
+
+        if (transaction == null) {
+            transaction = new Transaction(config);
+        }
+
+        Table t = tableMap.get(tableId);
+        if (t != null) {
+
+            while (true) {
+                if (transaction.addRow(type, t, row)) {
+                    break;
+
+                } else {
+                    
transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());
+                    replicator.commit(transaction, false);
+                    transaction = new Transaction(config);
+                }
+            }
+
+        }
+    }
+
+    private void initDataSource() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put("driverClassName", "com.mysql.jdbc.Driver");
+        map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + 
config.mysqlPort + "?useSSL=true&verifyServerCertificate=false");
+        map.put("username", config.mysqlUsername);
+        map.put("password", config.mysqlPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+
+        dataSource = DruidDataSourceFactory.createDataSource(map);
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
new file mode 100644
index 0000000..396815a
--- /dev/null
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.apache.rocketmq.mysql.schema.Table;
+
+public class Transaction {
+    private BinlogPosition nextBinlogPosition;
+    private Long xid;
+
+    private Config config;
+
+    private List<DataRow> list = new LinkedList<>();
+
+    public Transaction(Config config) {
+        this.config = config;
+    }
+
+    public boolean addRow(String type, Table table, Serializable[] row) {
+
+        if (list.size() == config.maxTransactionRows) {
+            return false;
+        } else {
+            DataRow dataRow = new DataRow(type, table, row);
+            list.add(dataRow);
+            return true;
+        }
+
+    }
+
+    public String toJson() {
+
+        List<Map> rows = new LinkedList<>();
+        for (DataRow dataRow : list) {
+            Map rowMap = dataRow.toMap();
+            if (rowMap != null) {
+                rows.add(rowMap);
+            }
+        }
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("xid", xid);
+        map.put("binlogFilename", nextBinlogPosition.getBinlogFilename());
+        map.put("nextPosition", nextBinlogPosition.getPosition());
+        map.put("rows", rows);
+
+        return JSONObject.toJSONString(map);
+    }
+
+    public BinlogPosition getNextBinlogPosition() {
+        return nextBinlogPosition;
+    }
+
+    public void setNextBinlogPosition(BinlogPosition nextBinlogPosition) {
+        this.nextBinlogPosition = nextBinlogPosition;
+    }
+
+    public void setXid(Long xid) {
+        this.xid = xid;
+    }
+
+    public Long getXid() {
+        return xid;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
new file mode 100644
index 0000000..5ba436c
--- /dev/null
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+public class BinlogPosition {
+
+    private String binlogFilename;
+    private Long position;
+
+    public BinlogPosition(String binlogFilename, Long position) {
+        this.binlogFilename = binlogFilename;
+        this.position = position;
+    }
+
+    public String getBinlogFilename() {
+        return binlogFilename;
+    }
+
+    public void setBinlogFilename(String binlogFilename) {
+        this.binlogFilename = binlogFilename;
+    }
+
+    public Long getPosition() {
+        return position;
+    }
+
+    public void setPosition(Long position) {
+        this.position = position;
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
new file mode 100644
index 0000000..dedb08f
--- /dev/null
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+import org.apache.rocketmq.mysql.Replicator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BinlogPositionLogThread extends Thread {
+    private Logger logger = 
LoggerFactory.getLogger(BinlogPositionLogThread.class);
+
+    private Replicator replicator;
+
+    public BinlogPositionLogThread(Replicator replicator) {
+        this.replicator = replicator;
+        setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+
+        while (true) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.error("Offset thread interrupted.", e);
+            }
+
+            replicator.logPosition();
+        }
+    }
+}


Reply via email to