目标是用flink作业实现类似canal server的功能

-- Source table: consumes the MySQL binlog via the mysql-cdc connector
-- (Flink CDC) and exposes it as a changelog stream of the monitored table.
-- Connection settings are injected through ${...} placeholders at deploy time.
CREATE TABLE `binlog_table` (
    `id`                INT,
    `name`              STRING,
    `sys_id`            STRING,
    `sequence`          INT,
    `filter`            STRING,
    `tag`               STRING,
    `remark`            STRING,
    `create_date`       TIMESTAMP,
    `update_date`       TIMESTAMP,
    `reserve`           STRING,
    `sys_name`          STRING,
    `metric_seq`        INT,
    `advanced_function` STRING,
    `value_type`        STRING,
    `value_field`       STRING,
    `status`            INT,
    `syn_date`          TIMESTAMP,
    `confirmer`         STRING,
    `confirm_time`      TIMESTAMP,
    `index_explain`     STRING,
    `field_name`        STRING,
    `tag_values`        STRING,
    -- NOT ENFORCED: Flink does not own the data, the key is declarative only.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${mysql.hostname}',
    'port' = '3306',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'database-name' = '${mysql.database}',
    'table-name' = '${mysql.table}'
);




-- Sink table: writes the changelog to Kafka serialized as canal-json.
-- Schema mirrors `binlog_table` column-for-column.
-- NOTE(review): Flink's canal-json sink emits only data/old/type fields;
-- metadata such as database/table/ts present in real canal output is not
-- produced by the encoder — confirm against the target Flink version's docs.
CREATE TABLE `kafka_sink` (
    `id`                INT,
    `name`              STRING,
    `sys_id`            STRING,
    `sequence`          INT,
    `filter`            STRING,
    `tag`               STRING,
    `remark`            STRING,
    `create_date`       TIMESTAMP,
    `update_date`       TIMESTAMP,
    `reserve`           STRING,
    `sys_name`          STRING,
    `metric_seq`        INT,
    `advanced_function` STRING,
    `value_type`        STRING,
    `value_field`       STRING,
    `status`            INT,
    `syn_date`          TIMESTAMP,
    `confirmer`         STRING,
    `confirm_time`      TIMESTAMP,
    `index_explain`     STRING,
    `field_name`        STRING,
    `tag_values`        STRING,
    -- NOT ENFORCED: declarative key, no uniqueness check performed by Flink.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = '${topic}',
    'properties.bootstrap.servers' = '${bootstrap.servers}',
    'format' = 'canal-json'
);




-- Pipe the CDC changelog into the Kafka sink.
-- Columns are listed explicitly (instead of SELECT *) so that a schema
-- change on either table fails fast at planning time rather than silently
-- misaligning fields between source and sink.
INSERT INTO `kafka_sink`
SELECT
    `id`,
    `name`,
    `sys_id`,
    `sequence`,
    `filter`,
    `tag`,
    `remark`,
    `create_date`,
    `update_date`,
    `reserve`,
    `sys_name`,
    `metric_seq`,
    `advanced_function`,
    `value_type`,
    `value_field`,
    `status`,
    `syn_date`,
    `confirmer`,
    `confirm_time`,
    `index_explain`,
    `field_name`,
    `tag_values`
FROM `binlog_table`;

出来的结果是这样:


{
"data": [
{
"id": 3,
"name": "自动付款接口BuyETC金额",
"sys_id": "0184",
"sequence": 2,
"filter": "(a=1)",
"tag": "MerId(商户号)",
"remark": "O",
"create_date": "2020-11-02 15:01:31",
"update_date": "2021-04-07 09:23:59",
"reserve": "",
"sys_name": "NHL",
"metric_seq": 0,
"advanced_function": "",
"value_type": "sum",
"value_field": "value",
"status": 1,
"syn_date": "2021-01-28 19:31:36",
"confirmer": null,
"confirm_time": null,
"index_explain": "aa",
"field_name": null,
"tag_values": null
}
],
"type": "INSERT"
}
并不是标准的 canal-json 格式（缺少 database、table、ts 等元数据字段）。改用 upsert-kafka connector 也试过了，同样不行：



-- Alternative sink definition using the upsert-kafka connector.
-- Key/value are encoded as plain 'json': each record is serialized as a flat
-- JSON object keyed by the PRIMARY KEY (matches the output sample below).
CREATE TABLE `kafka_sink` (
    `id`                INT,
    `name`              STRING,
    `sys_id`            STRING,
    `sequence`          INT,
    `filter`            STRING,
    `tag`               STRING,
    `remark`            STRING,
    `create_date`       TIMESTAMP,
    `update_date`       TIMESTAMP,
    `reserve`           STRING,
    `sys_name`          STRING,
    `metric_seq`        INT,
    `advanced_function` STRING,
    `value_type`        STRING,
    `value_field`       STRING,
    `status`            INT,
    `syn_date`          TIMESTAMP,
    `confirmer`         STRING,
    `confirm_time`      TIMESTAMP,
    `index_explain`     STRING,
    `field_name`        STRING,
    `tag_values`        STRING,
    -- upsert-kafka requires a PRIMARY KEY; it becomes the Kafka message key.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = '${topic}',
    'properties.bootstrap.servers' = '${bootstrap.servers}',
    'key.format' = 'json',
    'value.format' = 'json'
);


出来的数据长这样



{"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06
 12:27:30","update_date":"2021-04-06 
12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07
 
16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}


回复