[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16575696#comment-16575696
 ] 

Jeff Zhang commented on FLINK-10119:
------------------------------------

[~CrestOfWave] Please use English to create ticket. 

> 存在数据非json格式,使用KafkaJsonTableSource的话,job无法拉起。
> ---------------------------------------------
>
>                 Key: FLINK-10119
>                 URL: https://issues.apache.org/jira/browse/FLINK-10119
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.1
>         Environment: 无
>            Reporter: sean.miao
>            Priority: Major
>
> 开启checkpoint和savepoint,同时开启了job的自动拉起。
> flink从kafka消费数据,使用的是Kafka010JsonTableSource。发现只要有一条数据非json格式,就会导致应用挂掉无法拉起。
> 当前,这仅是满足了处理语义,但是导致应用不可以用就不太好了吧。能不能改成像spark 
> sql一样,不满足格式的数据,增加到一个专门存储无法解析的数据的列里面。
>  
> 我们目前的做法是
> JsonRowDeserializationSchema
> @Override
> public Row deserialize(byte[] message) throws IOException {
>  try {
>  final JsonNode root = objectMapper.readTree(message);
>  return convertRow(root, (RowTypeInfo) typeInfo);
>  } catch (Throwable t) {
>  throw new IOException("Failed to deserialize JSON object.", t);
>  }
> }
> catch 里抛异常改成了传入一个 “{}”,会使得所有不能解析数据给所有列返回空值。



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to