[ 
https://issues.apache.org/jira/browse/ROCKETMQ-293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264871#comment-16264871
 ] 

ASF GitHub Bot commented on ROCKETMQ-293:
-----------------------------------------

vongosling closed pull request #30: [ROCKETMQ-293] Add high-availability support for rocketmq-mysql-replicator
URL: https://github.com/apache/rocketmq-externals/pull/30
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN
index 22b0aa40..c2998526 100644
--- a/rocketmq-mysql/LICENSE-BIN
+++ b/rocketmq-mysql/LICENSE-BIN
@@ -299,3 +299,13 @@ Copyright (C) 2002 Kevin Atkinson ([email protected])
 ------
 This product has a bundle mysql-binlog-connector-java, which is available 
under the ASL2 License.
 The source code of mysql-binlog-connector-java can be found at 
https://github.com/shyiko/mysql-binlog-connector-java.
+
+------
+This product has a bundle zookeeper, which is available under the ASL2 License.
+The source code of zookeeper can be found at 
https://github.com/apache/zookeeper.
+
+Apache ZooKeeper
+Copyright 2009-2014 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml
index 23e7468f..6f3569aa 100644
--- a/rocketmq-mysql/pom.xml
+++ b/rocketmq-mysql/pom.xml
@@ -111,6 +111,11 @@
             <artifactId>commons-codec</artifactId>
             <version>1.9</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>2.8.0</version>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
@@ -210,6 +215,7 @@
                 <configuration>
                     <excludes>
                         <exclude>README.md</exclude>
+                        <exclude>NOTICE-BIN</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/rocketmq-mysql/src/main/assembly/scripts/start.sh 
b/rocketmq-mysql/src/main/assembly/scripts/start.sh
index e159f36e..047458d3 100644
--- a/rocketmq-mysql/src/main/assembly/scripts/start.sh
+++ b/rocketmq-mysql/src/main/assembly/scripts/start.sh
@@ -1,5 +1,20 @@
 #!/usr/bin/env bash
 
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 binPath=$(cd "$(dirname "$0")"; pwd);
 cd $binPath
 cd ..
diff --git a/rocketmq-mysql/src/main/assembly/scripts/stop.sh 
b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
index f0e3c0d3..fdebf5c9 100755
--- a/rocketmq-mysql/src/main/assembly/scripts/stop.sh
+++ b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
@@ -1,5 +1,20 @@
 #!/bin/bash
 
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 PROGRAM_NAME="org.apache.rocketmq.mysql.Replicator"
 PIDS=`ps -ef | grep $PROGRAM_NAME | grep -v "grep" | awk '{print $2}'`
 
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
index 6c14cb45..0ddc055e 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
@@ -33,6 +33,8 @@
     public String mqNamesrvAddr;
     public String mqTopic;
 
+    public String zkAddr;
+
     public String startType = "DEFAULT";
     public String binlogFilename;
     public Long nextPosition;
@@ -127,4 +129,8 @@ public void setMqTopic(String mqTopic) {
     public void setStartType(String startType) {
         this.startType = startType;
     }
+
+    public void setZkAddr(String zkAddr) {
+        this.zkAddr = zkAddr;
+    }
 }
\ No newline at end of file
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
index ae3c9841..e0705caa 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
@@ -19,6 +19,7 @@
 
 import org.apache.rocketmq.mysql.binlog.EventProcessor;
 import org.apache.rocketmq.mysql.binlog.Transaction;
+import org.apache.rocketmq.mysql.ha.MasterElectionLatch;
 import org.apache.rocketmq.mysql.position.BinlogPositionLogThread;
 import org.apache.rocketmq.mysql.productor.RocketMQProducer;
 import org.apache.rocketmq.mysql.position.BinlogPosition;
@@ -37,6 +38,10 @@
 
     private RocketMQProducer rocketMQProducer;
 
+    private BinlogPositionLogThread binlogPositionLogThread;
+
+    private MasterElectionLatch masterElectionLatch;
+
     private Object lock = new Object();
     private BinlogPosition nextBinlogPosition;
     private long nextQueueOffset;
@@ -57,11 +62,12 @@ public void start() {
             rocketMQProducer = new RocketMQProducer(config);
             rocketMQProducer.start();
 
-            BinlogPositionLogThread binlogPositionLogThread = new 
BinlogPositionLogThread(this);
-            binlogPositionLogThread.start();
-
-            eventProcessor = new EventProcessor(this);
-            eventProcessor.start();
+            if (config.zkAddr != null) {
+                masterElectionLatch = new MasterElectionLatch(this);
+                masterElectionLatch.elect();
+            } else {
+                startProcess();
+            }
 
         } catch (Exception e) {
             LOGGER.error("Start error.", e);
@@ -69,6 +75,23 @@ public void start() {
         }
     }
 
