Hi fanchao

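Yes, I suggest rewriting your flatMap function as a ProcessFunction. Side outputs are emitted through the Context that a process function receives, and flatMap has no such parameter.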
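
For illustration, here is a minimal sketch of what the rewritten function could look like. The `Event` POJO, its fields, the class name `JsonValidator`, and the tag id "invalid-json" are placeholders I made up, not names from your job, so adjust them to your types. It assumes Flink 1.11 and Gson on the classpath.

```java
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class JsonValidator extends ProcessFunction<String, JsonValidator.Event> {

    /** Hypothetical POJO standing in for the real message type. */
    public static class Event {
        public String id;
        public long ts;
    }

    // Declared as an anonymous subclass ({}) so Flink can capture the generic type.
    // The id "invalid-json" is an arbitrary, assumed name.
    public static final OutputTag<String> INVALID_JSON =
            new OutputTag<String>("invalid-json") {};

    // Gson instances are thread-safe; a static field keeps it out of serialization.
    private static final Gson GSON = new Gson();

    @Override
    public void processElement(String raw, Context ctx, Collector<Event> out) {
        try {
            Event event = GSON.fromJson(raw, Event.class);
            if (event != null) {
                out.collect(event);            // valid: continue on the main stream
            } else {
                ctx.output(INVALID_JSON, raw); // empty or "null" input: treated as invalid here
            }
        } catch (JsonSyntaxException e) {
            ctx.output(INVALID_JSON, raw);     // unparseable: keep the original string
        }
    }

    /** Wiring sketch: the sinks themselves are omitted. */
    public static void wire(DataStream<String> kafkaStream) {
        SingleOutputStreamOperator<Event> valid = kafkaStream.process(new JsonValidator());
        DataStream<String> invalid = valid.getSideOutput(INVALID_JSON);
        // valid.addSink(...);   // your existing sink
        // invalid.addSink(...); // sink for the table that stores the raw invalid JSON
    }
}
```

The main output keeps the POJO type, while the side output carries the raw String, so the two streams can have different types and go to different sinks. Sending a null parse result to the side output is my own choice in this sketch; drop that branch if empty messages should be ignored.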

Jake

> On Aug 25, 2020, at 11:20 AM, 范超 <fanc...@mgtv.com> wrote:
> 
> Thanks Jake. But can I implement the output-tag logic in my flatMap 
> function rather than in a process function? I checked the parameters of 
> flatMap and there is no ‘context’. Does that mean I have to rewrite my 
> flatMap function as a process function?
>  
> From: Jake [mailto:ft20...@qq.com] 
> Sent: Tuesday, August 25, 2020 11:06
> To: 范超 <fanc...@mgtv.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: How to sink invalid data from flatmap
>  
> Hi fanchao
>  
> Use a side output; see [1].
>  
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html
>  
> Jake
> 
> 
> On Aug 25, 2020, at 10:54 AM, 范超 <fanc...@mgtv.com> wrote:
>  
> Hi, 
> I’m using a custom flatMap function to validate JSON string messages from 
> Kafka. If a message is valid, it is converted to a POJO (using Gson) and 
> passed on to the next sink step.
> If it cannot be parsed as a POJO, Gson throws 
> “com.google.gson.JsonSyntaxException”. In my custom flatMap function I 
> just catch this exception and move on, so the invalid JSON message 
> is simply dropped.
>  
> Now I want to keep the invalid messages and sink the original 
> Kafka JSON string to another table, but I don’t know how to implement this 
> in my custom flatMap function, because the RichMapFunction restricts the 
> type that collect can emit.
> Could someone give me some advice, please?
> Thanks in advance!
> Chao Fan