roncenzhao created FLINK-16334:
----------------------------------

             Summary: flink-sql kafka-connector: support ignoring invalid data when parsing bytes to a JSON row
                 Key: FLINK-16334
                 URL: https://issues.apache.org/jira/browse/FLINK-16334
             Project: Flink
          Issue Type: Wish
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.10.0
         Environment: flink1.10+kafka+json
            Reporter: roncenzhao
We found that, if we create a table like this:

{code:java}
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test_topic',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.group.id' = 'g_test',
  -- 'connector.startup-mode' = 'earliest-offset',
  -- 'connector.startup-mode' = 'latest-offset',
  'connector.startup-mode' = 'group-offsets',
  'format.type' = 'json',
  'format.fail-on-missing-field' = 'false'
);
{code}

and then execute `select * from MyUserTable`, then whenever the current record is not valid JSON, the job fails and the consumer group's offset is reset to the latest offset.

I think we should add a configuration option similar to 'format.fail-on-missing-field', e.g. 'format.fail-on-invalid-json', to ignore the current invalid row.

Looking forward to your reply!

-- This message was sent by Atlassian Jira (v8.3.4#803005)
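A minimal sketch of the proposed behavior, independent of Flink's actual JsonRowDeserializationSchema (the class, method, and flag names below are hypothetical stand-ins, not Flink API): when the flag is false, a parse failure produces null, which the consumer treats as "skip this record", instead of an exception that fails the job.

{code:java}
// Hypothetical sketch of a 'format.fail-on-invalid-json' switch.
// deserialize() stands in for a Flink DeserializationSchema#deserialize(byte[]).
public class LenientJsonDeserializer {
    private final boolean failOnInvalidJson;

    public LenientJsonDeserializer(boolean failOnInvalidJson) {
        this.failOnInvalidJson = failOnInvalidJson;
    }

    /** Returns the parsed row, or null to signal "skip this record". */
    public String deserialize(byte[] message) {
        String raw = new String(message, java.nio.charset.StandardCharsets.UTF_8);
        try {
            return parseJson(raw);
        } catch (IllegalArgumentException e) {
            if (failOnInvalidJson) {
                throw e; // current behavior: the bad record fails the job
            }
            return null; // proposed behavior: silently ignore the invalid row
        }
    }

    // Toy stand-in for a real JSON parser: only checks the outermost braces.
    private static String parseJson(String raw) {
        String trimmed = raw.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalArgumentException("Invalid JSON: " + raw);
        }
        return trimmed;
    }
}
{code}

With such a switch, a malformed record in the middle of the topic would simply be dropped when the option is set to false, so the group offsets keep advancing instead of the job restarting.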