+    public void startProcess() {
+
+        binlogPositionLogThread = new BinlogPositionLogThread(this);
+        binlogPositionLogThread.start();
+
+        eventProcessor = new EventProcessor(this);
+        eventProcessor.start();
+    }
+
+    public void stopProcess() {
+
+        binlogPositionLogThread.interrupt();
+
+        eventProcessor.stopProcess();
+        eventProcessor.interrupt();
+    }
+
     public void commit(Transaction transaction, boolean isComplete) {
 
         String json = transaction.toJson();
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
index a730403f..28657cba 100644
--- 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
@@ -46,12 +46,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class EventProcessor {
+public class EventProcessor extends Thread {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(EventProcessor.class);
 
     private Replicator replicator;
     private Config config;
 
+    private volatile boolean eof = false;
+
     private DataSource dataSource;
 
     private BinlogPositionManager binlogPositionManager;
@@ -74,7 +76,29 @@ public EventProcessor(Replicator replicator) {
         this.config = replicator.getConfig();
     }
 
-    public void start() throws Exception {
+    @Override
+    public void run() {
+
+        do {
+            try {
+                startProcess();
+
+                break;
+            } catch (Exception e) {
+                LOGGER.error("Start error.", e);
+            }
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+
+        } while (!eof);
+
+        LOGGER.info("Process thread stopped.");
+    }
+
+    public void startProcess() throws Exception {
 
         initDataSource();
 
@@ -102,14 +126,14 @@ public void start() throws Exception {
 
         binaryLogClient.connect(3000);
 
-        LOGGER.info("Started.");
+        LOGGER.info("Process thread started.");
 
         doProcess();
     }
 
     private void doProcess() {
 
-        while (true) {
+        while (!eof) {
 
             try {
                 Event event = queue.poll(1000, TimeUnit.MILLISECONDS);
@@ -147,16 +171,18 @@ private void doProcess() {
                         break;
 
                 }
+            } catch (InterruptedException ex) {
+                LOGGER.info("Process thread interrupted.");
+
             } catch (Exception e) {
                 LOGGER.error("Binlog process error.", e);
             }
-
         }
     }
 
     private void checkConnection() throws Exception {
 
-        if (!binaryLogClient.isConnected()) {
+        if (!binaryLogClient.isConnected() && !eof) {
             BinlogPosition binlogPosition = replicator.getNextBinlogPosition();
             if (binlogPosition != null) {
                 
binaryLogClient.setBinlogFilename(binlogPosition.getBinlogFilename());
@@ -167,6 +193,19 @@ private void checkConnection() throws Exception {
         }
     }
 
+    public void stopProcess() {
+
+        eof = true;
+
+        try {
+            if (binaryLogClient != null) {
+                binaryLogClient.disconnect();
+            }
+        } catch (Exception e) {
+            LOGGER.error("stop error", e);
+        }
+    }
+
     private void processTableMapEvent(Event event) {
         TableMapEventData data = event.getData();
         String dbName = data.getDatabase();
@@ -278,8 +317,4 @@ private void initDataSource() throws Exception {
         dataSource = DruidDataSourceFactory.createDataSource(map);
     }
 
-    public Config getConfig() {
-        return config;
-    }
-
-}
+}
\ No newline at end of file
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/ha/MasterElectionLatch.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/ha/MasterElectionLatch.java
new file mode 100644
index 00000000..8bb0df32
--- /dev/null
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/ha/MasterElectionLatch.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.ha;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.rocketmq.mysql.Replicator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MasterElectionLatch implements LeaderLatchListener {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MasterElectionLatch.class);
+
+    private Replicator replicator;
+
+    public MasterElectionLatch(Replicator replicator) {
+
+        this.replicator = replicator;
+    }
+
+    public void elect() throws Exception {
+
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(replicator.getConfig().zkAddr)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+            .sessionTimeoutMs(3000)
+            .connectionTimeoutMs(3000)
+            .namespace("rocketmq-mysql-replicator")
+            .build();
+        client.start();
+
+        LeaderLatch leaderLatch = new LeaderLatch(client, "/master", 
"replicator");
+        leaderLatch.addListener(this);
+        leaderLatch.start();
+    }
+
+    @Override
+    public void isLeader() {
+
+        LOGGER.info("ZK_EVENT:isLeader!");
+
+        replicator.startProcess();
+    }
+
+    @Override
+    public void notLeader() {
+
+        LOGGER.info("ZK_EVENT:notLeader!");
+
+        replicator.stopProcess();
+    }
+
+
+}
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
index dedb08f4..6c151959 100644
--- 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
@@ -38,7 +38,7 @@ public void run() {
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
-                logger.error("Offset thread interrupted.", e);
+                logger.info("Offset thread interrupted.");
             }
 
             replicator.logPosition();
diff --git 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
index fd6555c0..27bb3755 100644
--- 
a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
+++ 
b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
@@ -109,6 +109,7 @@ private void initPositionFromMqTail() throws Exception {
                 nextPosition = js.getLong("nextPosition");
             }
         }
+        consumer.shutdown();
 
     }
 
diff --git a/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf 
b/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
index 4a7a35f0..00878ce6 100644
--- a/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
+++ b/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
@@ -22,6 +22,8 @@ mysqlPassword=
 mqNamesrvAddr=
 mqTopic=
 
+#zkAddr=
+
 #startType=
 #binlogFilename=
 #nextPosition=


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add high-availability support for rocketmq-mysql-replicator
> -----------------------------------------------------------
>
>                 Key: ROCKETMQ-293
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-293
>             Project: Apache RocketMQ
>          Issue Type: New Feature
>          Components: rocketmq-externals
>            Reporter: Qun Zhao
>            Assignee: dongeforever
>            Priority: Minor
>
> Add high-availability support for rocketmq-mysql-replicator using zookeeper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to