大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List集合,但是遇到的问题是每次都需要输入第四条数据才会触发Pattern的select函数去打印List。
具体实现代码如下:
public class Run3 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final DataStream<String> source = env.socketTextStream("localhost", 
8888)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>forMonotonousTimestamps()
                                .withTimestampAssigner((String s, long ts) -> 
System.currentTimeMillis())
                )
                .keyBy(s -> s);
        source.print("source ");
        final Pattern<String, String> pattern = Pattern.<String>begin("begin", 
AfterMatchSkipStrategy.skipPastLastEvent())
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return true;
                    }
                }).times(3);
        final PatternStream<String> patternStream = CEP.pattern(source, 
pattern);
        patternStream.select(new PatternSelectFunction<String, Object>() {
            @Override
            public Object select(Map<String, List<String>> pattern) {
                return pattern.get("begin");
            }
        }).print("result ");
        env.execute();
    }
}

环境如下:
Flink 1.12.2
OS:Windows 10
编程工具:IDEA 2021.1.2
使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间

运行结果如下所示:
[cid:image001.png@01D75E43.4DB77F10]

回复