Hi fanchao,

Use a side output; see [1].
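To make the suggestion concrete: in Flink, side outputs are emitted from a ProcessFunction (not a FlatMapFunction) by calling ctx.output(tag, value) with an OutputTag, and read back with getSideOutput(tag) on the resulting stream. The sketch below is not Flink code. It is a framework-free model of the same routing that runs on a plain JDK, assuming a hypothetical Event POJO and a hypothetical parse method standing in for Gson.fromJson. The comments mark where the Flink calls would go.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
 * Framework-free model of the side-output pattern.
 * All types here are hypothetical stand-ins; in Flink the operator would be
 * a ProcessFunction and the side channel would be identified by an OutputTag.
 */
public class SideOutputSketch {

    /** Hypothetical POJO standing in for the parsed Kafka message. */
    public static final class Event {
        public final String id;
        public Event(String id) { this.id = id; }
    }

    /** The two outputs: parsed POJOs, and raw strings that failed to parse. */
    public static final class Result {
        public final List<Event> main = new ArrayList<>();
        public final List<String> invalid = new ArrayList<>();
    }

    /**
     * Stand-in for Gson.fromJson: accepts only {"id":"<value>"} and throws
     * otherwise, much as Gson throws JsonSyntaxException on malformed input.
     */
    public static Event parse(String json) {
        String prefix = "{\"id\":\"";
        String suffix = "\"}";
        if (json == null
                || json.length() < prefix.length() + suffix.length()
                || !json.startsWith(prefix)
                || !json.endsWith(suffix)) {
            throw new IllegalArgumentException("malformed message: " + json);
        }
        return new Event(json.substring(prefix.length(), json.length() - suffix.length()));
    }

    /** Sends each raw message to the main output or, on a parse failure, to the side output. */
    public static Result route(List<String> rawMessages, Function<String, Event> parser) {
        Result result = new Result();
        for (String raw : rawMessages) {
            try {
                result.main.add(parser.apply(raw)); // in Flink: out.collect(pojo)
            } catch (RuntimeException e) {
                result.invalid.add(raw);            // in Flink: ctx.output(invalidTag, raw)
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = route(Arrays.asList("{\"id\":\"a1\"}", "not json"), SideOutputSketch::parse);
        System.out.println("valid=" + r.main.size() + " invalid=" + r.invalid);
    }
}
```

The point of the pattern is that the main output keeps its POJO type while the rejected records keep their original String type, so the raw Kafka message can go to a separate sink unchanged.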
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html

Jake

> On Aug 25, 2020, at 10:54 AM, 范超 <fanc...@mgtv.com> wrote:
>
> Hi,
> I'm using a custom flatmap function to validate Kafka JSON string
> messages. If a message can be parsed into a POJO (using GSON), it goes on
> to the next sink step.
> If it cannot be parsed into a POJO, GSON throws
> "com.google.gson.JsonSyntaxException". My custom flatmap function just
> catches this exception and carries on, so the invalid JSON message is
> simply dropped.
>
> Now I want to keep the invalid messages and sink the original Kafka JSON
> string to another table, but I don't know how to implement this in my
> custom flatmap function, because the RichMapFunction restricts the
> collect type.
> Could someone give me some advice, please?
> Thanks in advance!
> Chao Fan