[ https://issues.apache.org/jira/browse/FLINK-10577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhanghao updated FLINK-10577: ----------------------------- Description: I think greedy operator has some problem. Given the below java code: {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Tuple3<Integer, Long, String>> input = env.fromElements( Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"), Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a") ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) { private static final long serialVersionUID = 1L; @Override public long extractTimestamp(Tuple3<Integer, Long, String> element) { return element.f1; } }); AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent(); Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy) .where(new SimpleCondition<Tuple3<Integer, Long, String>>() { private static final long serialVersionUID = 1L; @Override public boolean filter(Tuple3<Integer, Long, String> e) { return e.f2.equals("r") ? true : false; } }).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() { private static final long serialVersionUID = 1L; @Override public boolean filter(Tuple3<Integer, Long, String> e) throws Exception { return !e.f2.equals("r") ? 
true : false; } }).oneOrMore().greedy() .within(Time.seconds(10)); CEP.pattern(input.keyBy(0), pattern) .select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() { private static final long serialVersionUID = 1L; @Override public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) { StringBuilder builder = new StringBuilder(); List<Tuple3<Integer, Long, String>> start = pattern.get("start"); List<Tuple3<Integer, Long, String>> middle = pattern.get("middle"); for (Tuple3<Integer, Long, String> t : start) { builder.append(t.f0).append(","); } for (Tuple3<Integer, Long, String> t : middle) { builder.append(t.f0).append(","); } return builder.toString(); } }) .print(); env.execute();{code} I would like to see: 100,100,100,100,100,100; however, it only matches 100,100. I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() to skip partial matches, but it still only matches 100,100. Is there something important about the greedy operator that I misunderstood? was: I think greedy operator has some problem. 
Given the below java code: {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Tuple3<Integer, Long, String>> input = env.fromElements( Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"), Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"), Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a") ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) { private static final long serialVersionUID = 1L; @Override public long extractTimestamp(Tuple3<Integer, Long, String> element) { return element.f1; } }); AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent(); Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy) .where(new SimpleCondition<Tuple3<Integer, Long, String>>() { private static final long serialVersionUID = 1L; @Override public boolean filter(Tuple3<Integer, Long, String> e) { return e.f2.equals("r") ? true : false; } }).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() { private static final long serialVersionUID = 1L; @Override public boolean filter(Tuple3<Integer, Long, String> e) throws Exception { return !e.f2.equals("r") ? 
true : false; } }).oneOrMore().greedy() .within(Time.seconds(10)); CEP.pattern(input.keyBy(0), pattern) .select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() { private static final long serialVersionUID = 1L; @Override public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) { StringBuilder builder = new StringBuilder(); List<Tuple3<Integer, Long, String>> start = pattern.get("start"); List<Tuple3<Integer, Long, String>> middle = pattern.get("middle"); for (Tuple3<Integer, Long, String> t : start) { builder.append(t.f0).append(","); } for (Tuple3<Integer, Long, String> t : middle) { builder.append(t.f0).append(","); } return builder.toString(); } }) .print(); env.execute();{code} I would like to see:100,100,100,100,100,100 however it matches 100,100 I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() for skipping some partial matches,it still failed,it also matches 100,100. Is there something important about greedy operator that i misunderstood? > CEP's greedy() doesn't work > --------------------------- > > Key: FLINK-10577 > URL: https://issues.apache.org/jira/browse/FLINK-10577 > Project: Flink > Issue Type: Bug > Components: CEP > Affects Versions: 1.6.0 > Reporter: zhanghao > Priority: Major > > I think greedy operator has some problem. 
> Given the below java code: > > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream<Tuple3<Integer, Long, String>> input = env.fromElements( > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:00").getTime(), "r"), > Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 > 13:00:00").getTime(), "p"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:01").getTime(), "p"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:03").getTime(), "p"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:04").getTime(), "p"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:05").getTime(), "c"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:08").getTime(), "c"), > Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 > 13:00:11").getTime(), "a") > ).assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, > String>>(Time.seconds(2)) { > private static final long serialVersionUID = 1L; > @Override > public long extractTimestamp(Tuple3<Integer, Long, String> element) { > return element.f1; > } > }); > AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent(); > Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, > Long, String>>begin("start", strategy) > .where(new SimpleCondition<Tuple3<Integer, Long, String>>() { > private static final long serialVersionUID = 1L; > @Override > public boolean filter(Tuple3<Integer, Long, String> e) { > return e.f2.equals("r") ? true : false; > } > }).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, > String>>() { > private static final long serialVersionUID = 1L; > @Override > public boolean filter(Tuple3<Integer, Long, String> e) throws Exception { > return !e.f2.equals("r") ? 
true : false; > } > }).oneOrMore().greedy() > .within(Time.seconds(10)); > CEP.pattern(input.keyBy(0), pattern) > .select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() { > private static final long serialVersionUID = 1L; > @Override > public String select(Map<String, List<Tuple3<Integer, Long, String>>> > pattern) { > StringBuilder builder = new StringBuilder(); > List<Tuple3<Integer, Long, String>> start = pattern.get("start"); > List<Tuple3<Integer, Long, String>> middle = pattern.get("middle"); > for (Tuple3<Integer, Long, String> t : start) { > builder.append(t.f0).append(","); > } > for (Tuple3<Integer, Long, String> t : middle) { > builder.append(t.f0).append(","); > } > return builder.toString(); > } > }) > .print(); > env.execute();{code} > I would like to see: 100,100,100,100,100,100; > however, it only matches 100,100. > I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() to skip > partial matches, but it still only matches 100,100. > Is there something important about the greedy operator that I misunderstood? -- This message was sent by Atlassian JIRA (v7.6.3#76005)