场景:

flink消费kafka,然后过滤掉某种类型的数据,然后打印一条warn类型的数据。

在测试时,我往kafka写了一条会被过滤掉的数据,偶尔会正常打印一条warn,更多的时候会重复打印该条数据,类似重复消费该条数据。

然后我在warn之后写了一条print语句,这时候就能正常过滤,过滤一条就打印一次warn,不会出现过滤一条数据,重复打印warn。因为这会导致我后续正常的数据无法消费,不知道这是什么问题。
代码:
@Override
   Public boolean filter(Genericrecord record) throws Exception{
     String op_type=record.get("op_type")!=null ? 
record.get("op_type")!=null.toString():"-";
     if("D".equals(op_type)){
          logger.warn(record.toString())
        //System.out.println("过滤掉");
         return false;
        }return true;
 }
smq

回复