Repository: incubator-rocketmq-externals Updated Branches: refs/heads/release-rocketmq-mysql-1.1.0 [created] a0aeee629
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java new file mode 100644 index 0000000..fd6555c --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.position; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Set; +import javax.sql.DataSource; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.mysql.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BinlogPositionManager { + private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class); + + private DataSource dataSource; + private Config config; + + private String binlogFilename; + private Long nextPosition; + + public BinlogPositionManager(Config config, DataSource dataSource) { + this.config = config; + this.dataSource = dataSource; + } + + public void initBeginPosition() throws Exception { + + if (config.startType == null || config.startType.equals("DEFAULT")) { + initPositionDefault(); + + } else if (config.startType.equals("NEW_EVENT")) { + initPositionFromBinlogTail(); + + } else if (config.startType.equals("LAST_PROCESSED")) { + initPositionFromMqTail(); + + } else if (config.startType.equals("SPECIFIED")) { + binlogFilename = config.binlogFilename; + nextPosition = config.nextPosition; + + } + + if (binlogFilename == null || nextPosition == null) { + throw new Exception("binlogFilename | nextPosition is null."); + } + } + + private void initPositionDefault() throws Exception { + + try { + initPositionFromMqTail(); + } catch (Exception e) { + logger.error("Init position from mq error.", e); + } + + if (binlogFilename == null || nextPosition == null) { + initPositionFromBinlogTail(); + } + + } + + private void initPositionFromMqTail() throws Exception { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP"); + consumer.setNamesrvAddr(config.mqNamesrvAddr); + consumer.setMessageModel(MessageModel.valueOf("BROADCASTING")); + consumer.start(); + + Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(config.mqTopic); + MessageQueue queue = queues.iterator().next(); + + if (queue != null) { + Long offset = consumer.maxOffset(queue); + if (offset > 0) + offset--; + + PullResult pullResult = consumer.pull(queue, "*", offset, 100); + + if (pullResult.getPullStatus() == PullStatus.FOUND) { + MessageExt msg = pullResult.getMsgFoundList().get(0); + String json = new String(msg.getBody(), "UTF-8"); + + JSONObject js = JSON.parseObject(json); + binlogFilename = (String) js.get("binlogFilename"); + nextPosition = js.getLong("nextPosition"); + } + } + + } + + private void initPositionFromBinlogTail() throws SQLException { + String sql = "SHOW MASTER STATUS"; + + Connection conn = null; + ResultSet rs = null; + + try { + Connection connection = dataSource.getConnection(); + rs = connection.createStatement().executeQuery(sql); + + while (rs.next()) { + binlogFilename = rs.getString("File"); + nextPosition = rs.getLong("Position"); + } + + } finally { + + if (conn != null) { + conn.close(); + } + if (rs != null) { + rs.close(); + } + } + + } + + public String getBinlogFilename() { + return binlogFilename; + } + + public Long getPosition() { + return nextPosition; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java new file mode 100644 index 0000000..38aca7f --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.productor; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.mysql.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RocketMQProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class); + + private DefaultMQProducer producer; + private Config config; + + public RocketMQProducer(Config config) { + this.config = config; + } + + public void start() throws MQClientException { + producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP"); + producer.setNamesrvAddr(config.mqNamesrvAddr); + producer.start(); + } + + public long push(String json) throws Exception { + LOGGER.debug(json); + + Message message = new Message(config.mqTopic, json.getBytes("UTF-8")); + SendResult sendResult = producer.send(message); + + return sendResult.getQueueOffset(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java new file mode 100644 index 0000000..b8e8321 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import javax.sql.DataSource; +import org.apache.rocketmq.mysql.binlog.EventProcessor; +import org.apache.rocketmq.mysql.schema.column.ColumnParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Database { + private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class); + + private static final String SQL = "select table_name,column_name,data_type,column_type,character_set_name " + + "from information_schema.columns " + + "where table_schema = ?"; + private String name; + + private DataSource dataSource; + + private Map<String, Table> tableMap = new HashMap<String, Table>(); + + public Database(String name, DataSource dataSource) { + this.name = name; + this.dataSource = dataSource; + } + + public void init() throws SQLException { + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + conn = dataSource.getConnection(); + + ps = conn.prepareStatement(SQL); + ps.setString(1, name); + rs = ps.executeQuery(); + + while (rs.next()) { + String tableName = rs.getString(1); + String colName = rs.getString(2); + String dataType = rs.getString(3); + String colType = rs.getString(4); + String charset = rs.getString(5); + + ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset); + + if (!tableMap.containsKey(tableName)) { + addTable(tableName); + } + Table table = tableMap.get(tableName); + table.addCol(colName); + table.addParser(columnParser); + } + + } finally { + if (conn != null) { + conn.close(); + } + if (ps != null) { + ps.close(); + } + if (rs != null) { + rs.close(); + } + } + + } + + private void addTable(String tableName) { + + LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName); + + Table table = new Table(name, tableName); + tableMap.put(tableName, table); + } + + public Table getTable(String tableName) { + + return tableMap.get(tableName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java new file mode 100644 index 0000000..2baf2a2 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; +import org.apache.rocketmq.mysql.binlog.EventProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Schema { + private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class); + + private static final String SQL = "select schema_name from information_schema.schemata"; + + private static final List<String> IGNORED_DATABASES = new ArrayList<>( + Arrays.asList(new String[] {"information_schema", "mysql", "performance_schema", "sys"}) + ); + + private DataSource dataSource; + + private Map<String, Database> dbMap; + + public Schema(DataSource dataSource) { + this.dataSource = dataSource; + } + + public void load() throws SQLException { + + dbMap = new HashMap<>(); + + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + conn = dataSource.getConnection(); + + ps = conn.prepareStatement(SQL); + rs = ps.executeQuery(); + + while (rs.next()) { + String dbName = rs.getString(1); + if (!IGNORED_DATABASES.contains(dbName)) { + Database database = new Database(dbName, dataSource); + dbMap.put(dbName, database); + } + } + + } finally { + + if (conn != null) { + conn.close(); + } + if (ps != null) { + ps.close(); + } + if (rs != null) { + rs.close(); + } + } + + for (Database db : dbMap.values()) { + db.init(); + } + + } + + public Table getTable(String dbName, String tableName) { + + if (dbMap == null) { + reload(); + } + + Database database = dbMap.get(dbName); + if (database == null) { + return null; + } + + Table table = database.getTable(tableName); + if (table == null) { + return null; + } + + return table; + } + + private void reload() { + + while (true) { + try { + load(); + break; + } catch (Exception e) { + LOGGER.error("Reload schema error.", e); + } + } + } + + public void reset() { + dbMap = null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java new file mode 100644 index 0000000..54592a9 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema; + +import java.util.LinkedList; +import java.util.List; +import org.apache.rocketmq.mysql.schema.column.ColumnParser; + +public class Table { + private String database; + private String name; + private List<String> colList = new LinkedList<String>(); + private List<ColumnParser> parserList = new LinkedList<ColumnParser>(); + + public Table(String database, String table) { + this.database = database; + this.name = table; + } + + public void addCol(String column) { + colList.add(column); + } + + public void addParser(ColumnParser columnParser) { + parserList.add(columnParser); + } + + public List<String> getColList() { + return colList; + } + + public String getDatabase() { + return database; + } + + public String getName() { + return name; + } + + public List<ColumnParser> getParserList() { + return parserList; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java new file mode 100644 index 0000000..667db75 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +import java.math.BigInteger; + +public class BigIntColumnParser extends ColumnParser { + + private static BigInteger max = BigInteger.ONE.shiftLeft(64); + + private boolean signed; + + public BigIntColumnParser(String colType) { + this.signed = !colType.matches(".* unsigned$"); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof BigInteger) { + return value; + } + + Long l = (Long) value; + if (!signed && l < 0) { + return max.add(BigInteger.valueOf(l)); + } else { + return l; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java new file mode 100644 index 0000000..5f2920b --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public abstract class ColumnParser { + + public static ColumnParser getColumnParser(String dataType, String colType, String charset) { + + switch (dataType) { + case "tinyint": + case "smallint": + case "mediumint": + case "int": + return new IntColumnParser(dataType, colType); + case "bigint": + return new BigIntColumnParser(colType); + case "tinytext": + case "text": + case "mediumtext": + case "longtext": + case "varchar": + case "char": + return new StringColumnParser(charset); + case "date": + case "datetime": + case "timestamp": + return new DateTimeColumnParser(); + case "time": + return new TimeColumnParser(); + case "year": + return new YearColumnParser(); + case "enum": + return new EnumColumnParser(colType); + case "set": + return new SetColumnParser(colType); + default: + return new DefaultColumnParser(); + } + } + + public static String[] extractEnumValues(String colType) { + String[] enumValues = {}; + Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType); + if (matcher.matches()) { + enumValues = matcher.group(2).replace("'", "").split(","); + } + + return enumValues; + } + + public abstract Object getValue(Object value); + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java new file mode 100644 index 0000000..6b60abd --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; + +public class DateTimeColumnParser extends ColumnParser { + + private static SimpleDateFormat dateTimeFormat; + private static SimpleDateFormat dateTimeUtcFormat; + + static { + dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Timestamp) { + return dateTimeFormat.format(value); + } + + if (value instanceof Long) { + return dateTimeUtcFormat.format(new Date((Long) value)); + } + + return value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java new file mode 100644 index 0000000..46eb48e --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +import org.apache.commons.codec.binary.Base64; + +public class DefaultColumnParser extends ColumnParser { + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof byte[]) { + return Base64.encodeBase64String((byte[]) value); + } + + return value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java new file mode 100644 index 0000000..2942103 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +public class EnumColumnParser extends ColumnParser { + + private String[] enumValues; + + public EnumColumnParser(String colType) { + enumValues = extractEnumValues(colType); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof String) { + return value; + } + + Integer i = (Integer) value; + if (i == 0) { + return null; + } else { + return enumValues[i - 1]; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java new file mode 100644 index 0000000..96cf999 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +public class IntColumnParser extends ColumnParser { + + private int bits; + private boolean signed; + + public IntColumnParser(String dataType, String colType) { + + switch (dataType) { + case "tinyint": + bits = 8; + break; + case "smallint": + bits = 16; + break; + case "mediumint": + bits = 24; + break; + case "int": + bits = 32; + } + + this.signed = !colType.matches(".* unsigned$"); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Long) { + return value; + } + + if (value instanceof Integer) { + Integer i = (Integer) value; + if (signed || i > 0) { + return i; + } else { + return (1L << bits) + i; + } + } + + return value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java new file mode 100644 index 0000000..fb28c30 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +public class SetColumnParser extends ColumnParser { + + private String[] enumValues; + + public SetColumnParser(String colType) { + enumValues = extractEnumValues(colType); + } + + @Override + public Object getValue(Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value; + } + + StringBuilder builder = new StringBuilder(); + long l = (Long) value; + + boolean needSplit = false; + for (int i = 0; i < enumValues.length; i++) { + if (((l >> i) & 1) == 1) { + if (needSplit) + builder.append(","); + + builder.append(enumValues[i]); + needSplit = true; + } + } + + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java new file mode 100644 index 0000000..19068c9 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +import org.apache.commons.codec.Charsets; + +public class StringColumnParser extends ColumnParser { + + private String charset; + + public StringColumnParser(String charset) { + this.charset = charset.toLowerCase(); + } + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof String) { + return value; + } + + byte[] bytes = (byte[]) value; + + switch (charset) { + case "utf8": + case "utf8mb4": + return new String(bytes, Charsets.UTF_8); + case "latin1": + case "ascii": + return new String(bytes, Charsets.ISO_8859_1); + case "ucs2": + return new String(bytes, Charsets.UTF_16); + default: + return new String(bytes, Charsets.toCharset(charset)); + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java new file mode 100644 index 0000000..384b06e --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +import java.sql.Time; +import java.sql.Timestamp; + +public class TimeColumnParser extends ColumnParser { + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Timestamp) { + + return new Time(((Timestamp) value).getTime()); + } + + return value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java new file mode 100644 index 0000000..0419933 --- /dev/null +++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql.schema.column; + +import java.sql.Date; +import java.util.Calendar; + +public class YearColumnParser extends ColumnParser { + + @Override + public Object getValue(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Date) { + Calendar calendar = Calendar.getInstance(); + calendar.setTime((Date) value); + return calendar.get(Calendar.YEAR); + } + + return value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/resources/logback.xml ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/resources/logback.xml b/rocketmq-mysql/src/main/resources/logback.xml new file mode 100644 index 0000000..d4993de --- /dev/null +++ b/rocketmq-mysql/src/main/resources/logback.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<configuration> + + + <appender name="DefaultConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p - %m%n</pattern> + <charset class="java.nio.charset.Charset">UTF-8</charset> + </encoder> + </appender> + + + <appender name="DefaultFileAppender" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>./logs/rocketmq_mysql.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>./logs/rocketmq_mysql.%i.log + </fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>10</maxIndex> + </rollingPolicy> + <triggeringPolicy + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>100MB</maxFileSize> + </triggeringPolicy> + <encoder> + <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p - %m%n</pattern> + <charset class="java.nio.charset.Charset">UTF-8</charset> + </encoder> + </appender> + + <appender name="PositionAppender" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>./logs/position.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>./logs/position.%i.log + </fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>10</maxIndex> + </rollingPolicy> + <triggeringPolicy + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>10MB</maxFileSize> + </triggeringPolicy> + <encoder> + <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} - %m%n</pattern> + <charset class="java.nio.charset.Charset">UTF-8</charset> + </encoder> + </appender> + + <root> + <level value="INFO"/> + <appender-ref ref="DefaultConsoleAppender"/> + <appender-ref ref="DefaultFileAppender"/> + </root> + + <logger name="PositionLogger" additivity="false"> + <level value="INFO"/> + <appender-ref ref="PositionAppender"/> + </logger> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf b/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf new file mode 100644 index 0000000..4a7a35f --- /dev/null +++ b/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +mysqlAddr= +mysqlPort= +mysqlUsername= +mysqlPassword= + +mqNamesrvAddr= +mqTopic= + +#startType= +#binlogFilename= +#nextPosition= +#maxTransactionRows= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java new file mode 100644 index 0000000..ebf0926 --- /dev/null +++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql; + +import java.math.BigInteger; +import org.apache.rocketmq.mysql.schema.column.BigIntColumnParser; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BigIntColumnParserTest { + + @Test + public void testBigInt() { + BigIntColumnParser parser = new BigIntColumnParser("bigint(20) unsigned"); + + BigInteger v1 = (BigInteger) parser.getValue(Long.MIN_VALUE); + BigInteger v2 = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE); + assertEquals(v1, v2); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java new file mode 100644 index 0000000..5c40060 --- /dev/null +++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql; + +import org.apache.rocketmq.mysql.schema.column.EnumColumnParser; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +public class EnumColumnParserTest { + + @Test + public void testEnum() { + String colType = "enum('a','b','c','d')"; + + EnumColumnParser parser = new EnumColumnParser(colType); + String v = (String) parser.getValue(3); + assertEquals(v, "c"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java new file mode 100644 index 0000000..4972947 --- /dev/null +++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql; + +import org.apache.rocketmq.mysql.schema.column.IntColumnParser; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class IntColumnParserTest { + + @Test + public void testInt() { + IntColumnParser parser = new IntColumnParser("int", "int(10) unsigned"); + + Long v1 = (Long) parser.getValue(Integer.MIN_VALUE); + Long v2 = (long) Integer.MAX_VALUE + 1; + assertEquals(v1, v2); + } + + @Test + public void testSmallint() { + IntColumnParser parser = new IntColumnParser("smallint", "smallint(5) unsigned"); + + Long v1 = (Long) parser.getValue((int) Short.MIN_VALUE); + Long v2 = (long) (Short.MAX_VALUE + 1); + assertEquals(v1, v2); + } + + @Test + public void testTinyint() { + IntColumnParser parser = new IntColumnParser("tinyint", "tinyint(3) unsigned"); + + Long v1 = (Long) parser.getValue((int) Byte.MIN_VALUE); + Long v2 = (long) (Byte.MAX_VALUE + 1); + assertEquals(v1, v2); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java new file mode 100644 index 0000000..3fbf4ba --- /dev/null +++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mysql; + +import org.apache.rocketmq.mysql.schema.column.SetColumnParser; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SetColumnParserTest { + + @Test + public void testSet() { + String colType = "set('a','b','c','d')"; + + SetColumnParser parser = new SetColumnParser(colType); + String v = (String) parser.getValue(1001L); + assertEquals(v, "a,d"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/style/copyright/Apache.xml ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/style/copyright/Apache.xml b/rocketmq-mysql/style/copyright/Apache.xml new file mode 100644 index 0000000..e3e3dec --- /dev/null +++ b/rocketmq-mysql/style/copyright/Apache.xml @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<component name="CopyrightManager"> + <copyright> + <option name="myName" value="Apache" /> + <option name="notice" value="Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." /> + </copyright> +</component> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/style/copyright/profiles_settings.xml ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/style/copyright/profiles_settings.xml b/rocketmq-mysql/style/copyright/profiles_settings.xml new file mode 100644 index 0000000..747c7e2 --- /dev/null +++ b/rocketmq-mysql/style/copyright/profiles_settings.xml @@ -0,0 +1,64 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<component name="CopyrightManager"> + <settings default="Apache"> + <module2copyright> + <element module="All" copyright="Apache"/> + </module2copyright> + <LanguageOptions name="GSP"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="HTML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="JAVA"> + <option name="fileTypeOverride" value="3" /> + <option name="addBlankAfter" value="false" /> + </LanguageOptions> + <LanguageOptions name="JSP"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="JSPX"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="MXML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="Properties"> + <option name="fileTypeOverride" value="3"/> + <option name="block" value="false"/> + </LanguageOptions> + <LanguageOptions name="SPI"> + <option name="fileTypeOverride" value="3"/> + <option name="block" value="false"/> + </LanguageOptions> + <LanguageOptions name="XML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="__TEMPLATE__"> + <option name="separateBefore" value="true"/> + <option name="lenBefore" value="1"/> + </LanguageOptions> + </settings> +</component> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/style/rmq_checkstyle.xml ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/style/rmq_checkstyle.xml b/rocketmq-mysql/style/rmq_checkstyle.xml new file mode 100644 index 0000000..2872eb7 --- /dev/null +++ b/rocketmq-mysql/style/rmq_checkstyle.xml @@ -0,0 +1,134 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE module PUBLIC + "-//Puppy Crawl//DTD Check Configuration 1.3//EN" + "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> +<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding --> +<module name="Checker"> + + <property name="localeLanguage" value="en"/> + + <!--To configure the check to report on the first instance in each file--> + <module name="FileTabCharacter"/> + + <!-- header --> + <module name="RegexpHeader"> + <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="System\.out\.println"/> + <property name="message" value="Prohibit invoking System.out.println in source code !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="//FIXME"/> + <property name="message" value="Recommended fix FIXME task !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="//TODO"/> + <property name="message" value="Recommended fix TODO task !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="@alibaba"/> + <property name="message" value="Recommended remove @alibaba keyword!"/> + </module> + <module name="RegexpSingleline"> + <property name="format" value="@taobao"/> + <property name="message" value="Recommended remove @taobao keyword!"/> + </module> + <module name="RegexpSingleline"> + <property name="format" value="@author"/> + <property name="message" value="Recommended remove @author tag in javadoc!"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" + value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/> + <property name="message" value="Not allow chinese character !"/> + </module> + + <module name="FileLength"> + <property name="max" value="3000"/> + </module> + + <module name="TreeWalker"> + + <module name="UnusedImports"> + <property name="processJavadoc" value="true"/> + </module> + <module name="RedundantImport"/> + + <!--<module name="IllegalImport" />--> + + <!--Checks that classes that override equals() also override hashCode()--> + <module name="EqualsHashCode"/> + <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.--> + <module name="SimplifyBooleanExpression"/> + <module name="OneStatementPerLine"/> + <module name="UnnecessaryParentheses"/> + <!--Checks for over-complicated boolean return statements. For example the following code--> + <module name="SimplifyBooleanReturn"/> + + <!--Check that the default is after all the cases in producerGroup switch statement--> + <module name="DefaultComesLast"/> + <!--Detects empty statements (standalone ";" semicolon)--> + <module name="EmptyStatement"/> + <!--Checks that long constants are defined with an upper ell--> + <module name="UpperEll"/> + <module name="ConstantName"> + <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/> + </module> + <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property--> + <module name="LocalVariableName"/> + <!--Validates identifiers for local, final variables, including catch parameters--> + <module name="LocalFinalVariableName"/> + <!--Validates identifiers for non-static fields--> + <module name="MemberName"/> + <!--Validates identifiers for class type parameters--> + <module name="ClassTypeParameterName"> + <property name="format" value="^[A-Z0-9]*$"/> + </module> + <!--Validates identifiers for method type parameters--> + <module name="MethodTypeParameterName"> + <property name="format" value="^[A-Z0-9]*$"/> + </module> + <module name="PackageName"/> + <module name="ParameterName"/> + <module name="StaticVariableName"/> + <module name="TypeName"/> + <!--Checks that there are no import statements that use the * notation--> + <module name="AvoidStarImport"/> + + <!--whitespace--> + <module name="GenericWhitespace"/> + <module name="NoWhitespaceBefore"/> + <module name="NoWhitespaceAfter"/> + <module name="WhitespaceAround"> + <property name="allowEmptyConstructors" value="true"/> + <property name="allowEmptyMethods" value="true"/> + </module> + <module name="Indentation"/> + <module name="MethodParamPad"/> + <module name="ParenPad"/> + <module name="TypecastParenPad"/> + </module> +</module> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/a0aeee62/rocketmq-mysql/style/rmq_codeStyle.xml ---------------------------------------------------------------------- diff --git a/rocketmq-mysql/style/rmq_codeStyle.xml b/rocketmq-mysql/style/rmq_codeStyle.xml new file mode 100644 index 0000000..7c7ce54 --- /dev/null +++ b/rocketmq-mysql/style/rmq_codeStyle.xml @@ -0,0 +1,143 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<code_scheme name="rocketmq"> + <option name="USE_SAME_INDENTS" value="true"/> + <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/> + <option name="OTHER_INDENT_OPTIONS"> + <value> + <option name="INDENT_SIZE" value="4"/> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + <option name="TAB_SIZE" value="4"/> + <option name="USE_TAB_CHARACTER" value="false"/> + <option name="SMART_TABS" value="false"/> + <option name="LABEL_INDENT_SIZE" value="0"/> + <option name="LABEL_INDENT_ABSOLUTE" value="false"/> + <option name="USE_RELATIVE_INDENTS" value="false"/> + </value> + </option> + <option name="PREFER_LONGER_NAMES" value="false"/> + <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> + <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> + <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND"> + <value/> + </option> + <option name="IMPORT_LAYOUT_TABLE"> + <value> + <package name="" withSubpackages="true" static="false"/> + <emptyLine/> + <package name="" withSubpackages="true" static="true"/> + </value> + </option> + <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/> + <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/> + <option name="JD_P_AT_EMPTY_LINES" value="false"/> + <option name="JD_KEEP_INVALID_TAGS" value="false"/> + <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="true"/> + <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/> + <option name="LABELED_STATEMENT_WRAP" value="1"/> + <option name="WRAP_COMMENTS" value="true"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <JavaCodeStyleSettings> + <option name="CLASS_NAMES_IN_JAVADOC" value="3"/> + </JavaCodeStyleSettings> + <XML> + <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/> + </XML> + <ADDITIONAL_INDENT_OPTIONS fileType="haml"> + <option name="INDENT_SIZE" value="2"/> + </ADDITIONAL_INDENT_OPTIONS> + <codeStyleSettings language="Groovy"> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="HOCON"> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + </codeStyleSettings> + <codeStyleSettings language="JAVA"> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/> + <option name="LABELED_STATEMENT_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="JSON"> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + </codeStyleSettings> + <codeStyleSettings language="Scala"> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="INDENT_SIZE" value="4"/> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + <option name="TAB_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="XML"> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> +</code_scheme> \ No newline at end of file