http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index ccae848..825ba957 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -46,9 +46,9 @@ public class NFAITCase extends TestLogger { public void testSimplePatternNFA() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); - Event startEvent = new Event(42, "start", 1.0); + Event startEvent = new Event(41, "start", 1.0); SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); - Event endEvent= new Event(43, "end", 1.0); + Event endEvent = new Event(43, "end", 1.0); inputEvents.add(new StreamRecord<Event>(startEvent, 1)); inputEvents.add(new StreamRecord<Event>(new Event(43, "foobar", 1.0), 2)); @@ -102,6 +102,99 @@ public class NFAITCase extends TestLogger { assertEquals(endEvent, patternMap.get("end")); } + @Test + public void testStrictContinuityWithResults() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event middleEvent1 = new Event(41, "a", 2.0); + Event end = new Event(42, "b", 4.0); + + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(end, 5)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).next("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(1, allPatterns.size()); + assertEquals(Sets.<Set<Event>>newHashSet( + Sets.newHashSet(middleEvent1, end) + ), resultingPatterns); + } + + @Test + public void testStrictContinuityNoResults() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "c", 3.0); + Event end = new Event(43, "b", 4.0); + + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(end, 5)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).next("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + } + } + + assertEquals(Sets.newHashSet(), resultingPatterns); + } + /** * Tests that the NFA successfully filters out expired elements with respect to the window * length @@ -327,6 +420,1247 @@ public class NFAITCase extends TestLogger { ), patterns); } + @Test + public void testComplexBranchingAfterKleeneStar() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + Event end2 = new Event(45, "d", 6.0); + Event end3 = new Event(46, "d", 7.0); + Event end4 = new Event(47, "e", 8.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<>(end1, 6)); + inputEvents.add(new StreamRecord<>(end2, 7)); + inputEvents.add(new StreamRecord<>(end3, 8)); + inputEvents.add(new StreamRecord<>(end4, 9)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }) + .followedBy("end2").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }) + .followedBy("end3").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("e"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(16, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4), + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end2, end4), + Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end2, end4), + Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end2, end4), + Sets.newHashSet(startEvent, middleEvent1, end1, end2, end4), + Sets.newHashSet(startEvent, middleEvent2, end1, end2, end4), + Sets.newHashSet(startEvent, middleEvent3, end1, end2, end4), + Sets.newHashSet(startEvent, end1, end2, end4), + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4), + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end3, end4), + Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end3, end4), + Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end3, end4), + Sets.newHashSet(startEvent, middleEvent1, end1, end3, end4), + Sets.newHashSet(startEvent, middleEvent2, end1, end3, end4), + Sets.newHashSet(startEvent, middleEvent3, end1, end3, end4), + Sets.newHashSet(startEvent, end1, end3, end4) + ), resultingPatterns); + } + + @Test + public void testKleeneStar() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(4, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), + Sets.newHashSet(startEvent, middleEvent1, end1), + Sets.newHashSet(startEvent, middleEvent2, end1), + Sets.newHashSet(startEvent, end1) + ), resultingPatterns); + } + + @Test + public void testEagerKleeneStar() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(true).followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(4, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), + Sets.newHashSet(startEvent, middleEvent1, end1), + Sets.newHashSet(startEvent, end1) + ), resultingPatterns); + } + + + @Test + public void testBeginWithKleeneStar() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event middleEvent1 = new Event(40, "a", 2.0); + Event middleEvent2 = new Event(41, "a", 3.0); + Event middleEvent3 = new Event(41, "a", 3.0); + Event end = new Event(42, "b", 4.0); + + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<>(end, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore().followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(7, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3, end), + Sets.newHashSet(middleEvent1, middleEvent2, end), + Sets.newHashSet(middleEvent2, middleEvent3, end), + Sets.newHashSet(middleEvent1, end), + Sets.newHashSet(middleEvent2, end), + Sets.newHashSet(middleEvent3, end), + Sets.newHashSet(end) + ), resultingPatterns); + } + + @Test + public void testKleeneStarAfterKleeneStar() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "d", 3.0); + Event middleEvent3 = new Event(43, "d", 4.0); + Event end = new Event(44, "e", 4.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<>(end, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle-first").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(false).followedBy("middle-second").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("e"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(8, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end), + Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end), + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end), + Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end), + Sets.newHashSet(startEvent, middleEvent3, end), + Sets.newHashSet(startEvent, middleEvent2, end), + Sets.newHashSet(startEvent, middleEvent1, end), + Sets.newHashSet(startEvent, end) + ), resultingPatterns); + } + + @Test + public void testKleeneStarAfterBranching() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event merging = new Event(42, "f", 3.0); + Event kleene1 = new Event(43, "d", 4.0); + Event kleene2 = new Event(44, "d", 4.0); + Event end = new Event(45, "e", 4.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(merging, 5)); + inputEvents.add(new StreamRecord<>(kleene1, 6)); + inputEvents.add(new StreamRecord<>(kleene2, 7)); + inputEvents.add(new StreamRecord<>(end, 8)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("branching").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("merging").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("f"); + } + }).followedBy("kleene").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("e"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(8, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, merging, end), + Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, end), + Sets.newHashSet(startEvent, middleEvent1, merging, kleene2, end), + Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, kleene2, end), + Sets.newHashSet(startEvent, middleEvent2, merging, end), + Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, end), + Sets.newHashSet(startEvent, middleEvent2, merging, kleene2, end), + Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, kleene2, end) + ), resultingPatterns); + } + + @Test + public void testStrictContinuityNoResultsAfterKleeneStar() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event start = new Event(40, "d", 2.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 2.0); + Event middleEvent3 = new Event(43, "c", 3.0); + Event end = new Event(44, "b", 4.0); + + inputEvents.add(new StreamRecord<>(start, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(middleEvent3, 4)); + inputEvents.add(new StreamRecord<>(end, 5)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore() + .next("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + } + } + + assertEquals(Sets.newHashSet(), resultingPatterns); + } + + @Test + public void testStrictContinuityResultsAfterKleeneStar() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event start = new Event(40, "d", 2.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 2.0); + Event end = new Event(43, "b", 4.0); + + inputEvents.add(new StreamRecord<>(start, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(end, 5)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(false) + .next("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(2, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(start, middleEvent1, middleEvent2, end), + Sets.newHashSet(start, middleEvent2, end) + ), resultingPatterns); + } + + @Test + public void testAtLeastOne() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(3, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), + Sets.newHashSet(startEvent, middleEvent1, end1), + Sets.newHashSet(startEvent, middleEvent2, end1) + ), resultingPatterns); + } + + @Test + public void testBeginWithAtLeastOne() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent1 = new Event(41, "a", 2.0); + Event startEvent2 = new Event(42, "a", 3.0); + Event startEvent3 = new Event(42, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent1, 3)); + inputEvents.add(new StreamRecord<>(startEvent2, 4)); + inputEvents.add(new StreamRecord<>(startEvent3, 5)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore(false).followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(7, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent1, startEvent2, startEvent3, end1), + Sets.newHashSet(startEvent1, startEvent2, end1), + Sets.newHashSet(startEvent1, startEvent3, end1), + Sets.newHashSet(startEvent2, startEvent3, end1), + Sets.newHashSet(startEvent1, end1), + Sets.newHashSet(startEvent2, end1), + Sets.newHashSet(startEvent3, end1) + ), resultingPatterns); + } + + @Test + public void testNextZeroOrMore() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "start", 1.0); + Event middleEvent1 = new Event(40, "middle", 2.0); + Event middleEvent2 = new Event(40, "middle", 3.0); + Event middleEvent3 = new Event(40, "middle", 4.0); + Event endEvent = new Event(46, "end", 1.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1L)); + inputEvents.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3L)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4L)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5L)); + inputEvents.add(new StreamRecord<>(endEvent, 6L)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).next("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("middle"); + } + }).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(1, allPatterns.size()); + assertEquals(Sets.<Set<Event>>newHashSet( + Sets.newHashSet(startEvent, endEvent) + ), resultingPatterns); + } + + @Test + public void testAtLeastOneEager() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore(true).followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(3, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), + Sets.newHashSet(startEvent, middleEvent1, end1) + ), resultingPatterns); + } + + @Test + public void testOptional() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent, 5)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).optional().followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(2, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent, end1), + Sets.newHashSet(startEvent, end1) + ), resultingPatterns); + } + + + @Test + public void testTimes() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(middleEvent3, 4)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(1, allPatterns.size()); + assertEquals(Sets.<Set<Event>>newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1) + ), resultingPatterns); + } + + @Test + public void testStartWithTimes() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(middleEvent3, 4)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(2, allPatterns.size()); + assertEquals(Sets.<Set<Event>>newHashSet( + Sets.newHashSet(middleEvent1, middleEvent2, end1), + Sets.newHashSet(middleEvent2, middleEvent3, end1) + ), resultingPatterns); + } + + @Test + public void testStartWithOptional() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).optional().followedBy("end1").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(2, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, end1), + Sets.newHashSet(end1) + ), resultingPatterns); + } + + @Test + public void testEndWithZeroOrMore() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(4, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3), + Sets.newHashSet(startEvent, middleEvent1, middleEvent2), + Sets.newHashSet(startEvent, middleEvent1), + Sets.newHashSet(startEvent) + ), resultingPatterns); + } + + @Test + public void testStartAndEndWithZeroOrMore() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "d", 5.0); + Event end2 = new Event(45, "d", 5.0); + Event end3 = new Event(46, "d", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<>(end1, 6)); + inputEvents.add(new StreamRecord<>(end2, 6)); + inputEvents.add(new StreamRecord<>(end3, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(6, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3), + Sets.newHashSet(middleEvent1, middleEvent2), + Sets.newHashSet(middleEvent1), + Sets.newHashSet(middleEvent2, middleEvent3), + Sets.newHashSet(middleEvent2), + Sets.newHashSet(middleEvent3) + ), resultingPatterns); + } + + @Test + public void testEndWithOptional() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).optional(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(2, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1), + Sets.newHashSet(startEvent) + ), resultingPatterns); + } + + @Test + public void testEndWithOneOrMore() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + Set<Set<Event>> resultingPatterns = new HashSet<>(); + List<Collection<Event>> allPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> foundPattern : patterns) { + resultingPatterns.add(new HashSet<>(foundPattern.values())); + allPatterns.add(foundPattern.values()); + } + } + + assertEquals(3, allPatterns.size()); + assertEquals(Sets.newHashSet( + Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3), + Sets.newHashSet(startEvent, middleEvent1, middleEvent2), + Sets.newHashSet(startEvent, middleEvent1) + ), resultingPatterns); + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index 9f65132..40a0e7e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -21,6 +21,7 @@ package org.apache.flink.cep.nfa; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.cep.Event; +import org.apache.flink.cep.pattern.FilterFunctions; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -51,12 +52,12 @@ public class NFATest extends TestLogger { streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L)); streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L)); - State<Event> startingState = new State<>("", State.StateType.Start); - State<Event> startState = new State<>("start", State.StateType.Normal); - State<Event> endState = new State<>("end", State.StateType.Final); - StateTransition<Event> starting2Start = new StateTransition<>( - StateTransitionAction.TAKE, - startState, + State<Event> startState = new State<>("start", State.StateType.Start); + State<Event> endState = new State<>("end", State.StateType.Normal); + State<Event> endingState = new State<>("", State.StateType.Final); + + startState.addTake( + endState, new FilterFunction<Event>() { private static final long serialVersionUID = -4869589195918650396L; @@ -64,12 +65,9 @@ public class NFATest extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - } - ); - - StateTransition<Event> start2End = new StateTransition<>( - StateTransitionAction.TAKE, - endState, + }); + endState.addTake( + endingState, new FilterFunction<Event>() { private static final long serialVersionUID = 2979804163709590673L; @@ -77,18 +75,12 @@ public class NFATest extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("end"); } - } - ); - - StateTransition<Event> start2Start = new StateTransition<>(StateTransitionAction.IGNORE, startState, null); - - startingState.addStateTransition(starting2Start); - startState.addStateTransition(start2End); - startState.addStateTransition(start2Start); + }); + endState.addIgnore(FilterFunctions.<Event>trueFunction()); - nfa.addState(startingState); nfa.addState(startState); nfa.addState(endState); + nfa.addState(endingState); Set<Map<String, Event>> expectedPatterns = new HashSet<>(); @@ -196,8 +188,10 @@ public class NFATest extends TestLogger { public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) { Set<Map<String, T>> actualPatterns = new HashSet<>(); - for (StreamRecord<T> streamEvent: inputs) { - Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getValue(), streamEvent.getTimestamp()).f0; + for (StreamRecord<T> streamEvent : inputs) { + Collection<Map<String, T>> matchedPatterns = nfa.process( + streamEvent.getValue(), + streamEvent.getTimestamp()).f0; actualPatterns.addAll(matchedPatterns); } @@ -213,24 +207,12 @@ public class NFATest extends TestLogger { State<Event> startState = new State<>("start", State.StateType.Normal); State<Event> endState = new State<>("end", State.StateType.Final); - StateTransition<Event> starting2Start = new StateTransition<>( - StateTransitionAction.TAKE, - startState, - new NameFilter("start")); - StateTransition<Event> start2End = new StateTransition<>( - StateTransitionAction.TAKE, - endState, + startingState.addTake( + new NameFilter("start")); + startState.addTake( new NameFilter("end")); - - StateTransition<Event> start2Start = new StateTransition<>( - StateTransitionAction.IGNORE, - startState, - null); - - startingState.addStateTransition(starting2Start); - startState.addStateTransition(start2End); - startState.addStateTransition(start2Start); + startState.addIgnore(null); nfa.addState(startingState); nfa.addState(startState); @@ -253,12 +235,12 @@ public class NFATest extends TestLogger { private NFA<Event> createStartEndNFA(long windowLength) { NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false); - State<Event> startingState = new State<>("", State.StateType.Start); - State<Event> startState = new State<>("start", State.StateType.Normal); - State<Event> endState = new State<>("end", State.StateType.Final); - StateTransition<Event> starting2Start = new StateTransition<>( - StateTransitionAction.TAKE, - startState, + State<Event> startState = new State<>("start", State.StateType.Start); + State<Event> endState = new State<>("end", State.StateType.Normal); + State<Event> endingState = new State<>("", State.StateType.Final); + + startState.addTake( + endState, new FilterFunction<Event>() { private static final long serialVersionUID = -4869589195918650396L; @@ -267,10 +249,8 @@ public class NFATest extends TestLogger { return value.getName().equals("start"); } }); - - StateTransition<Event> start2End = new StateTransition<>( - StateTransitionAction.TAKE, - endState, + endState.addTake( + endingState, new FilterFunction<Event>() { private static final long serialVersionUID = 2979804163709590673L; @@ -279,19 +259,11 @@ public class NFATest extends TestLogger { return value.getName().equals("end"); } }); + endState.addIgnore(FilterFunctions.<Event>trueFunction()); - StateTransition<Event> start2Start = new StateTransition<>( - StateTransitionAction.IGNORE, - startState, - null); - - startingState.addStateTransition(starting2Start); - startState.addStateTransition(start2End); - startState.addStateTransition(start2Start); - - nfa.addState(startingState); nfa.addState(startState); nfa.addState(endState); + nfa.addState(endingState); return nfa; } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index d11f3a8..93d78cc 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -18,9 +18,11 @@ package org.apache.flink.cep.nfa.compiler; +import com.google.common.collect.Sets; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; @@ -28,22 +30,45 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.State; import org.apache.flink.cep.nfa.StateTransition; import org.apache.flink.cep.nfa.StateTransitionAction; +import org.apache.flink.cep.pattern.MalformedPatternException; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.util.TestLogger; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; -import static org.junit.Assert.assertTrue; +import static com.google.common.collect.Sets.newHashSet; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class NFACompilerTest extends TestLogger { + private static final FilterFunction<Event> startFilter = new FilterFunction<Event>() { + private static final long serialVersionUID = 3314714776170474221L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() > 2; + } + }; + + private static final FilterFunction<Event> endFilter = new FilterFunction<Event>() { + private static final long serialVersionUID = 3990995859716364087L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }; + + private static final TypeSerializer<Event> serializer = TypeExtractor.createTypeInfo(Event.class) + .createSerializer(new ExecutionConfig()); + @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -81,83 +106,96 @@ public class NFACompilerTest extends TestLogger { */ @Test public void testNFACompilerWithSimplePattern() { - Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { - private static final long serialVersionUID = 3314714776170474221L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getPrice() > 2; - } - }) - .followedBy("middle").subtype(SubEvent.class) - .next("end").where(new FilterFunction<Event>() { - private static final long serialVersionUID = 3990995859716364087L; + Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter) + .followedBy("middle").subtype(SubEvent.class) + .next("end").where(endFilter); - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("end"); - } - }); - - TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class); - - NFA<Event> nfa = NFACompiler.compile(pattern, typeInformation.createSerializer(new ExecutionConfig()), false); + NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false); Set<State<Event>> states = nfa.getStates(); - assertEquals(4, states.size()); Map<String, State<Event>> stateMap = new HashMap<>(); - - for (State<Event> state: states) { + for (State<Event> state : states) { stateMap.put(state.getName(), state); } - assertTrue(stateMap.containsKey(NFACompiler.BEGINNING_STATE_NAME)); - State<Event> beginningState = stateMap.get(NFACompiler.BEGINNING_STATE_NAME); - - assertTrue(beginningState.isStart()); - assertTrue(stateMap.containsKey("start")); State<Event> startState = stateMap.get("start"); + assertTrue(startState.isStart()); + final Set<Tuple2<String, StateTransitionAction>> startTransitions = unfoldTransitions(startState); + assertEquals(newHashSet( + Tuple2.of("middle", StateTransitionAction.TAKE) + ), startTransitions); - Collection<StateTransition<Event>> startTransitions = startState.getStateTransitions(); - Map<String, StateTransition<Event>> startTransitionMap = new HashMap<>(); + assertTrue(stateMap.containsKey("middle")); + State<Event> middleState = stateMap.get("middle"); + final Set<Tuple2<String, StateTransitionAction>> middleTransitions = unfoldTransitions(middleState); + assertEquals(newHashSet( + Tuple2.of("middle", StateTransitionAction.IGNORE), + Tuple2.of("end", StateTransitionAction.TAKE) + ), middleTransitions); - for (StateTransition<Event> transition: startTransitions) { - startTransitionMap.put(transition.getTargetState().getName(), transition); - } + assertTrue(stateMap.containsKey("end")); + State<Event> endState = stateMap.get("end"); + final Set<Tuple2<String, StateTransitionAction>> endTransitions = unfoldTransitions(endState); + assertEquals(newHashSet( + Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE) + ), endTransitions); + + assertTrue(stateMap.containsKey(NFACompiler.ENDING_STATE_NAME)); + State<Event> endingState = stateMap.get(NFACompiler.ENDING_STATE_NAME); + assertTrue(endingState.isFinal()); + assertEquals(0, endingState.getStateTransitions().size()); + } - assertEquals(2, startTransitionMap.size()); - assertTrue(startTransitionMap.containsKey("start")); + @Test + public void testNFACompilerWithKleeneStar() { - StateTransition<Event> reflexiveTransition = startTransitionMap.get("start"); - assertEquals(StateTransitionAction.IGNORE, reflexiveTransition.getAction()); + Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter) + .followedBy("middle").subtype(SubEvent.class).zeroOrMore() + .followedBy("end").where(endFilter); - assertTrue(startTransitionMap.containsKey("middle")); - StateTransition<Event> startMiddleTransition = startTransitionMap.get("middle"); - assertEquals(StateTransitionAction.TAKE, startMiddleTransition.getAction()); + NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false); - assertTrue(stateMap.containsKey("middle")); - State<Event> middleState = stateMap.get("middle"); + Set<State<Event>> states = nfa.getStates(); + assertEquals(5, states.size()); - Map<String, StateTransition<Event>> middleTransitionMap = new HashMap<>(); - for (StateTransition<Event> transition: middleState.getStateTransitions()) { - middleTransitionMap.put(transition.getTargetState().getName(), transition); + Set<Tuple2<String, Set<Tuple2<String, StateTransitionAction>>>> stateMap = new HashSet<>(); + for (State<Event> state : states) { + stateMap.add(Tuple2.of(state.getName(), unfoldTransitions(state))); } - assertEquals(1, middleTransitionMap.size()); + assertEquals(stateMap, newHashSet( + Tuple2.of("start", newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE))), + Tuple2.of("middle", newHashSet( + Tuple2.of("middle", StateTransitionAction.IGNORE), + Tuple2.of("middle", StateTransitionAction.TAKE) + )), + Tuple2.of("middle", newHashSet( + Tuple2.of("middle", StateTransitionAction.IGNORE), + Tuple2.of("middle", StateTransitionAction.TAKE), + Tuple2.of("end", StateTransitionAction.PROCEED) + )), + Tuple2.of("end", newHashSet( + Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE), + Tuple2.of("end", StateTransitionAction.IGNORE) + )), + Tuple2.of(NFACompiler.ENDING_STATE_NAME, Sets.newHashSet()) + )); - assertTrue(middleTransitionMap.containsKey("end")); - StateTransition<Event> middleEndTransition = middleTransitionMap.get("end"); + } - assertEquals(StateTransitionAction.TAKE, middleEndTransition.getAction()); - assertTrue(stateMap.containsKey("end")); - State<Event> endState = stateMap.get("end"); - - assertTrue(endState.isFinal()); - assertEquals(0, endState.getStateTransitions().size()); + private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) { + final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>(); + for (StateTransition<T> transition : state.getStateTransitions()) { + transitions.add(Tuple2.of( + transition.getTargetState().getName(), + transition.getAction())); + } + return transitions; } + } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index 98c3f5a..68b0419 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -184,4 +184,58 @@ public class PatternTest extends TestLogger { assertEquals(previous2.getName(), "start"); } + @Test(expected = MalformedPatternException.class) + public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception { + + Pattern.begin("start").where(new FilterFunction<Object>() { + @Override + public boolean filter(Object value) throws Exception { + return true; + } + }).oneOrMore().zeroOrMore(); + } + + @Test(expected = MalformedPatternException.class) + public void testPatternCanHaveQuantifierSpecifiedOnce2() throws Exception { + + Pattern.begin("start").where(new FilterFunction<Object>() { + @Override + public boolean filter(Object value) throws Exception { + return true; + } + }).zeroOrMore().times(1); + } + + @Test(expected = MalformedPatternException.class) + public void testPatternCanHaveQuantifierSpecifiedOnce3() throws Exception { + + Pattern.begin("start").where(new FilterFunction<Object>() { + @Override + public boolean filter(Object value) throws Exception { + return true; + } + }).times(1).oneOrMore(); + } + + @Test(expected = MalformedPatternException.class) + public void testPatternCanHaveQuantifierSpecifiedOnce4() throws Exception { + + Pattern.begin("start").where(new FilterFunction<Object>() { + @Override + public boolean filter(Object value) throws Exception { + return true; + } + }).oneOrMore().oneOrMore(true); + } + + @Test(expected = MalformedPatternException.class) + public void testPatternCanHaveQuantifierSpecifiedOnce5() throws Exception { + + Pattern.begin("start").where(new FilterFunction<Object>() { + @Override + public boolean filter(Object value) throws Exception { + return true; + } + }).oneOrMore(true).zeroOrMore(true); + } }