Kostas Kloudas created FLINK-6578: ------------------------------------- Summary: SharedBuffer creates self-loops when having elements with same value/timestamp. Key: FLINK-6578 URL: https://issues.apache.org/jira/browse/FLINK-6578 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.3.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.3.0
The test below fails with the current implementation: the looping state accepts the repeated {{middleEvent1}} elements, but because they carry the same value and timestamp the shared buffer cannot distinguish between them, so it gets trapped in an infinite loop and eventually runs out of memory. {code} @Test public void testEagerZeroOrMoreSameElement() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); Event startEvent = new Event(40, "c", 1.0); Event middleEvent1 = new Event(41, "a", 2.0); Event middleEvent2 = new Event(42, "a", 3.0); Event middleEvent3 = new Event(43, "a", 4.0); Event end1 = new Event(44, "b", 5.0); inputEvents.add(new StreamRecord<>(startEvent, 1)); inputEvents.add(new StreamRecord<>(middleEvent1, 3)); inputEvents.add(new StreamRecord<>(middleEvent1, 3)); inputEvents.add(new StreamRecord<>(middleEvent1, 3)); inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5)); inputEvents.add(new StreamRecord<>(middleEvent3, 6)); inputEvents.add(new StreamRecord<>(middleEvent3, 6)); inputEvents.add(new StreamRecord<>(end1, 7)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }); NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); 
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1), Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1), Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1), Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1), Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1), Lists.newArrayList(startEvent, middleEvent1, end1), Lists.newArrayList(startEvent, end1) )); } {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)