日志中就是报这个 "type":"INIT_DDL" 不能识别呀,然后作业就退出了

















在 2023-02-20 09:58:56,"Shengkai Fang" <fskm...@gmail.com> 写道:
>Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?
>
>Best,
>Shengkai
>
>casel.chen <casel_c...@126.com> 于2023年2月9日周四 12:03写道:
>
>> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
>> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal
>> json格式解析时直接忽略不识别的type,例如
>> 例1:
>> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
>> TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby`
>> varchar(255) DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT
>> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT
>> NULL DEFAULT '0000-00-00 00:00:00',  `updatedby` varchar(255) DEFAULT
>> NULL,  `account` varchar(255) DEFAULT NULL,  `batch` varchar(255) DEFAULT
>> NULL,  `client` varchar(255) DEFAULT NULL,  `command` varchar(255) DEFAULT
>> NULL,  `container` varchar(255) DEFAULT NULL,  `items` mediumtext,
>> `trackingnumber` varchar(255) NOT NULL,  `transporter` varchar(255) DEFAULT
>> NULL,  `weight` decimal(19,2) NOT NULL,  `zipcode` varchar(255) DEFAULT
>> NULL,  `ld3` varchar(255) DEFAULT NULL,  `destination_code` varchar(255)
>> DEFAULT NULL,  PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT
>> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>>
>>
>> 例2:
>> {
>>     "action":"ALTER",
>>     "before":[],
>>     "bid":0,
>>     "data":[],
>>     "db":"db_test",
>>     "dbValType":{
>>         "col1":"varchar(22)",
>>         "col2":"varchar(22)",
>>         "col_pk":"varchar(22)"
>>     },
>>     "ddl":true,
>>     "entryType":"ROWDATA",
>>     "execTs":1669789188000,
>>     "jdbcType":{
>>         "col1":12,
>>         "col2":12,
>>         "col_pk":12
>>     },
>>     "pks":[],
>>     "schema":"db_test",
>>     "sendTs":1669789189533,
>>     "sql":"alter table table_test add col2 varchar(22) null",
>>     "table":"table_test",
>>     "tableChanges":{
>>         "table":{
>>             "columns":[
>>                 {
>>                     "jdbcType":12, // jdbc 类型。
>>                     "name":"col1",    // 字段名称。
>>                     "position":0,  // 字段的顺序。
>>                     "typeExpression":"varchar(22)", // 类型描述。
>>                     "typeName":"varchar" // 类型名称。
>>                 },
>>                 {
>>                     "jdbcType":12,
>>                     "name":"col2",
>>                     "position":1,
>>                     "typeExpression":"varchar(22)",
>>                     "typeName":"varchar"
>>                 },
>>                 {
>>                     "jdbcType":12,
>>                     "name":"col_pk",
>>                     "position":2,
>>                     "typeExpression":"varchar(22)",
>>                     "typeName":"varchar"
>>                 }
>>             ],
>>             "primaryKeyColumnNames":["col_pk"] // 主键名列表。
>>         },
>>         "type":"ALTER"
>>     }
>> }

Reply via email to