Repository: flink
Updated Branches:
  refs/heads/master 1649f3520 -> ffca0e76e
[FLINK-6265] [cep] Fix consecutive() for times()

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffca0e76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffca0e76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffca0e76

Branch: refs/heads/master
Commit: ffca0e76eaaefe6251a7664b93c4ec007e4305e9
Parents: 1649f35
Author: kl0u <kklou...@gmail.com>
Authored: Tue Apr 4 18:23:50 2017 +0200
Committer: kl0u <kklou...@gmail.com>
Committed: Wed Apr 5 17:20:01 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/nfa/compiler/NFACompiler.java     |   1 -
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 136 ++++++++++++++++++-
 2 files changed, 133 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ffca0e76/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index e441c4b..e8077fa 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -266,7 +266,6 @@ public class NFACompiler {
 			for (int i = 0; i < times - 1; i++) {
 				lastSink = createSingletonState(
 					lastSink,
-					currentPattern instanceof FollowedByPattern &&
 					!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT));
 			}
 			return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern);

http://git-wip-us.apache.org/repos/asf/flink/blob/ffca0e76/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index da5f413..7359ca8 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -1336,9 +1336,10 @@ public class NFAITCase extends TestLogger { } } - assertEquals(1, allPatterns.size()); + assertEquals(2, allPatterns.size()); assertEquals(Sets.<Set<Event>>newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1) + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), + Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1) ), resultingPatterns); } @@ -1363,7 +1364,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).followedBy("end1").where(new SimpleCondition<Event>() { + }).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1396,6 +1397,135 @@ public class NFAITCase extends TestLogger { } @Test + public void testTimesNonStrictWithNext() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + @Test + public void testTimesStrictWithFollowedBy() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return 
value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent2, ConsecutiveData.end) + )); + } + + @Test + public void testTimesStrictWithNextAndConsecutive() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + 
}).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList()); + } + + @Test public void testStartWithOptional() { List<StreamRecord<Event>> inputEvents = new ArrayList<>();