目标是用 Flink 作业实现类似 Canal Server 的功能,作业的 SQL 如下:
CREATE TABLE `binlog_table` ( `id` INT, `name` STRING, `sys_id` STRING, `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING, `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING, `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING, `value_type` STRING, `value_field` STRING, `status` INT, `syn_date` TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain` STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '${mysql.hostname}', 'port' = '3306', 'username' = '${mysql.username}', 'password' = '${mysql.password}', 'database-name' = '${mysql.database}', 'table-name' = '${mysql.table}' );
CREATE TABLE `kafka_sink` ( `id` INT, `name` STRING, `sys_id` STRING, `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING, `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING, `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING, `value_type` STRING, `value_field` STRING, `status` INT, `syn_date` TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain` STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = '${topic}', 'properties.bootstrap.servers' = '${bootstrap.servers}', 'format' = 'canal-json' );
INSERT INTO `kafka_sink` (SELECT * FROM `binlog_table`);
写入 Kafka 的结果是这样:
{ "data": [ { "id": 3, "name": "自动付款接口BuyETC金额", "sys_id": "0184", "sequence": 2, "filter": "(a=1)", "tag": "MerId(商户号)", "remark": "O", "create_date": "2020-11-02 15:01:31", "update_date": "2021-04-07 09:23:59", "reserve": "", "sys_name": "NHL", "metric_seq": 0, "advanced_function": "", "value_type": "sum", "value_field": "value", "status": 1, "syn_date": "2021-01-28 19:31:36", "confirmer": null, "confirm_time": null, "index_explain": "aa", "field_name": null, "tag_values": null } ], "type": "INSERT" }
这并不是标准的 canal-json 格式:消息里只有 data 和 type 两个字段,缺少 database、table、old、ts 等字段。改用 upsert-kafka connector 试了也不行:
CREATE TABLE `kafka_sink` 
( `id` INT, `name` STRING, `sys_id` STRING, `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING, `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING, `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING, `value_type` STRING, `value_field` STRING, `status` INT, `syn_date` TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain` STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '${topic}', 'properties.bootstrap.servers' = '${bootstrap.servers}', 'key.format' = 'json', 'value.format' = 'json' );
这样输出的数据只是一行普通的 JSON,同样不是 canal-json 格式:
{"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06 12:27:30","update_date":"2021-04-06 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}