http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index ccae848..825ba957 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -46,9 +46,9 @@ public class NFAITCase extends TestLogger {
        public void testSimplePatternNFA() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
-               Event startEvent = new Event(42, "start", 1.0);
+               Event startEvent = new Event(41, "start", 1.0);
                SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-               Event endEvent=  new Event(43, "end", 1.0);
+               Event endEvent = new Event(43, "end", 1.0);
 
                inputEvents.add(new StreamRecord<Event>(startEvent, 1));
                inputEvents.add(new StreamRecord<Event>(new Event(43, "foobar", 
1.0), 2));
@@ -102,6 +102,99 @@ public class NFAITCase extends TestLogger {
                assertEquals(endEvent, patternMap.get("end"));
        }
 
+       @Test // Checks that an "a" event directly followed by a "b" event produces exactly one match.
+       public void testStrictContinuityWithResults() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Input stream: "a" (timestamp 3) then "b" (timestamp 5), with no event in between.
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event end = new Event(42, "b", 4.0);
+
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(end, 5));
+               // Pattern: "middle" accepts names equal to "a"; "end" is chained with next() and accepts "b".
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).next("end").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+               // allPatterns counts every reported match; resultingPatterns compares matches as unordered event sets.
+               assertEquals(1, allPatterns.size());
+               assertEquals(Sets.<Set<Event>>newHashSet(
+                       Sets.newHashSet(middleEvent1, end)
+               ), resultingPatterns);
+       }
+
+       @Test // Checks that a "c" event between "a" and "b" leaves the next()-chained pattern without any match.
+       public void testStrictContinuityNoResults() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Input stream: "a" (ts 3), "c" (ts 4), "b" (ts 5); "c" satisfies neither filter below.
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "c", 3.0);
+               Event end = new Event(43, "b", 4.0);
+
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(end, 5));
+               // Pattern: "middle" accepts names equal to "a"; "end" is chained with next() and accepts "b".
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).next("end").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                       }
+               }
+               // No match may be reported for any of the three events.
+               assertEquals(Sets.newHashSet(), resultingPatterns);
+       }
+
        /**
         * Tests that the NFA successfully filters out expired elements with 
respect to the window
         * length
@@ -327,6 +420,1247 @@ public class NFAITCase extends TestLogger {
                ), patterns);
        }
 
+       @Test // Checks the matches of c -> a* -> b -> d -> e (all followedBy) when several "a" and "d" events are available.
+       public void testComplexBranchingAfterKleeneStar() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Fixture: one "c", three "a", one "b", two "d" and one "e" event (names drive the filters below).
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event middleEvent3 = new Event(43, "a", 4.0);
+               Event end1 = new Event(44, "b", 5.0);
+               Event end2 = new Event(45, "d", 6.0);
+               Event end3 = new Event(46, "d", 7.0);
+               Event end4 = new Event(47, "e", 8.0);
+               // Events are fed in declaration order with increasing timestamps.
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<>(end1, 6));
+               inputEvents.add(new StreamRecord<>(end2, 7));
+               inputEvents.add(new StreamRecord<>(end3, 8));
+               inputEvents.add(new StreamRecord<>(end4, 9));
+               // Pattern stages: start="c", middle="a" with zeroOrMore(false), end1="b", end2="d", end3="e".
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               })
+                       .followedBy("end2").where(new FilterFunction<Event>() {
+                               private static final long serialVersionUID = 
5726188262756267490L;
+
+                               @Override
+                               public boolean filter(Event value) throws 
Exception {
+                                       return value.getName().equals("d");
+                               }
+                       })
+                       .followedBy("end3").where(new FilterFunction<Event>() {
+                               private static final long serialVersionUID = 
5726188262756267490L;
+
+                               @Override
+                               public boolean filter(Event value) throws 
Exception {
+                                       return value.getName().equals("e");
+                               }
+                       });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+               // Expected: every subset of the three "a" events (8) combined with either "d" event (2) = 16 matches.
+               assertEquals(16, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end1, end2, end4),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1, end2, end4),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent3, 
end1, end2, end4),
+                       Sets.newHashSet(startEvent, middleEvent2, middleEvent3, 
end1, end2, end4),
+                       Sets.newHashSet(startEvent, middleEvent1, end1, end2, 
end4),
+                       Sets.newHashSet(startEvent, middleEvent2, end1, end2, 
end4),
+                       Sets.newHashSet(startEvent, middleEvent3, end1, end2, 
end4),
+                       Sets.newHashSet(startEvent, end1, end2, end4),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end1, end3, end4),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1, end3, end4),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent3, 
end1, end3, end4),
+                       Sets.newHashSet(startEvent, middleEvent2, middleEvent3, 
end1, end3, end4),
+                       Sets.newHashSet(startEvent, middleEvent1, end1, end3, 
end4),
+                       Sets.newHashSet(startEvent, middleEvent2, end1, end3, 
end4),
+                       Sets.newHashSet(startEvent, middleEvent3, end1, end3, 
end4),
+                       Sets.newHashSet(startEvent, end1, end3, end4)
+               ), resultingPatterns);
+       }
+
+       @Test // Checks the matches of c -> a* -> b where the "a" stage uses zeroOrMore(false).
+       public void testKleeneStar() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Input stream: "c" (ts 1), "a" (ts 3), "a" (ts 4), "b" (ts 6).
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event end1 = new Event(44, "b", 5.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(end1, 6));
+               // NOTE(review): the boolean of zeroOrMore presumably toggles eager matching (testEagerKleeneStar passes true) - confirm.
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+               // Expected: one match per subset of the two "a" events (both, either one, none) = 4 matches.
+               assertEquals(4, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1),
+                       Sets.newHashSet(startEvent, middleEvent1, end1),
+                       Sets.newHashSet(startEvent, middleEvent2, end1),
+                       Sets.newHashSet(startEvent, end1)
+               ), resultingPatterns);
+       }
+
+       @Test // Checks the matches of c -> a* -> b where the "a" stage uses zeroOrMore(true).
+       public void testEagerKleeneStar() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Input stream: "c" (ts 1), three "a" events (ts 3, 4, 5), "b" (ts 6).
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event middleEvent3 = new Event(43, "a", 4.0);
+               Event end1 = new Event(44, "b", 5.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<>(end1, 6));
+               // NOTE(review): zeroOrMore(true) is presumably the eager variant (per the test name) - confirm against Pattern.
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore(true).followedBy("end1").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+               // Expected: only runs of "a" events starting at the first one (length 3, 2, 1, 0) = 4 matches.
+               assertEquals(4, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end1),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1),
+                       Sets.newHashSet(startEvent, middleEvent1, end1),
+                       Sets.newHashSet(startEvent, end1)
+               ), resultingPatterns);
+       }
+
+
+       @Test // Checks the matches of a* -> b when the zeroOrMore() stage is the very first stage of the pattern.
+       public void testBeginWithKleeneStar() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Every fixture event gets its own id/price: should Event compare by value (not visible here), identical "a" events would make the expected sets below collapse.
+               Event middleEvent1 = new Event(40, "a", 2.0);
+               Event middleEvent2 = new Event(41, "a", 3.0);
+               Event middleEvent3 = new Event(42, "a", 4.0);
+               Event end = new Event(43, "b", 5.0);
+               // Input stream: three "a" events (ts 3, 4, 5) followed by "b" (ts 6).
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<>(end, 6));
+               // Pattern: "middle" accepts names equal to "a" with zeroOrMore(); "end" (followedBy) accepts "b".
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore().followedBy("end").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+               // Expected: each run of adjacent "a" events (6 runs) plus the match without any "a" = 7 matches; {middleEvent1, middleEvent3} is not expected.
+               assertEquals(7, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(middleEvent1, middleEvent2, 
middleEvent3, end),
+                       Sets.newHashSet(middleEvent1, middleEvent2, end),
+                       Sets.newHashSet(middleEvent2, middleEvent3, end),
+                       Sets.newHashSet(middleEvent1, end),
+                       Sets.newHashSet(middleEvent2, end),
+                       Sets.newHashSet(middleEvent3, end),
+                       Sets.newHashSet(end)
+               ), resultingPatterns);
+       }
+
+       @Test // Checks the matches of c -> a* -> d* -> e, i.e. two consecutive zeroOrMore(false) stages.
+       public void testKleeneStarAfterKleeneStar() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Input stream: "c" (ts 1), "a" (ts 3), "d" (ts 4), "d" (ts 5), "e" (ts 6).
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "d", 3.0);
+               Event middleEvent3 = new Event(43, "d", 4.0);
+               Event end = new Event(44, "e", 4.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<>(end, 6));
+               // Pattern stages: start="c", middle-first="a" (zeroOrMore(false)), middle-second="d" (zeroOrMore(false)), end="e".
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle-first").where(new FilterFunction<Event>() 
{
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore(false).followedBy("middle-second").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               }).zeroOrMore(false).followedBy("end").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("e");
+                       }
+               });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+               // Expected: with or without the "a" event (2) times every subset of the two "d" events (4) = 8 matches.
+               assertEquals(8, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent3, 
end),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end),
+                       Sets.newHashSet(startEvent, middleEvent2, middleEvent3, 
end),
+                       Sets.newHashSet(startEvent, middleEvent3, end),
+                       Sets.newHashSet(startEvent, middleEvent2, end),
+                       Sets.newHashSet(startEvent, middleEvent1, end),
+                       Sets.newHashSet(startEvent, end)
+               ), resultingPatterns);
+       }
+
+       @Test // Checks the matches of c -> a -> f -> d* -> e when two "a" events can fill the "branching" stage.
+       public void testKleeneStarAfterBranching() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Fixture: one "c", two "a", one "f", two "d" and one "e" event (names drive the filters below).
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event merging = new Event(42, "f", 3.0);
+               Event kleene1 = new Event(43, "d", 4.0);
+               Event kleene2 = new Event(44, "d", 4.0);
+               Event end = new Event(45, "e", 4.0);
+               // Events are fed in declaration order with increasing timestamps.
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(merging, 5));
+               inputEvents.add(new StreamRecord<>(kleene1, 6));
+               inputEvents.add(new StreamRecord<>(kleene2, 7));
+               inputEvents.add(new StreamRecord<>(end, 8));
+               // Pattern stages: start="c", branching="a", merging="f", kleene="d" (zeroOrMore(false)), end="e".
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
 
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("branching").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).followedBy("merging").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("f");
+                       }
+               }).followedBy("kleene").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               }).zeroOrMore(false).followedBy("end").where(new 
FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("e");
+                       }
+               });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+               // Expected: either "a" event (2) times every subset of the two "d" events (4) = 8 matches.
+               assertEquals(8, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(startEvent, middleEvent1, merging, end),
+                       Sets.newHashSet(startEvent, middleEvent1, merging, 
kleene1, end),
+                       Sets.newHashSet(startEvent, middleEvent1, merging, 
kleene2, end),
+                       Sets.newHashSet(startEvent, middleEvent1, merging, 
kleene1, kleene2, end),
+                       Sets.newHashSet(startEvent, middleEvent2, merging, end),
+                       Sets.newHashSet(startEvent, middleEvent2, merging, 
kleene1, end),
+                       Sets.newHashSet(startEvent, middleEvent2, merging, 
kleene2, end),
+                       Sets.newHashSet(startEvent, middleEvent2, merging, 
kleene1, kleene2, end)
+               ), resultingPatterns);
+       }
+
+       @Test // Checks that d -> a* -> next(b) reports nothing when a "c" event sits between the "a" events and "b".
+       public void testStrictContinuityNoResultsAfterKleeneStar() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Input stream: "d" (ts 1), "a" (ts 2), "a" (ts 3), "c" (ts 4), "b" (ts 5); "c" satisfies no filter below.
+               Event start = new Event(40, "d", 2.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 2.0);
+               Event middleEvent3 = new Event(43, "c", 3.0);
+               Event end = new Event(44, "b", 4.0);
+
+               inputEvents.add(new StreamRecord<>(start, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+               inputEvents.add(new StreamRecord<>(end, 5));
+               // Pattern stages: start="d", middle="a" with zeroOrMore(), then end="b" chained with next().
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore()
+                       .next("end").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+               // NOTE(review): meaning of the trailing 'false' flag is not visible here - confirm against NFACompiler.compile.
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               // Feed the events one by one; field f0 of each process() result holds the matches reported for that event.
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                       }
+               }
+               // No match may be reported for any of the five events.
+               assertEquals(Sets.newHashSet(), resultingPatterns);
+       }
+
+       @Test
+       public void testStrictContinuityResultsAfterKleeneStar() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event start = new Event(40, "d", 2.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 2.0);
+               Event end = new Event(43, "b", 4.0);
+
+               inputEvents.add(new StreamRecord<>(start, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+               inputEvents.add(new StreamRecord<>(end, 5));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore(false)
+                       .next("end").where(new FilterFunction<Event>() {
+                               private static final long serialVersionUID = 
5726188262756267490L;
+
+                               @Override
+                               public boolean filter(Event value) throws 
Exception {
+                                       return value.getName().equals("b");
+                               }
+                       });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+
+               assertEquals(2, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(start, middleEvent1, middleEvent2, end),
+                       Sets.newHashSet(start, middleEvent2, end)
+               ), resultingPatterns);
+       }
+
+       /**
+        * Feeds the events c, a, a, b through the pattern
+        * {@code start(c).followedBy(middle(a)).oneOrMore(false).followedBy(end1(b))}.
+        *
+        * <p>Three matches are expected: both "a" events, only the first "a" event,
+        * and only the second "a" event, each together with the start and end events.
+        */
+       @Test
+       public void testAtLeastOne() {
+               Event opener = new Event(40, "c", 1.0);
+               Event firstA = new Event(41, "a", 2.0);
+               Event secondA = new Event(42, "a", 3.0);
+               Event closer = new Event(44, "b", 5.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(opener, 1));
+               records.add(new StreamRecord<>(firstA, 3));
+               records.add(new StreamRecord<>(secondA, 4));
+               records.add(new StreamRecord<>(closer, 6));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsC = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               FilterFunction<Event> nameIsA = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               FilterFunction<Event> nameIsB = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsC)
+                       .followedBy("middle").where(nameIsA).oneOrMore(false)
+                       .followedBy("end1").where(nameIsB);
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(opener, firstA, secondA, closer));
+               expected.add(Sets.newHashSet(opener, firstA, closer));
+               expected.add(Sets.newHashSet(opener, secondA, closer));
+
+               assertEquals(3, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       /**
+        * Feeds the events a, a, a, b through the pattern
+        * {@code start(a).oneOrMore(false).followedBy(end(b))}, i.e. the looping stage is
+        * the very first stage of the pattern.
+        *
+        * <p>Seven matches are expected: every non-empty subset of the three "a" events,
+        * each combined with the end event.
+        */
+       @Test
+       public void testBeginWithAtLeastOne() {
+               Event firstA = new Event(41, "a", 2.0);
+               Event secondA = new Event(42, "a", 3.0);
+               // NOTE(review): shares id 42 with secondA and differs only in price - confirm intended.
+               Event thirdA = new Event(42, "a", 4.0);
+               Event closer = new Event(44, "b", 5.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(firstA, 3));
+               records.add(new StreamRecord<>(secondA, 4));
+               records.add(new StreamRecord<>(thirdA, 5));
+               records.add(new StreamRecord<>(closer, 6));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsA = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               FilterFunction<Event> nameIsB = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsA)
+                       .oneOrMore(false)
+                       .followedBy("end").where(nameIsB);
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(firstA, secondA, thirdA, closer));
+               expected.add(Sets.newHashSet(firstA, secondA, closer));
+               expected.add(Sets.newHashSet(firstA, thirdA, closer));
+               expected.add(Sets.newHashSet(secondA, thirdA, closer));
+               expected.add(Sets.newHashSet(firstA, closer));
+               expected.add(Sets.newHashSet(secondA, closer));
+               expected.add(Sets.newHashSet(thirdA, closer));
+
+               assertEquals(7, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       /**
+        * Feeds start, an unrelated "event", three "middle" events and an "end" event
+        * through the pattern
+        * {@code start.next(middle).zeroOrMore(false).followedBy(end)}.
+        *
+        * <p>The unrelated event sits directly after the start event, and the single
+        * expected match contains only the start and end events.
+        */
+       @Test
+       public void testNextZeroOrMore() {
+               Event opener = new Event(40, "start", 1.0);
+               Event firstMiddle = new Event(40, "middle", 2.0);
+               Event secondMiddle = new Event(40, "middle", 3.0);
+               Event thirdMiddle = new Event(40, "middle", 4.0);
+               Event closer = new Event(46, "end", 1.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(opener, 1L));
+               // Event that matches none of the stage filters, placed right after the start.
+               records.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L));
+               records.add(new StreamRecord<>(firstMiddle, 3L));
+               records.add(new StreamRecord<>(secondMiddle, 4L));
+               records.add(new StreamRecord<>(thirdMiddle, 5L));
+               records.add(new StreamRecord<>(closer, 6L));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsStart = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 6215754202506583964L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               };
+               FilterFunction<Event> nameIsMiddle = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 6215754202506583964L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("middle");
+                       }
+               };
+               FilterFunction<Event> nameIsEnd = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsStart)
+                       .next("middle").where(nameIsMiddle).zeroOrMore(false)
+                       .followedBy("end").where(nameIsEnd);
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(opener, closer));
+
+               assertEquals(1, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       /**
+        * Feeds the events c, a, a, a, b through the pattern
+        * {@code start(c).followedBy(middle(a)).oneOrMore(true).followedBy(end1(b))}.
+        *
+        * <p>Three matches are expected, each starting at the first "a" event and
+        * containing one, two or three consecutive "a" events.
+        */
+       @Test
+       public void testAtLeastOneEager() {
+               Event opener = new Event(40, "c", 1.0);
+               Event firstA = new Event(41, "a", 2.0);
+               Event secondA = new Event(42, "a", 3.0);
+               Event thirdA = new Event(43, "a", 4.0);
+               Event closer = new Event(44, "b", 5.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(opener, 1));
+               records.add(new StreamRecord<>(firstA, 3));
+               records.add(new StreamRecord<>(secondA, 4));
+               records.add(new StreamRecord<>(thirdA, 5));
+               records.add(new StreamRecord<>(closer, 6));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsC = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               FilterFunction<Event> nameIsA = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               FilterFunction<Event> nameIsB = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsC)
+                       .followedBy("middle").where(nameIsA).oneOrMore(true)
+                       .followedBy("end1").where(nameIsB);
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(opener, firstA, secondA, thirdA, closer));
+               expected.add(Sets.newHashSet(opener, firstA, secondA, closer));
+               expected.add(Sets.newHashSet(opener, firstA, closer));
+
+               assertEquals(3, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       /**
+        * Feeds the events c, a, b through the pattern
+        * {@code start(c).followedBy(middle(a)).optional().followedBy(end1(b))}.
+        *
+        * <p>Two matches are expected: one that includes the "a" event and one that
+        * skips it.
+        */
+       @Test
+       public void testOptional() {
+               Event opener = new Event(40, "c", 1.0);
+               Event optionalA = new Event(43, "a", 4.0);
+               Event closer = new Event(44, "b", 5.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(opener, 1));
+               records.add(new StreamRecord<>(optionalA, 5));
+               records.add(new StreamRecord<>(closer, 6));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsC = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               FilterFunction<Event> nameIsA = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               FilterFunction<Event> nameIsB = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsC)
+                       .followedBy("middle").where(nameIsA).optional()
+                       .followedBy("end1").where(nameIsB);
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(opener, optionalA, closer));
+               expected.add(Sets.newHashSet(opener, closer));
+
+               assertEquals(2, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+
+       /**
+        * Feeds the events c, a, a, a, b through the pattern
+        * {@code start(c).next(middle(a)).times(2).followedBy(end1(b))}.
+        *
+        * <p>A single match is expected, made of the start event, the first two "a"
+        * events and the end event.
+        */
+       @Test
+       public void testTimes() {
+               Event opener = new Event(40, "c", 1.0);
+               Event firstA = new Event(41, "a", 2.0);
+               Event secondA = new Event(42, "a", 3.0);
+               Event thirdA = new Event(43, "a", 4.0);
+               Event closer = new Event(44, "b", 5.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(opener, 1));
+               records.add(new StreamRecord<>(firstA, 2));
+               records.add(new StreamRecord<>(secondA, 3));
+               records.add(new StreamRecord<>(thirdA, 4));
+               records.add(new StreamRecord<>(closer, 6));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsC = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               FilterFunction<Event> nameIsA = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               FilterFunction<Event> nameIsB = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsC)
+                       .next("middle").where(nameIsA).times(2)
+                       .followedBy("end1").where(nameIsB);
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(opener, firstA, secondA, closer));
+
+               assertEquals(1, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       /**
+        * Feeds the events a, a, a, b through the pattern
+        * {@code middle(a).times(2).followedBy(end1(b))}, i.e. the counted stage is the
+        * first stage of the pattern.
+        *
+        * <p>Two matches are expected: the first and second "a" events, and the second
+        * and third "a" events, each combined with the end event.
+        */
+       @Test
+       public void testStartWithTimes() {
+               Event firstA = new Event(41, "a", 2.0);
+               Event secondA = new Event(42, "a", 3.0);
+               Event thirdA = new Event(43, "a", 4.0);
+               Event closer = new Event(44, "b", 5.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(firstA, 2));
+               records.add(new StreamRecord<>(secondA, 3));
+               records.add(new StreamRecord<>(thirdA, 4));
+               records.add(new StreamRecord<>(closer, 6));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsA = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               FilterFunction<Event> nameIsB = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("middle").where(nameIsA)
+                       .times(2)
+                       .followedBy("end1").where(nameIsB);
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(firstA, secondA, closer));
+               expected.add(Sets.newHashSet(secondA, thirdA, closer));
+
+               assertEquals(2, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       /**
+        * Feeds the events c, b through the pattern
+        * {@code start(c).optional().followedBy(end1(b))}, i.e. the optional stage is the
+        * first stage of the pattern.
+        *
+        * <p>Two matches are expected: start plus end, and the end event on its own.
+        */
+       @Test
+       public void testStartWithOptional() {
+               Event opener = new Event(40, "c", 1.0);
+               Event closer = new Event(44, "b", 5.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(opener, 1));
+               records.add(new StreamRecord<>(closer, 6));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsC = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               FilterFunction<Event> nameIsB = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsC)
+                       .optional()
+                       .followedBy("end1").where(nameIsB);
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(opener, closer));
+               expected.add(Sets.newHashSet(closer));
+
+               assertEquals(2, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       /**
+        * Feeds the events c, a, a, a through the pattern
+        * {@code start(c).followedBy(middle(a)).zeroOrMore()}, i.e. the looping stage is
+        * the last stage of the pattern.
+        *
+        * <p>Four matches are expected: the start event alone, and the start event
+        * followed by the first one, two or three "a" events.
+        */
+       @Test
+       public void testEndWithZeroOrMore() {
+               Event opener = new Event(40, "c", 1.0);
+               Event firstA = new Event(41, "a", 2.0);
+               Event secondA = new Event(42, "a", 3.0);
+               Event thirdA = new Event(43, "a", 4.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(opener, 1));
+               records.add(new StreamRecord<>(firstA, 3));
+               records.add(new StreamRecord<>(secondA, 4));
+               records.add(new StreamRecord<>(thirdA, 5));
+
+               // Stage filters, kept as named locals so the pattern chain below stays readable.
+               FilterFunction<Event> nameIsC = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               FilterFunction<Event> nameIsA = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsC)
+                       .followedBy("middle").where(nameIsA)
+                       .zeroOrMore();
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(opener, firstA, secondA, thirdA));
+               expected.add(Sets.newHashSet(opener, firstA, secondA));
+               expected.add(Sets.newHashSet(opener, firstA));
+               expected.add(Sets.newHashSet(opener));
+
+               assertEquals(4, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       /**
+        * Feeds the events c, a, a, a, d, d, d through a pattern that consists of a
+        * single stage: {@code start(a).zeroOrMore()}.
+        *
+        * <p>Six matches are expected, one for every contiguous run of "a" events
+        * (three of length one, two of length two, one of length three). The "c" and
+        * "d" events appear in none of the expected matches.
+        */
+       @Test
+       public void testStartAndEndWithZeroOrMore() {
+               Event leadingC = new Event(40, "c", 1.0);
+               Event firstA = new Event(41, "a", 2.0);
+               Event secondA = new Event(42, "a", 3.0);
+               Event thirdA = new Event(43, "a", 4.0);
+               Event firstD = new Event(44, "d", 5.0);
+               Event secondD = new Event(45, "d", 5.0);
+               Event thirdD = new Event(46, "d", 5.0);
+
+               List<StreamRecord<Event>> records = new ArrayList<>();
+               records.add(new StreamRecord<>(leadingC, 1));
+               records.add(new StreamRecord<>(firstA, 3));
+               records.add(new StreamRecord<>(secondA, 4));
+               records.add(new StreamRecord<>(thirdA, 5));
+               // The three trailing "d" events all carry timestamp 6.
+               records.add(new StreamRecord<>(firstD, 6));
+               records.add(new StreamRecord<>(secondD, 6));
+               records.add(new StreamRecord<>(thirdD, 6));
+
+               FilterFunction<Event> nameIsA = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+
+               Pattern<Event, ?> sequence = Pattern.<Event>begin("start").where(nameIsA)
+                       .zeroOrMore();
+
+               NFA<Event> automaton =
+                       NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
+
+               // rawMatches counts every emitted match; distinctMatches compares their contents.
+               List<Collection<Event>> rawMatches = new ArrayList<>();
+               Set<Set<Event>> distinctMatches = new HashSet<>();
+               for (StreamRecord<Event> record : records) {
+                       Collection<Map<String, Event>> hits =
+                               automaton.process(record.getValue(), record.getTimestamp()).f0;
+
+                       for (Map<String, Event> hit : hits) {
+                               distinctMatches.add(new HashSet<>(hit.values()));
+                               rawMatches.add(hit.values());
+                       }
+               }
+
+               Set<Set<Event>> expected = new HashSet<>();
+               expected.add(Sets.newHashSet(firstA, secondA, thirdA));
+               expected.add(Sets.newHashSet(firstA, secondA));
+               expected.add(Sets.newHashSet(firstA));
+               expected.add(Sets.newHashSet(secondA, thirdA));
+               expected.add(Sets.newHashSet(secondA));
+               expected.add(Sets.newHashSet(thirdA));
+
+               assertEquals(6, rawMatches.size());
+               assertEquals(expected, distinctMatches);
+       }
+
+       @Test
+       public void testEndWithOptional() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).optional();
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+
+               assertEquals(2, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(startEvent,  middleEvent1),
+                       Sets.newHashSet(startEvent)
+               ), resultingPatterns);
+       }
+
+       @Test
+       public void testEndWithOneOrMore() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event middleEvent3 = new Event(43, "a", 4.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore();
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+               Set<Set<Event>> resultingPatterns = new HashSet<>();
+               List<Collection<Event>> allPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> foundPattern : patterns) {
+                               resultingPatterns.add(new HashSet<>(foundPattern.values()));
+                               allPatterns.add(foundPattern.values());
+                       }
+               }
+
+               assertEquals(3, allPatterns.size());
+               assertEquals(Sets.newHashSet(
+                       Sets.newHashSet(startEvent,  middleEvent1, middleEvent2, middleEvent3),
+                       Sets.newHashSet(startEvent,  middleEvent1, middleEvent2),
+                       Sets.newHashSet(startEvent,  middleEvent1)
+               ), resultingPatterns);
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 9f65132..40a0e7e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep.nfa;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -51,12 +52,12 @@ public class NFATest extends TestLogger {
                streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
                streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
 
-               State<Event> startingState = new State<>("", State.StateType.Start);
-               State<Event> startState = new State<>("start", State.StateType.Normal);
-               State<Event> endState = new State<>("end", State.StateType.Final);
-               StateTransition<Event> starting2Start = new StateTransition<>(
-                       StateTransitionAction.TAKE,
-                       startState,
+               State<Event> startState = new State<>("start", State.StateType.Start);
+               State<Event> endState = new State<>("end", State.StateType.Normal);
+               State<Event> endingState = new State<>("", State.StateType.Final);
+
+               startState.addTake(
+                       endState,
                        new FilterFunction<Event>() {
                                private static final long serialVersionUID = -4869589195918650396L;
 
@@ -64,12 +65,9 @@ public class NFATest extends TestLogger {
                                public boolean filter(Event value) throws Exception {
                                        return value.getName().equals("start");
                                }
-                       }
-               );
-
-               StateTransition<Event> start2End = new StateTransition<>(
-                       StateTransitionAction.TAKE,
-                       endState,
+                       });
+               endState.addTake(
+                       endingState,
                        new FilterFunction<Event>() {
                                private static final long serialVersionUID = 2979804163709590673L;
 
@@ -77,18 +75,12 @@ public class NFATest extends TestLogger {
                                public boolean filter(Event value) throws Exception {
                                        return value.getName().equals("end");
                                }
-                       }
-               );
-
-               StateTransition<Event> start2Start = new StateTransition<>(StateTransitionAction.IGNORE, startState, null);
-
-               startingState.addStateTransition(starting2Start);
-               startState.addStateTransition(start2End);
-               startState.addStateTransition(start2Start);
+                       });
+               endState.addIgnore(FilterFunctions.<Event>trueFunction());
 
-               nfa.addState(startingState);
                nfa.addState(startState);
                nfa.addState(endState);
+               nfa.addState(endingState);
 
                Set<Map<String, Event>> expectedPatterns = new HashSet<>();
 
@@ -196,8 +188,10 @@ public class NFATest extends TestLogger {
        public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
                Set<Map<String, T>> actualPatterns = new HashSet<>();
 
-               for (StreamRecord<T> streamEvent: inputs) {
-                       Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getValue(), streamEvent.getTimestamp()).f0;
+               for (StreamRecord<T> streamEvent : inputs) {
+                       Collection<Map<String, T>> matchedPatterns = nfa.process(
+                               streamEvent.getValue(),
+                               streamEvent.getTimestamp()).f0;
 
                        actualPatterns.addAll(matchedPatterns);
                }
@@ -213,24 +207,12 @@ public class NFATest extends TestLogger {
                State<Event> startState = new State<>("start", State.StateType.Normal);
                State<Event> endState = new State<>("end", State.StateType.Final);
 
-               StateTransition<Event> starting2Start = new StateTransition<>(
-                       StateTransitionAction.TAKE,
-                       startState,
-                       new NameFilter("start"));
 
-               StateTransition<Event> start2End = new StateTransition<>(
-                       StateTransitionAction.TAKE,
-                       endState,
+               startingState.addTake(
+                       new NameFilter("start"));
+               startState.addTake(
                        new NameFilter("end"));
-
-               StateTransition<Event> start2Start = new StateTransition<>(
-                       StateTransitionAction.IGNORE,
-                       startState,
-                       null);
-
-               startingState.addStateTransition(starting2Start);
-               startState.addStateTransition(start2End);
-               startState.addStateTransition(start2Start);
+               startState.addIgnore(null);
 
                nfa.addState(startingState);
                nfa.addState(startState);
@@ -253,12 +235,12 @@ public class NFATest extends TestLogger {
        private NFA<Event> createStartEndNFA(long windowLength) {
                NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false);
 
-               State<Event> startingState = new State<>("", State.StateType.Start);
-               State<Event> startState = new State<>("start", State.StateType.Normal);
-               State<Event> endState = new State<>("end", State.StateType.Final);
-               StateTransition<Event> starting2Start = new StateTransition<>(
-                       StateTransitionAction.TAKE,
-                       startState,
+               State<Event> startState = new State<>("start", State.StateType.Start);
+               State<Event> endState = new State<>("end", State.StateType.Normal);
+               State<Event> endingState = new State<>("", State.StateType.Final);
+
+               startState.addTake(
+                       endState,
                        new FilterFunction<Event>() {
                                private static final long serialVersionUID = -4869589195918650396L;
 
@@ -267,10 +249,8 @@ public class NFATest extends TestLogger {
                                        return value.getName().equals("start");
                                }
                        });
-
-               StateTransition<Event> start2End = new StateTransition<>(
-                       StateTransitionAction.TAKE,
-                       endState,
+               endState.addTake(
+                       endingState,
                        new FilterFunction<Event>() {
                                private static final long serialVersionUID = 2979804163709590673L;
 
@@ -279,19 +259,11 @@ public class NFATest extends TestLogger {
                                        return value.getName().equals("end");
                                }
                        });
+               endState.addIgnore(FilterFunctions.<Event>trueFunction());
 
-               StateTransition<Event> start2Start = new StateTransition<>(
-                       StateTransitionAction.IGNORE,
-                       startState,
-                       null);
-
-               startingState.addStateTransition(starting2Start);
-               startState.addStateTransition(start2End);
-               startState.addStateTransition(start2Start);
-
-               nfa.addState(startingState);
                nfa.addState(startState);
                nfa.addState(endState);
+               nfa.addState(endingState);
 
                return nfa;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index d11f3a8..93d78cc 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
+import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
@@ -28,22 +30,45 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.Assert.assertTrue;
+import static com.google.common.collect.Sets.newHashSet;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class NFACompilerTest extends TestLogger {
 
+       private static final FilterFunction<Event> startFilter = new FilterFunction<Event>() {
+               private static final long serialVersionUID = 3314714776170474221L;
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getPrice() > 2;
+               }
+       };
+
+       private static final FilterFunction<Event> endFilter = new FilterFunction<Event>() {
+               private static final long serialVersionUID = 3990995859716364087L;
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("end");
+               }
+       };
+
+       private static final TypeSerializer<Event> serializer = TypeExtractor.createTypeInfo(Event.class)
+               .createSerializer(new ExecutionConfig());
+
        @Rule
        public ExpectedException expectedException = ExpectedException.none();
 
@@ -81,83 +106,96 @@ public class NFACompilerTest extends TestLogger {
         */
        @Test
        public void testNFACompilerWithSimplePattern() {
-               Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
-                       private static final long serialVersionUID = 3314714776170474221L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getPrice() > 2;
-                       }
-               })
-               .followedBy("middle").subtype(SubEvent.class)
-               .next("end").where(new FilterFunction<Event>() {
-                               private static final long serialVersionUID = 3990995859716364087L;
+               Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+                       .followedBy("middle").subtype(SubEvent.class)
+                       .next("end").where(endFilter);
 
-                               @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("end");
-                       }
-               });
-
-               TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
-
-               NFA<Event> nfa = NFACompiler.compile(pattern, typeInformation.createSerializer(new ExecutionConfig()), false);
+               NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
 
                Set<State<Event>> states = nfa.getStates();
-
                assertEquals(4, states.size());
 
                Map<String, State<Event>> stateMap = new HashMap<>();
-
-               for (State<Event> state: states) {
+               for (State<Event> state : states) {
                        stateMap.put(state.getName(), state);
                }
 
-               assertTrue(stateMap.containsKey(NFACompiler.BEGINNING_STATE_NAME));
-               State<Event> beginningState = stateMap.get(NFACompiler.BEGINNING_STATE_NAME);
-
-               assertTrue(beginningState.isStart());
-
                assertTrue(stateMap.containsKey("start"));
                State<Event> startState = stateMap.get("start");
+               assertTrue(startState.isStart());
+               final Set<Tuple2<String, StateTransitionAction>> startTransitions = unfoldTransitions(startState);
+               assertEquals(newHashSet(
+                       Tuple2.of("middle", StateTransitionAction.TAKE)
+               ), startTransitions);
 
-               Collection<StateTransition<Event>> startTransitions = startState.getStateTransitions();
-               Map<String, StateTransition<Event>> startTransitionMap = new HashMap<>();
+               assertTrue(stateMap.containsKey("middle"));
+               State<Event> middleState = stateMap.get("middle");
+               final Set<Tuple2<String, StateTransitionAction>> middleTransitions = unfoldTransitions(middleState);
+               assertEquals(newHashSet(
+                       Tuple2.of("middle", StateTransitionAction.IGNORE),
+                       Tuple2.of("end", StateTransitionAction.TAKE)
+               ), middleTransitions);
 
-               for (StateTransition<Event> transition: startTransitions) {
-                       startTransitionMap.put(transition.getTargetState().getName(), transition);
-               }
+               assertTrue(stateMap.containsKey("end"));
+               State<Event> endState = stateMap.get("end");
+               final Set<Tuple2<String, StateTransitionAction>> endTransitions = unfoldTransitions(endState);
+               assertEquals(newHashSet(
+                       Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE)
+               ), endTransitions);
+
+               assertTrue(stateMap.containsKey(NFACompiler.ENDING_STATE_NAME));
+               State<Event> endingState = stateMap.get(NFACompiler.ENDING_STATE_NAME);
+               assertTrue(endingState.isFinal());
+               assertEquals(0, endingState.getStateTransitions().size());
+       }
 
-               assertEquals(2, startTransitionMap.size());
-               assertTrue(startTransitionMap.containsKey("start"));
+       @Test
+       public void testNFACompilerWithKleeneStar() {
 
-               StateTransition<Event> reflexiveTransition = startTransitionMap.get("start");
-               assertEquals(StateTransitionAction.IGNORE, reflexiveTransition.getAction());
+               Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+                       .followedBy("middle").subtype(SubEvent.class).zeroOrMore()
+                       .followedBy("end").where(endFilter);
 
-               assertTrue(startTransitionMap.containsKey("middle"));
-               StateTransition<Event> startMiddleTransition = startTransitionMap.get("middle");
-               assertEquals(StateTransitionAction.TAKE, startMiddleTransition.getAction());
+               NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
 
-               assertTrue(stateMap.containsKey("middle"));
-               State<Event> middleState = stateMap.get("middle");
+               Set<State<Event>> states = nfa.getStates();
+               assertEquals(5, states.size());
 
-               Map<String, StateTransition<Event>> middleTransitionMap = new HashMap<>();
 
-               for (StateTransition<Event> transition: middleState.getStateTransitions()) {
-                       middleTransitionMap.put(transition.getTargetState().getName(), transition);
+               Set<Tuple2<String, Set<Tuple2<String, StateTransitionAction>>>> stateMap = new HashSet<>();
+               for (State<Event> state : states) {
+                       stateMap.add(Tuple2.of(state.getName(), unfoldTransitions(state)));
                }
 
-               assertEquals(1, middleTransitionMap.size());
+               assertEquals(stateMap, newHashSet(
+                       Tuple2.of("start", newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE))),
+                       Tuple2.of("middle", newHashSet(
+                               Tuple2.of("middle", StateTransitionAction.IGNORE),
+                               Tuple2.of("middle", StateTransitionAction.TAKE)
+                       )),
+                   Tuple2.of("middle", newHashSet(
+                           Tuple2.of("middle", StateTransitionAction.IGNORE),
+                           Tuple2.of("middle", StateTransitionAction.TAKE),
+                           Tuple2.of("end", StateTransitionAction.PROCEED)
+                   )),
+                       Tuple2.of("end", newHashSet(
+                               Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE),
+                               Tuple2.of("end", StateTransitionAction.IGNORE)
+                       )),
+                   Tuple2.of(NFACompiler.ENDING_STATE_NAME, Sets.newHashSet())
+               ));
 
-               assertTrue(middleTransitionMap.containsKey("end"));
-               StateTransition<Event> middleEndTransition = middleTransitionMap.get("end");
+       }
 
-               assertEquals(StateTransitionAction.TAKE, middleEndTransition.getAction());
 
-               assertTrue(stateMap.containsKey("end"));
-               State<Event> endState = stateMap.get("end");
-
-               assertTrue(endState.isFinal());
-               assertEquals(0, endState.getStateTransitions().size());
+       private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {
+               final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>();
+               for (StateTransition<T> transition : state.getStateTransitions()) {
+                       transitions.add(Tuple2.of(
+                               transition.getTargetState().getName(),
+                               transition.getAction()));
+               }
+               return transitions;
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 98c3f5a..68b0419 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -184,4 +184,58 @@ public class PatternTest extends TestLogger {
                assertEquals(previous2.getName(), "start");
        }
 
+       @Test(expected = MalformedPatternException.class)
+       public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {
+
+               Pattern.begin("start").where(new FilterFunction<Object>() {
+                       @Override
+                       public boolean filter(Object value) throws Exception {
+                               return true;
+                       }
+               }).oneOrMore().zeroOrMore();
+       }
+
+       @Test(expected = MalformedPatternException.class)
+       public void testPatternCanHaveQuantifierSpecifiedOnce2() throws Exception {
+
+               Pattern.begin("start").where(new FilterFunction<Object>() {
+                       @Override
+                       public boolean filter(Object value) throws Exception {
+                               return true;
+                       }
+               }).zeroOrMore().times(1);
+       }
+
+       @Test(expected = MalformedPatternException.class)
+       public void testPatternCanHaveQuantifierSpecifiedOnce3() throws Exception {
+
+               Pattern.begin("start").where(new FilterFunction<Object>() {
+                       @Override
+                       public boolean filter(Object value) throws Exception {
+                               return true;
+                       }
+               }).times(1).oneOrMore();
+       }
+
+       @Test(expected = MalformedPatternException.class)
+       public void testPatternCanHaveQuantifierSpecifiedOnce4() throws Exception {
+
+               Pattern.begin("start").where(new FilterFunction<Object>() {
+                       @Override
+                       public boolean filter(Object value) throws Exception {
+                               return true;
+                       }
+               }).oneOrMore().oneOrMore(true);
+       }
+
+       @Test(expected = MalformedPatternException.class)
+       public void testPatternCanHaveQuantifierSpecifiedOnce5() throws Exception {
+
+               Pattern.begin("start").where(new FilterFunction<Object>() {
+                       @Override
+                       public boolean filter(Object value) throws Exception {
+                               return true;
+                       }
+               }).oneOrMore(true).zeroOrMore(true);
+       }
 }

Reply via email to