Repository: flink Updated Branches: refs/heads/table-retraction b237a3ef0 -> e265620d4 (forced update)
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index f4bf647..4a00c1e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.Quantifier; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.windowing.time.Time; @@ -46,6 +47,7 @@ import java.util.Set; import static org.junit.Assert.assertEquals; +@SuppressWarnings("unchecked") public class NFAITCase extends TestLogger { @Test @@ -56,12 +58,12 @@ public class NFAITCase extends TestLogger { SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); Event endEvent = new Event(43, "end", 1.0); - inputEvents.add(new StreamRecord<Event>(startEvent, 1)); - inputEvents.add(new StreamRecord<Event>(new Event(43, "foobar", 1.0), 2)); + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(43, "foobar", 1.0), 2)); inputEvents.add(new StreamRecord<Event>(new SubEvent(41, "barfoo", 1.0, 5.0), 3)); inputEvents.add(new StreamRecord<Event>(middleEvent, 3)); - inputEvents.add(new StreamRecord<Event>(new Event(43, "start", 1.0), 4)); - inputEvents.add(new StreamRecord<Event>(endEvent, 5)); + inputEvents.add(new StreamRecord<>(new Event(43, "start", 1.0), 4)); + inputEvents.add(new StreamRecord<>(endEvent, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @@ -212,12 +214,12 @@ public class NFAITCase extends TestLogger { final Event middleEvent; final Event endEvent; - events.add(new StreamRecord<Event>(new Event(1, "start", 1.0), 1)); - events.add(new StreamRecord<Event>(startEvent = new Event(2, "start", 1.0), 2)); - events.add(new StreamRecord<Event>(middleEvent = new Event(3, "middle", 1.0), 3)); - events.add(new StreamRecord<Event>(new Event(4, "foobar", 1.0), 4)); - events.add(new StreamRecord<Event>(endEvent = new Event(5, "end", 1.0), 11)); - events.add(new StreamRecord<Event>(new Event(6, "end", 1.0), 13)); + events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1)); + events.add(new StreamRecord<>(startEvent = new Event(2, "start", 1.0), 2)); + events.add(new StreamRecord<>(middleEvent = new Event(3, "middle", 1.0), 3)); + events.add(new StreamRecord<>(new Event(4, "foobar", 1.0), 4)); + events.add(new StreamRecord<>(endEvent = new Event(5, "end", 1.0), 11)); + events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @@ -273,12 +275,12 @@ public class NFAITCase extends TestLogger { Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns = new HashSet<>(); Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = new HashSet<>(); - events.add(new StreamRecord<Event>(new Event(1, "start", 1.0), 1)); - events.add(new StreamRecord<Event>(new Event(2, "start", 1.0), 2)); - events.add(new StreamRecord<Event>(new Event(3, "middle", 1.0), 3)); - events.add(new StreamRecord<Event>(new Event(4, "foobar", 1.0), 4)); - events.add(new StreamRecord<Event>(new Event(5, "end", 1.0), 11)); - events.add(new StreamRecord<Event>(new Event(6, "end", 1.0), 13)); + events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1)); + events.add(new StreamRecord<>(new Event(2, "start", 1.0), 2)); + events.add(new StreamRecord<>(new Event(3, "middle", 1.0), 3)); + events.add(new StreamRecord<>(new Event(4, "foobar", 1.0), 4)); + events.add(new StreamRecord<>(new Event(5, "end", 1.0), 11)); + events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13)); Map<String, Event> timeoutPattern1 = new HashMap<>(); timeoutPattern1.put("start", new Event(1, "start", 1.0)); @@ -306,14 +308,14 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = -3268741540234334074L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new SimpleCondition<Event>() { + }).followedByAny("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = -8995174172182138608L; @Override @@ -353,13 +355,13 @@ public class NFAITCase extends TestLogger { SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0); Event endEvent= new Event(46, "end", 1.0); - inputEvents.add(new StreamRecord<Event>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(startEvent, 1)); inputEvents.add(new StreamRecord<Event>(middleEvent1, 3)); inputEvents.add(new StreamRecord<Event>(middleEvent2, 4)); inputEvents.add(new StreamRecord<Event>(middleEvent3, 5)); inputEvents.add(new StreamRecord<Event>(nextOne1, 6)); inputEvents.add(new StreamRecord<Event>(nextOne2, 7)); - inputEvents.add(new StreamRecord<Event>(endEvent, 8)); + inputEvents.add(new StreamRecord<>(endEvent, 8)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @@ -368,21 +370,21 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + }).followedByAny("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { private static final long serialVersionUID = 6215754202506583964L; @Override public boolean filter(SubEvent value) throws Exception { return value.getVolume() > 5.0; } - }).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + }).followedByAny("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { private static final long serialVersionUID = 6215754202506583964L; @Override public boolean filter(SubEvent value) throws Exception { return value.getName().equals("next-one"); } - }).followedBy("end").where(new SimpleCondition<Event>() { + }).followedByAny("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 7056763917392056548L; @Override @@ -449,28 +451,28 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore().allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().optional().followedByAny("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } - }).followedBy("end2").where(new SimpleCondition<Event>() { + }).followedByAny("end2").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).followedBy("end3").where(new SimpleCondition<Event>() { + }).followedByAny("end3").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -537,7 +539,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -555,27 +557,14 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(4, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), - Sets.newHashSet(startEvent, middleEvent1, end1), - Sets.newHashSet(startEvent, middleEvent2, end1), - Sets.newHashSet(startEvent, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, middleEvent2, end1), + Lists.newArrayList(startEvent, end1) + )); } @Test @@ -724,14 +713,14 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle-first").where(new SimpleCondition<Event>() { + }).followedByAny("middle-first").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore().allowCombinations().optional().followedBy("middle-second").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().optional().followedByAny("middle-second").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -803,21 +792,21 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("branching").where(new SimpleCondition<Event>() { + }).followedByAny("branching").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).followedBy("merging").where(new SimpleCondition<Event>() { + }).followedByAny("merging").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("f"); } - }).followedBy("kleene").where(new SimpleCondition<Event>() { + }).followedByAny("kleene").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -940,7 +929,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -958,25 +947,12 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(2, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(start, middleEvent1, middleEvent2, end), - Sets.newHashSet(start, middleEvent2, end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(start, middleEvent1, middleEvent2, end), + Lists.newArrayList(start, middleEvent2, end) + )); } @Test @@ -1000,14 +976,14 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore().allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().followedByAny("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1183,14 +1159,14 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore().followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().followedByAny("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1201,26 +1177,16 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } - - assertEquals(3, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), - Sets.newHashSet(startEvent, middleEvent1, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, end1) + )); } @Test @@ -1322,25 +1288,12 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } - - assertEquals(2, allPatterns.size()); - assertEquals(Sets.<Set<Event>>newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), - Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1) + )); } @Test @@ -1375,25 +1328,13 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(middleEvent1, middleEvent2, end1), + Lists.newArrayList(middleEvent2, middleEvent3, end1) + )); - assertEquals(2, allPatterns.size()); - assertEquals(Sets.<Set<Event>>newHashSet( - Sets.newHashSet(middleEvent1, middleEvent2, end1), - Sets.newHashSet(middleEvent2, middleEvent3, end1) - ), resultingPatterns); } @Test @@ -1442,49 +1383,6 @@ public class NFAITCase extends TestLogger { } @Test - public void testTimesStrictWithFollowedBy() { - List<StreamRecord<Event>> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(2).allowCombinations().consecutive().followedBy("end1").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent2, ConsecutiveData.end) - )); - } - - @Test public void testTimesNotStrictWithFollowedByEager() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); @@ -1543,7 +1441,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1571,49 +1469,6 @@ public class NFAITCase extends TestLogger { } @Test - public void testTimesStrictWithFollowedByNotEager() { - List<StreamRecord<Event>> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(2).allowCombinations().consecutive().followedBy("end1").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end) - )); - } - - @Test public void testTimesStrictWithNextAndConsecutive() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); @@ -1789,29 +1644,16 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } - - assertEquals(6, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3), - Sets.newHashSet(middleEvent1, middleEvent2), - Sets.newHashSet(middleEvent1), - Sets.newHashSet(middleEvent2, middleEvent3), - Sets.newHashSet(middleEvent2), - Sets.newHashSet(middleEvent3) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(middleEvent1, middleEvent2, middleEvent3), + Lists.newArrayList(middleEvent1, middleEvent2), + Lists.newArrayList(middleEvent1), + Lists.newArrayList(middleEvent2, middleEvent3), + Lists.newArrayList(middleEvent2), + Lists.newArrayList(middleEvent3) + )); } @Test @@ -1895,26 +1737,13 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(3, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2), - Sets.newHashSet(startEvent, middleEvent1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2), + Lists.newArrayList(startEvent, middleEvent1) + )); } /////////////////////////////// Optional //////////////////////////////////////// @@ -1978,7 +1807,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2025,7 +1854,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2071,7 +1900,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2098,6 +1927,54 @@ public class NFAITCase extends TestLogger { } @Test + public void testOneOrMoreStrictOptional() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test public void testTimesStrictOptional1() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); @@ -2201,94 +2078,92 @@ public class NFAITCase extends TestLogger { } @Test - public void testStrictCombinationsOneOrMore() { - List<List<Event>> resultingPatterns = testStrictOneOrMore(false); + public void testStrictOneOrMore() { + List<List<Event>> resultingPatterns = testOneOrMore(Quantifier.ConsumingStrategy.STRICT); compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent4, ConsecutiveData.end) + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end) + )); + } + + @Test + public void testSkipTillNextOneOrMore() { + List<List<Event>> resultingPatterns = testOneOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_NEXT); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end) )); } @Test - public void testStrictEagerOneOrMore() { - List<List<Event>> resultingPatterns = testStrictOneOrMore(true); + public void testSkipTillAnyOneOrMore() { + List<List<Event>> resultingPatterns = testOneOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_ANY); compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent4, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent4, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end) )); } - private List<List<Event>> testStrictOneOrMore(boolean eager) { + private List<List<Event>> testOneOrMore(Quantifier.ConsumingStrategy strategy) { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2)); inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 5)); - inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 7)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 8)); - - Pattern<Event, ?> pattern; - if (eager) { - pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).oneOrMore().consecutive() - .followedBy("end1").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 4)); + inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - } else { - pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).oneOrMore().allowCombinations().consecutive() - .followedBy("end1").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore(); - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); + switch (strategy) { + case STRICT: + pattern = pattern.consecutive(); + break; + case SKIP_TILL_NEXT: + break; + case SKIP_TILL_ANY: + pattern = pattern.allowCombinations(); + break; } + pattern = pattern.followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); return feedNFA(inputEvents, nfa); @@ -2296,9 +2171,10 @@ public class NFAITCase extends TestLogger { @Test public void testStrictEagerZeroOrMore() { - List<List<Event>> resultingPatterns = testStrictZeroOrMore(true); + List<List<Event>> resultingPatterns = testZeroOrMore(Quantifier.ConsumingStrategy.STRICT); compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) @@ -2306,67 +2182,75 @@ public class NFAITCase extends TestLogger { } @Test - public void testStrictCombinationsZeroOrMore() { - List<List<Event>> resultingPatterns = testStrictZeroOrMore(false); + public void testSkipTillAnyZeroOrMore() { + List<List<Event>> resultingPatterns = testZeroOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_ANY); compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent4, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent4, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) )); } - private List<List<Event>> testStrictZeroOrMore(boolean eager) { + @Test + public void testSkipTillNextZeroOrMore() { + List<List<Event>> resultingPatterns = testZeroOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_NEXT); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + private List<List<Event>> testZeroOrMore(Quantifier.ConsumingStrategy strategy) { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2)); inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 4)); inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 6)); inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - Pattern<Event, ?> pattern = eager - ? Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).oneOrMore().optional().consecutive().followedBy("end1").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }) - : Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional(); - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + switch (strategy) { + case STRICT: + pattern = pattern.consecutive(); + break; + case SKIP_TILL_NEXT: + break; + case SKIP_TILL_ANY: + pattern = pattern.allowCombinations(); + break; + } - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).oneOrMore().allowCombinations().optional().consecutive().followedBy("end1").where(new SimpleCondition<Event>() { + pattern = pattern.followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2380,7 +2264,6 @@ public class NFAITCase extends TestLogger { return feedNFA(inputEvents, nfa); } - @Test public void testTimesStrict() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); @@ -2400,14 +2283,14 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).allowCombinations().consecutive().followedBy("end1").where(new SimpleCondition<Event>() { + }).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2444,7 +2327,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2733,16 +2616,10 @@ public class NFAITCase extends TestLogger { Lists.<List<Event>>newArrayList( Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4), Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1), - Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent3), - Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent2), Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1), Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4), Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1), - Lists.newArrayList(startEvent1, endEvent, middleEvent4), Lists.newArrayList(startEvent1, endEvent, middleEvent1), - Lists.newArrayList(startEvent1, endEvent, middleEvent2), - Lists.newArrayList(startEvent1, endEvent, middleEvent3), - Lists.newArrayList(startEvent2, endEvent, middleEvent4), Lists.newArrayList(startEvent2, endEvent, middleEvent3) ) ); @@ -2798,9 +2675,7 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - - return resultingPatterns; + return feedNFA(inputEvents, nfa); } private static class MySubeventIterCondition extends IterativeCondition<SubEvent> { @@ -2887,9 +2762,7 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - - return resultingPatterns; + return feedNFA(inputEvents, nfa); } private static class MyEventIterCondition extends IterativeCondition<Event> { @@ -2978,7 +2851,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + }).followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { private static final long serialVersionUID = 2178338526904474690L; @Override @@ -3043,14 +2916,14 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).oneOrMore().followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + }).oneOrMore().followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { private static final long serialVersionUID = 2178338526904474690L; @Override public boolean filter(SubEvent value) throws Exception { return value.getName().startsWith("foo"); } - }).followedBy("end").where(new IterativeCondition<Event>() { + }).followedByAny("end").where(new IterativeCondition<Event>() { private static final long serialVersionUID = 7056763917392056548L; @Override @@ -3084,6 +2957,131 @@ public class NFAITCase extends TestLogger { ); } + + /////////////////////////////////////// Skip till next ///////////////////////////// + + @Test + public void testBranchingPatternSkipTillNext() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "start", 1.0); + SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10.0); + SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0); + SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0); + SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0); + SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0); + Event endEvent= new Event(46, "end", 1.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<Event>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<Event>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<Event>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<Event>(nextOne1, 6)); + inputEvents.add(new StreamRecord<Event>(nextOne2, 7)); + inputEvents.add(new StreamRecord<>(endEvent, 8)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("next-one"); + } + }).followedByAny("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> patterns = feedNFA(inputEvents, nfa); + + compareMaps(patterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, nextOne1, endEvent) + )); + } + + @Test + public void testBranchingPatternMixedFollowedBy() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "start", 1.0); + SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10.0); + SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0); + SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0); + SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0); + SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0); + Event endEvent= new Event(46, "end", 1.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<Event>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<Event>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<Event>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<Event>(nextOne1, 6)); + inputEvents.add(new StreamRecord<Event>(nextOne2, 7)); + inputEvents.add(new StreamRecord<>(endEvent, 8)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedByAny("middle-first").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("next-one"); + } + }).followedByAny("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> patterns = feedNFA(inputEvents, nfa); + + compareMaps(patterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, nextOne1, endEvent), + Lists.newArrayList(startEvent, middleEvent2, nextOne1, endEvent), + Lists.newArrayList(startEvent, middleEvent3, nextOne1, endEvent) + )); + } + + + ///////////////////////////////////////// Utility //////////////////////////////// private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) { List<List<Event>> resultingPatterns = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index da1f9c8..ced9efe 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -149,45 +149,6 @@ public class NFACompilerTest extends TestLogger { assertEquals(0, endingState.getStateTransitions().size()); } - @Test - public void testNFACompilerWithKleeneStar() { - - Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter) - .followedBy("middle").subtype(SubEvent.class).oneOrMore().optional() - .followedBy("end").where(endFilter); - - NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false); - - Set<State<Event>> states = nfa.getStates(); - assertEquals(5, states.size()); - - - Set<Tuple2<String, Set<Tuple2<String, StateTransitionAction>>>> stateMap = new HashSet<>(); - for (State<Event> state : states) { - stateMap.add(Tuple2.of(state.getName(), unfoldTransitions(state))); - } - - assertEquals(stateMap, newHashSet( - Tuple2.of("start", newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE))), - Tuple2.of("middle", newHashSet( - Tuple2.of("middle", StateTransitionAction.IGNORE), - Tuple2.of("middle", StateTransitionAction.TAKE) - )), - Tuple2.of("middle", newHashSet( - Tuple2.of("middle", StateTransitionAction.IGNORE), - Tuple2.of("middle", StateTransitionAction.TAKE), - Tuple2.of("end", StateTransitionAction.PROCEED) - )), - Tuple2.of("end", newHashSet( - Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE), - Tuple2.of("end", StateTransitionAction.IGNORE) - )), - Tuple2.of(NFACompiler.ENDING_STATE_NAME, Sets.newHashSet()) - )); - - } - - private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) { final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>(); for (StateTransition<T> transition : state.getStateTransitions()) { http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index d599ec9..8465bc3 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -208,6 +208,7 @@ public class CEPOperatorTest extends TestLogger { * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033 */ @Test + @SuppressWarnings("unchecked") public void testKeyedAdvancingTimeWithoutElements() throws Exception { final KeySelector<Event, Integer> keySelector = new TestKeySelector(); @@ -522,7 +523,7 @@ public class CEPOperatorTest extends TestLogger { return value.getName().equals("start"); } }) - .followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { + .followedByAny("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -530,7 +531,7 @@ public class CEPOperatorTest extends TestLogger { return value.getVolume() > 5.0; } }) - .followedBy("end").where(new SimpleCondition<Event>() { + .followedByAny("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 7056763917392056548L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index 8c4304a..3c1da2e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -20,6 +20,7 @@ package org.apache.flink.cep.pattern; import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy; import org.apache.flink.cep.pattern.conditions.OrCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.cep.pattern.conditions.SubtypeCondition; @@ -58,8 +59,8 @@ public class PatternTest extends TestLogger { assertNotNull(previous2 = previous.getPrevious()); assertNull(previous2.getPrevious()); - assertTrue(pattern instanceof FollowedByPattern); - assertTrue(previous instanceof FollowedByPattern); + assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier().getConsumingStrategy()); + assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, previous.getQuantifier().getConsumingStrategy()); assertEquals(pattern.getName(), "end"); assertEquals(previous.getName(), "next"); @@ -137,7 +138,7 @@ public class PatternTest extends TestLogger { assertNotNull(previous2 = previous.getPrevious()); assertNull(previous2.getPrevious()); - assertTrue(pattern instanceof FollowedByPattern); + assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier().getConsumingStrategy()); assertNotNull(previous.getCondition()); assertEquals(pattern.getName(), "end"); @@ -177,7 +178,7 @@ public class PatternTest extends TestLogger { assertNotNull(previous2 = previous.getPrevious()); assertNull(previous2.getPrevious()); - assertTrue(pattern instanceof FollowedByPattern); + assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier().getConsumingStrategy()); assertFalse(previous.getCondition() instanceof OrCondition); assertTrue(previous2.getCondition() instanceof OrCondition);