大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List集合,但是遇到的问题是每次都需要输入第四条数据才会触发Pattern的select函数去打印List。 具体实现代码如下: public class Run3 { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); final DataStream<String> source = env.socketTextStream("localhost", 8888) .assignTimestampsAndWatermarks( WatermarkStrategy.<String>forMonotonousTimestamps() .withTimestampAssigner((String s, long ts) -> System.currentTimeMillis()) ) .keyBy(s -> s); source.print("source "); final Pattern<String, String> pattern = Pattern.<String>begin("begin", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new SimpleCondition<String>() { @Override public boolean filter(String s) throws Exception { return true; } }).times(3); final PatternStream<String> patternStream = CEP.pattern(source, pattern); patternStream.select(new PatternSelectFunction<String, Object>() { @Override public Object select(Map<String, List<String>> pattern) { return pattern.get("begin"); } }).print("result "); env.execute(); } }
环境如下: Flink 1.12.2 OS:Windows 10 编程工具:IDEA 2021.1.2 使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间 运行结果如下所示: [cid:image001.png@01D75E43.4DB77F10]