Hi, 可以尝试使用 json.ignore-parse-errors[1] 来忽略解析的报错。需要注意:这个参数会忽略所有的解析错误。
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#json-ignore-parse-errors Best, Weihua On Mon, Feb 20, 2023 at 10:14 AM casel.chen <casel_c...@126.com> wrote: > 日志中就是报这个 "type":"INIT_DDL" 不能识别呀,然后作业就退出了 > > > > > > > > > > > > > > > > > > 在 2023-02-20 09:58:56,"Shengkai Fang" <fskm...@gmail.com> 写道: > >Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗? > > > >Best, > >Shengkai > > > >casel.chen <casel_c...@126.com> 于2023年2月9日周四 12:03写道: > > > >> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致 > >> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal > >> json格式解析时直接忽略不识别的type,例如 > >> 例1: > >> > {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE > >> TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` > >> varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT > >> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp > NOT > >> NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT > >> NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) > DEFAULT > >> NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) > DEFAULT > >> NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, > >> `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) > DEFAULT > >> NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT > >> NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) > >> DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB > DEFAULT > >> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null} > >> > >> > >> 例2: > >> { > >> "action":"ALTER", > >> "before":[], > >> "bid":0, > >> "data":[], > >> "db":"db_test", > >> "dbValType":{ > >> "col1":"varchar(22)", > >> "col2":"varchar(22)", > >> 
"col_pk":"varchar(22)" > >> }, > >> "ddl":true, > >> "entryType":"ROWDATA", > >> "execTs":1669789188000, > >> "jdbcType":{ > >> "col1":12, > >> "col2":12, > >> "col_pk":12 > >> }, > >> "pks":[], > >> "schema":"db_test", > >> "sendTs":1669789189533, > >> "sql":"alter table table_test add col2 varchar(22) null", > >> "table":"table_test", > >> "tableChanges":{ > >> "table":{ > >> "columns":[ > >> { > >> "jdbcType":12, // jdbc 类型。 > >> "name":"col1", // 字段名称。 > >> "position":0, // 字段的顺序。 > >> "typeExpression":"varchar(22)", // 类型描述。 > >> "typeName":"varchar" // 类型名称。 > >> }, > >> { > >> "jdbcType":12, > >> "name":"col2", > >> "position":1, > >> "typeExpression":"varchar(22)", > >> "typeName":"varchar" > >> }, > >> { > >> "jdbcType":12, > >> "name":"col_pk", > >> "position":2, > >> "typeExpression":"varchar(22)", > >> "typeName":"varchar" > >> } > >> ], > >> "primaryKeyColumnNames":["col_pk"] // 主键名列表。 > >> }, > >> "type":"ALTER" > >> } > >> } >