Repository: incubator-rocketmq-externals Updated Branches: refs/heads/master fdf7fc4a3 -> 343ab198e
Prepare release mysql replicator 1.1.0 version Closes #26 from zhaoqun911/mysql-develop. Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/343ab198 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/343ab198 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/343ab198 Branch: refs/heads/master Commit: 343ab198eb7987e50588d8209ded6a4db5e9de83 Parents: fdf7fc4 Author: zhaoqun007 <9...@zhaoqun911.cn> Authored: Mon Aug 14 19:54:09 2017 +0800 Committer: yukon <yu...@apache.org> Committed: Tue Aug 22 22:03:21 2017 +0800 ---------------------------------------------------------------------- rocketmq-mysql/LICENSE-BIN | 4 +- rocketmq-mysql/pom.xml | 8 +- .../apache/rocketmq/mysql/binlog/DataRow.java | 12 +- .../rocketmq/mysql/binlog/EventListener.java | 33 +++- .../rocketmq/mysql/binlog/EventProcessor.java | 196 ++++++++----------- .../rocketmq/mysql/binlog/Transaction.java | 10 +- .../mysql/position/BinlogPositionManager.java | 11 +- .../schema/column/DateTimeColumnParser.java | 15 +- 8 files changed, 144 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/LICENSE-BIN ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN index 5349fbd..22b0aa4 100644 --- a/rocketmq-mysql/LICENSE-BIN +++ b/rocketmq-mysql/LICENSE-BIN @@ -297,5 +297,5 @@ contains test data from http://aspell.net/test/orig/batch0.tab. Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org) ------ -This product has a bundle open-replicator, which is available under the ASL2 License. -The source code of open-replicator can be found at https://github.com/whitesock/open-replicator. +This product has a bundle mysql-binlog-connector-java, which is available under the ASL2 License. +The source code of mysql-binlog-connector-java can be found at https://github.com/shyiko/mysql-binlog-connector-java. http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml index c09826c..23e7468 100644 --- a/rocketmq-mysql/pom.xml +++ b/rocketmq-mysql/pom.xml @@ -6,7 +6,7 @@ <groupId>org.apache</groupId> <artifactId>rocketmq-mysql-replicator</artifactId> - <version>1.0.0</version> + <version>1.1.0</version> <scm> <url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url> @@ -82,9 +82,9 @@ <version>${rocketmq.version}</version> </dependency> <dependency> - <groupId>com.zendesk</groupId> - <artifactId>open-replicator</artifactId> - <version>1.6.0</version> + <groupId>com.github.shyiko</groupId> + <artifactId>mysql-binlog-connector-java</artifactId> + <version>0.12.1</version> </dependency> <dependency> <groupId>mysql</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java index 772ffd5..646c018 100644 --- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java @@ -17,8 +17,7 @@ package org.apache.rocketmq.mysql.binlog; -import com.google.code.or.common.glossary.Column; -import com.google.code.or.common.glossary.Row; +import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,9 +31,9 @@ public class DataRow { private String type; private Table table; - private Row row; + private Serializable[] row; - public DataRow(String type, Table table, Row row) { + public DataRow(String type, Table table, Serializable[] row) { this.type = type; this.table = table; this.row = row; @@ -43,14 +42,13 @@ public class DataRow { public Map toMap() { try { - if (table.getColList().size() == row.getColumns().size()) { + if (table.getColList().size() == row.length) { Map<String, Object> dataMap = new HashMap<>(); List<String> keyList = table.getColList(); List<ColumnParser> parserList = table.getParserList(); - List<Column> valueList = row.getColumns(); for (int i = 0; i < keyList.size(); i++) { - Object value = valueList.get(i).getValue(); + Object value = row[i]; ColumnParser parser = parserList.get(i); dataMap.put(keyList.get(i), parser.getValue(value)); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java index cea36a0..b5005bc 100644 --- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java @@ -17,20 +17,21 @@ package org.apache.rocketmq.mysql.binlog; -import com.google.code.or.binlog.BinlogEventListener; -import com.google.code.or.binlog.BinlogEventV4; +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.Event; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -public class EventListener implements BinlogEventListener { - private BlockingQueue<BinlogEventV4> queue; +public class EventListener implements BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener { - public EventListener(BlockingQueue<BinlogEventV4> queue) { + private BlockingQueue<Event> queue; + + public EventListener(BlockingQueue<Event> queue) { this.queue = queue; } @Override - public void onEvents(BinlogEventV4 event) { + public void onEvent(Event event) { try { while (true) { if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) { @@ -41,4 +42,24 @@ public class EventListener implements BinlogEventListener { e.printStackTrace(); } } + + @Override + public void onConnect(BinaryLogClient client) { + + } + + @Override + public void onCommunicationFailure(BinaryLogClient client, Exception e) { + + } + + @Override + public void onEventDeserializationFailure(BinaryLogClient client, Exception e) { + + } + + @Override + public void onDisconnect(BinaryLogClient client) { + + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java index ba35d3e..a730403 100644 --- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java @@ -18,20 +18,17 @@ package org.apache.rocketmq.mysql.binlog; import com.alibaba.druid.pool.DruidDataSourceFactory; -import com.google.code.or.OpenReplicator; -import com.google.code.or.binlog.BinlogEventV4; -import com.google.code.or.binlog.impl.event.DeleteRowsEvent; -import com.google.code.or.binlog.impl.event.DeleteRowsEventV2; -import com.google.code.or.binlog.impl.event.QueryEvent; -import com.google.code.or.binlog.impl.event.TableMapEvent; -import com.google.code.or.binlog.impl.event.UpdateRowsEvent; -import com.google.code.or.binlog.impl.event.UpdateRowsEventV2; -import com.google.code.or.binlog.impl.event.WriteRowsEvent; -import com.google.code.or.binlog.impl.event.WriteRowsEventV2; -import com.google.code.or.binlog.impl.event.XidEvent; -import com.google.code.or.common.glossary.Pair; -import com.google.code.or.common.glossary.Row; -import com.google.code.or.common.util.MySQLConstants; +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import com.github.shyiko.mysql.binlog.event.XidEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,9 +56,9 @@ public class EventProcessor { private BinlogPositionManager binlogPositionManager; - private BlockingQueue<BinlogEventV4> queue = new LinkedBlockingQueue<>(100); + private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100); - private OpenReplicator openReplicator; + private BinaryLogClient binaryLogClient; private EventListener eventListener; @@ -88,19 +85,22 @@ public class EventProcessor { schema.load(); eventListener = new EventListener(queue); - openReplicator = new OpenReplicator(); - openReplicator.setBinlogEventListener(eventListener); - openReplicator.setHost(config.mysqlAddr); - openReplicator.setPort(config.mysqlPort); - openReplicator.setUser(config.mysqlUsername); - openReplicator.setPassword(config.mysqlPassword); - openReplicator.setStopOnEOF(false); - openReplicator.setHeartbeatPeriod(1f); - openReplicator.setLevel2BufferSize(50 * 1024 * 1024); - openReplicator.setServerId(1001); - openReplicator.setBinlogFileName(binlogPositionManager.getBinlogFilename()); - openReplicator.setBinlogPosition(binlogPositionManager.getPosition()); - openReplicator.start(); + binaryLogClient = new BinaryLogClient(config.mysqlAddr, + config.mysqlPort, + config.mysqlUsername, + config.mysqlPassword); + binaryLogClient.setBlocking(true); + binaryLogClient.setServerId(1001); + + EventDeserializer eventDeserializer = new EventDeserializer(); + eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, + EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY); + binaryLogClient.setEventDeserializer(eventDeserializer); + binaryLogClient.registerEventListener(eventListener); + binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename()); + binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition()); + + binaryLogClient.connect(3000); LOGGER.info("Started."); @@ -112,46 +112,37 @@ public class EventProcessor { while (true) { try { - BinlogEventV4 event = queue.poll(100, TimeUnit.MILLISECONDS); + Event event = queue.poll(1000, TimeUnit.MILLISECONDS); if (event == null) { checkConnection(); continue; } switch (event.getHeader().getEventType()) { - case MySQLConstants.TABLE_MAP_EVENT: + case TABLE_MAP: processTableMapEvent(event); break; - case MySQLConstants.WRITE_ROWS_EVENT: + case WRITE_ROWS: + case EXT_WRITE_ROWS: processWriteEvent(event); break; - case MySQLConstants.WRITE_ROWS_EVENT_V2: - processWriteEventV2(event); - break; - - case MySQLConstants.UPDATE_ROWS_EVENT: + case UPDATE_ROWS: + case EXT_UPDATE_ROWS: processUpdateEvent(event); break; - case MySQLConstants.UPDATE_ROWS_EVENT_V2: - processUpdateEventV2(event); - break; - - case MySQLConstants.DELETE_ROWS_EVENT: + case DELETE_ROWS: + case EXT_DELETE_ROWS: processDeleteEvent(event); break; - case MySQLConstants.DELETE_ROWS_EVENT_V2: - processDeleteEventV2(event); - break; - - case MySQLConstants.QUERY_EVENT: + case QUERY: processQueryEvent(event); break; - case MySQLConstants.XID_EVENT: + case XID: processXidEvent(event); break; @@ -165,86 +156,54 @@ public class EventProcessor { private void checkConnection() throws Exception { - if (!openReplicator.isRunning()) { + if (!binaryLogClient.isConnected()) { BinlogPosition binlogPosition = replicator.getNextBinlogPosition(); if (binlogPosition != null) { - openReplicator.setBinlogFileName(binlogPosition.getBinlogFilename()); - openReplicator.setBinlogPosition(binlogPosition.getPosition()); + binaryLogClient.setBinlogFilename(binlogPosition.getBinlogFilename()); + binaryLogClient.setBinlogPosition(binlogPosition.getPosition()); } - openReplicator.start(); + binaryLogClient.connect(3000); } } - private void processTableMapEvent(BinlogEventV4 event) { - TableMapEvent tableMapEvent = (TableMapEvent) event; - String dbName = tableMapEvent.getDatabaseName().toString(); - String tableName = tableMapEvent.getTableName().toString(); - Long tableId = tableMapEvent.getTableId(); + private void processTableMapEvent(Event event) { + TableMapEventData data = event.getData(); + String dbName = data.getDatabase(); + String tableName = data.getTable(); + Long tableId = data.getTableId(); Table table = schema.getTable(dbName, tableName); tableMap.put(tableId, table); } - private void processWriteEvent(BinlogEventV4 event) { - WriteRowsEvent writeRowsEvent = (WriteRowsEvent) event; - Long tableId = writeRowsEvent.getTableId(); - List<Row> list = writeRowsEvent.getRows(); + private void processWriteEvent(Event event) { + WriteRowsEventData data = event.getData(); + Long tableId = data.getTableId(); + List<Serializable[]> list = data.getRows(); - for (Row row : list) { + for (Serializable[] row : list) { addRow("WRITE", tableId, row); } } - private void processWriteEventV2(BinlogEventV4 event) { - WriteRowsEventV2 writeRowsEventV2 = (WriteRowsEventV2) event; - Long tableId = writeRowsEventV2.getTableId(); - List<Row> list = writeRowsEventV2.getRows(); - - for (Row row : list) { - addRow("WRITE", tableId, row); - } - - } - - private void processUpdateEvent(BinlogEventV4 event) { - UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) event; - Long tableId = updateRowsEvent.getTableId(); - List<Pair<Row>> list = updateRowsEvent.getRows(); - - for (Pair<Row> pair : list) { - addRow("UPDATE", tableId, pair.getAfter()); - } - } - - private void processUpdateEventV2(BinlogEventV4 event) { - UpdateRowsEventV2 updateRowsEventV2 = (UpdateRowsEventV2) event; - Long tableId = updateRowsEventV2.getTableId(); - List<Pair<Row>> list = updateRowsEventV2.getRows(); - - for (Pair<Row> pair : list) { - addRow("UPDATE", tableId, pair.getAfter()); - } - } - - private void processDeleteEvent(BinlogEventV4 event) { - DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) event; - Long tableId = deleteRowsEvent.getTableId(); - List<Row> list = deleteRowsEvent.getRows(); + private void processUpdateEvent(Event event) { + UpdateRowsEventData data = event.getData(); + Long tableId = data.getTableId(); + List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows(); - for (Row row : list) { - addRow("DELETE", tableId, row); + for (Map.Entry<Serializable[], Serializable[]> entry : list) { + addRow("UPDATE", tableId, entry.getValue()); } - } - private void processDeleteEventV2(BinlogEventV4 event) { - DeleteRowsEventV2 deleteRowsEventV2 = (DeleteRowsEventV2) event; - Long tableId = deleteRowsEventV2.getTableId(); - List<Row> list = deleteRowsEventV2.getRows(); + private void processDeleteEvent(Event event) { + DeleteRowsEventData data = event.getData(); + Long tableId = data.getTableId(); + List<Serializable[]> list = data.getRows(); - for (Row row : list) { + for (Serializable[] row : list) { addRow("DELETE", tableId, row); } @@ -253,20 +212,22 @@ public class EventProcessor { private static Pattern createTablePattern = Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE); - private void processQueryEvent(BinlogEventV4 event) { - QueryEvent queryEvent = (QueryEvent) event; - String sql = queryEvent.getSql().toString(); + private void processQueryEvent(Event event) { + QueryEventData data = event.getData(); + String sql = data.getSql(); if (createTablePattern.matcher(sql).find()) { schema.reset(); } } - private void processXidEvent(BinlogEventV4 event) { - XidEvent xidEvent = (XidEvent) event; - String binlogFilename = xidEvent.getBinlogFilename(); - Long position = xidEvent.getHeader().getNextPosition(); - Long xid = xidEvent.getXid(); + private void processXidEvent(Event event) { + EventHeaderV4 header = event.getHeader(); + XidEventData data = event.getData(); + + String binlogFilename = binaryLogClient.getBinlogFilename(); + Long position = header.getNextPosition(); + Long xid = data.getXid(); BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position); transaction.setNextBinlogPosition(binlogPosition); @@ -274,14 +235,13 @@ public class EventProcessor { replicator.commit(transaction, true); - transaction = new Transaction(this); - + transaction = new Transaction(config); } - private void addRow(String type, Long tableId, Row row) { + private void addRow(String type, Long tableId, Serializable[] row) { if (transaction == null) { - transaction = new Transaction(this); + transaction = new Transaction(config); } Table t = tableMap.get(tableId); @@ -294,7 +254,7 @@ public class EventProcessor { } else { transaction.setNextBinlogPosition(replicator.getNextBinlogPosition()); replicator.commit(transaction, false); - transaction = new Transaction(this); + transaction = new Transaction(config); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java index 9656a04..396815a 100644 --- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.mysql.binlog; import com.alibaba.fastjson.JSONObject; -import com.google.code.or.common.glossary.Row; +import java.io.Serializable; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -31,17 +31,15 @@ public class Transaction { private BinlogPosition nextBinlogPosition; private Long xid; - private EventProcessor eventProcessor; private Config config; private List<DataRow> list = new LinkedList<>(); - public Transaction(EventProcessor eventProcessor) { - this.eventProcessor = eventProcessor; - this.config = eventProcessor.getConfig(); + public Transaction(Config config) { + this.config = config; } - public boolean addRow(String type, Table table, Row row) { + public boolean addRow(String type, Table table, Serializable[] row) { if (list.size() == config.maxTransactionRows) { return false; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java index bf621b5..fd6555c 100644 --- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java @@ -31,8 +31,12 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.mysql.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class BinlogPositionManager { + private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class); + private DataSource dataSource; private Config config; @@ -67,7 +71,12 @@ public class BinlogPositionManager { } private void initPositionDefault() throws Exception { - initPositionFromMqTail(); + + try { + initPositionFromMqTail(); + } catch (Exception e) { + logger.error("Init position from mq error.", e); + } if (binlogFilename == null || nextPosition == null) { initPositionFromBinlogTail(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/343ab198/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java index 97339d8..6b60abd 100644 --- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java @@ -19,10 +19,19 @@ package org.apache.rocketmq.mysql.schema.column; import java.sql.Timestamp; import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; public class DateTimeColumnParser extends ColumnParser { - private static SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + private static SimpleDateFormat dateTimeFormat; + private static SimpleDateFormat dateTimeUtcFormat; + + static { + dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } @Override public Object getValue(Object value) { @@ -35,6 +44,10 @@ public class DateTimeColumnParser extends ColumnParser { return dateTimeFormat.format(value); } + if (value instanceof Long) { + return dateTimeUtcFormat.format(new Date((Long) value)); + } + return value; } }