你kafka里面的是json么?format是json么?
String resultCreateTableSql = createKafkaSourceSQL +" WITH ( " +" 'connector' = 'kafka' ," +" 'topic' = '" + kafkaTopic +"'," +" 'properties.bootstrap.servers' = '" +
kafkaBootstrapServers +"'," +" 'properties.group.id' = '" + kafkaGroupId +"'," +" 'format' = '" + kafkaFormat +"'," +" 'scan.startup.mode' = '" +
scanStartUpMode +"'," +" 'json.fail-on-missing-field' = 'false'," +" 'json.ignore-parse-errors' = 'true' )";
json.fail-on-missing-field
json.ignore-parse-errors
这两个参数你加了么?加了没用?
在 2020/8/18 14:34, 赵一旦 写道:
我刚刚接触flinksql,主要是感觉这个问题很明显,大家如果生产中使用的话,应该都已经有方案才对,但是好像没啥人有回应。
shizk233 <wangwangdaxian...@gmail.com> 于2020年8月18日周二 下午2:26写道:
考虑修改一下json解析的逻辑来处理异常数据?
赵一旦 <hinobl...@gmail.com> 于2020年8月18日周二 上午11:59写道:
有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
api,然后捕获所有异常即可。
赵一旦 <hinobl...@gmail.com> 于2020年8月17日周一 下午7:15写道:
kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。
现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。