Hi, 明启,我也遇到了类似的问题,会不会是因为并行度的问题导致?


-----邮件原件-----
发件人: 明启 孙 [mailto:374060...@qq.com] 
发送时间: 2020年9月15日 星期二 10:45
收件人: user-zh <user-zh@flink.apache.org>
主题: flink RichFilterFunction重复过滤一条数据

场景:

flink消费kafka,然后过滤掉某种类型的数据,然后打印一条warn类型的数据。

在测试时,我往kafka写了一条会被过滤掉的数据,偶尔会正常打印一条warn,更多的时候会重复打印该条数据,类似重复消费该条数据。

然后我在warn之后写了一条print语句,这时候就能正常过滤,过滤一条就打印一次warn,不会出现过滤一条数据,重复打印warn。因为这会导致我后续正常的数据无法消费,不知道这是什么问题。
代码:
@Override
   Public boolean filter(Genericrecord record) throws Exception{
     String op_type=record.get("op_type")!=null ? 
record.get("op_type")!=null.toString():"-";
     if("D".equals(op_type)){
          logger.warn(record.toString())
        //System.out.println("过滤掉");
         return false;
        }return true;
 }
smq

回复