Hi, 明启,我也遇到了类似的问题,会不会是因为并行度的问题导致?
-----邮件原件----- 发件人: 明启 孙 [mailto:374060...@qq.com] 发送时间: 2020年9月15日 星期二 10:45 收件人: user-zh <user-zh@flink.apache.org> 主题: flink RichFilterFunction重复过滤一条数据 场景: flink消费kafka,然后过滤掉某种类型的数据,然后打印一条warn类型的数据。 在测试时,我往kafka写了一条会被过滤掉的数据,偶尔会正常打印一条warn,更多的时候会重复打印该条数据,类似重复消费该条数据。 然后我在warn之后写了一条print语句,这时候就能正常过滤,过滤一条就打印一次warn,不会出现过滤一条数据,重复打印warn。因为这会导致我后续正常的数据无法消费,不知道这是什么问题。 代码: @Override Public boolean filter(Genericrecord record) throws Exception{ String op_type=record.get("op_type")!=null ? record.get("op_type")!=null.toString():"-"; if("D".equals(op_type)){ logger.warn(record.toString()) //System.out.println("过滤掉"); return false; }return true; } smq