Hi,
You could try json.ignore-parse-errors [1] to skip the records that fail to parse. Note that this option ignores all parse errors, not just this one.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#json-ignore-parse-errors
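A minimal sketch of how the option could be set on a Kafka source table (the table, topic, and column names are made up for illustration; note that when the format is `canal-json`, the option key carries the `canal-json.` prefix rather than `json.`):

```sql
-- Hypothetical source table reading canal-json records from Kafka.
CREATE TABLE oms_parcels_src (
    id STRING,
    trackingnumber STRING,
    weight DECIMAL(19, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'oms_parcels',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'canal-json',
    -- Skip records that cannot be parsed instead of failing the job.
    -- Caveat: this silently drops *all* malformed records, not just
    -- the vendor-specific DDL events.
    'canal-json.ignore-parse-errors' = 'true'
);
```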

Best,
Weihua


On Mon, Feb 20, 2023 at 10:14 AM casel.chen <casel_c...@126.com> wrote:

> The log reports exactly that "type":"INIT_DDL" is unrecognized, and then the job exits.
>
> At 2023-02-20 09:58:56, "Shengkai Fang" <fskm...@gmail.com> wrote:
> >Hi. Could you also share the SQL that reproduces this case, together with the relevant error stack trace?
> >
> >Best,
> >Shengkai
> >
> >casel.chen <casel_c...@126.com> wrote on Thu, Feb 9, 2023 at 12:03:
> >
> >> Different cloud vendors' data-synchronization tools behave inconsistently
> >> when doing full + incremental sync of MySQL data to Kafka in canal-json
> >> format: some vendors also write DDL statements to the topic, which the
> >> downstream Flink SQL job cannot recognize, so it throws an error and fails.
> >> I suggest that Flink's canal-json parser simply ignore unrecognized type
> >> values. For example:
> >> Example 1:
> >>
> >> {
> >>     "jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61",
> >>     "shardId":null,
> >>     "identifier":null,
> >>     "eventId":"",
> >>     "mysqlType":null,
> >>     "id":0,
> >>     "es":1675908021700,
> >>     "ts":1675908021700,
> >>     "database":"demo",
> >>     "table":"oms_parcels",
> >>     "type":"INIT_DDL",
> >>     "isDdl":true,
> >>     "sql":"CREATE TABLE `oms_parcels` (`id` varchar(255) NOT NULL, `createdby` varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4",
> >>     "sqlType":null,
> >>     "data":null,
> >>     "old":null,
> >>     "pkNames":null
> >> }
> >>
> >>
> >> Example 2:
> >> {
> >>     "action":"ALTER",
> >>     "before":[],
> >>     "bid":0,
> >>     "data":[],
> >>     "db":"db_test",
> >>     "dbValType":{
> >>         "col1":"varchar(22)",
> >>         "col2":"varchar(22)",
> >>         "col_pk":"varchar(22)"
> >>     },
> >>     "ddl":true,
> >>     "entryType":"ROWDATA",
> >>     "execTs":1669789188000,
> >>     "jdbcType":{
> >>         "col1":12,
> >>         "col2":12,
> >>         "col_pk":12
> >>     },
> >>     "pks":[],
> >>     "schema":"db_test",
> >>     "sendTs":1669789189533,
> >>     "sql":"alter table table_test add col2 varchar(22) null",
> >>     "table":"table_test",
> >>     "tableChanges":{
> >>         "table":{
> >>             "columns":[
> >>                 {
> >>                     "jdbcType":12, // JDBC type.
> >>                     "name":"col1",    // Field name.
> >>                     "position":0,  // Position of the field.
> >>                     "typeExpression":"varchar(22)", // Type description.
> >>                     "typeName":"varchar" // Type name.
> >>                 },
> >>                 {
> >>                     "jdbcType":12,
> >>                     "name":"col2",
> >>                     "position":1,
> >>                     "typeExpression":"varchar(22)",
> >>                     "typeName":"varchar"
> >>                 },
> >>                 {
> >>                     "jdbcType":12,
> >>                     "name":"col_pk",
> >>                     "position":2,
> >>                     "typeExpression":"varchar(22)",
> >>                     "typeName":"varchar"
> >>                 }
> >>             ],
> >>             "primaryKeyColumnNames":["col_pk"] // List of primary key column names.
> >>         },
> >>         "type":"ALTER"
> >>     }
> >> }
>
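The skip-unrecognized-types behavior requested above can also be sketched outside of Flink. A small Python illustration (field names taken from the examples in the thread; the set of known types is an assumption, not Flink's actual internals) of tolerant parsing that drops events whose type is not a known DML kind:

```python
import json

# DML event types a canal-json consumer typically understands
# (assumed set, for illustration only).
KNOWN_TYPES = {"INSERT", "UPDATE", "DELETE"}

def filter_dml(raw_records):
    """Yield only parsed canal-json records with a recognized DML type.

    Records with an unknown "type" (e.g. the vendor-specific "INIT_DDL"
    from example 1) or with unparseable JSON are silently skipped.
    """
    for raw in raw_records:
        try:
            record = json.loads(raw)
        except json.JSONDecodeError:
            continue  # malformed payload: drop instead of failing
        if record.get("type") in KNOWN_TYPES:
            yield record

records = [
    '{"type":"INSERT","database":"demo","table":"oms_parcels","data":[{"id":"1"}]}',
    '{"type":"INIT_DDL","database":"demo","table":"oms_parcels","isDdl":true}',
    'not json at all',
]
kept = list(filter_dml(records))
print([r["type"] for r in kept])  # → ['INSERT']
```

The DDL event and the malformed line are both dropped, so only the INSERT survives; a real fix in the format would distinguish "unknown type" from "corrupt record" rather than lumping them together as ignore-parse-errors does.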
