Repository: flink
Updated Branches:
  refs/heads/table-retraction b237a3ef0 -> e265620d4 (forced update)


http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index f4bf647..4a00c1e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -46,6 +47,7 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
+@SuppressWarnings("unchecked")
 public class NFAITCase extends TestLogger {
 
        @Test
@@ -56,12 +58,12 @@ public class NFAITCase extends TestLogger {
                SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
                Event endEvent = new Event(43, "end", 1.0);
 
-               inputEvents.add(new StreamRecord<Event>(startEvent, 1));
-               inputEvents.add(new StreamRecord<Event>(new Event(43, "foobar", 
1.0), 2));
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(new Event(43, "foobar", 
1.0), 2));
                inputEvents.add(new StreamRecord<Event>(new SubEvent(41, 
"barfoo", 1.0, 5.0), 3));
                inputEvents.add(new StreamRecord<Event>(middleEvent, 3));
-               inputEvents.add(new StreamRecord<Event>(new Event(43, "start", 
1.0), 4));
-               inputEvents.add(new StreamRecord<Event>(endEvent, 5));
+               inputEvents.add(new StreamRecord<>(new Event(43, "start", 1.0), 
4));
+               inputEvents.add(new StreamRecord<>(endEvent, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
@@ -212,12 +214,12 @@ public class NFAITCase extends TestLogger {
                final Event middleEvent;
                final Event endEvent;
 
-               events.add(new StreamRecord<Event>(new Event(1, "start", 1.0), 
1));
-               events.add(new StreamRecord<Event>(startEvent = new Event(2, 
"start", 1.0), 2));
-               events.add(new StreamRecord<Event>(middleEvent = new Event(3, 
"middle", 1.0), 3));
-               events.add(new StreamRecord<Event>(new Event(4, "foobar", 1.0), 
4));
-               events.add(new StreamRecord<Event>(endEvent = new Event(5, 
"end", 1.0), 11));
-               events.add(new StreamRecord<Event>(new Event(6, "end", 1.0), 
13));
+               events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1));
+               events.add(new StreamRecord<>(startEvent = new Event(2, 
"start", 1.0), 2));
+               events.add(new StreamRecord<>(middleEvent = new Event(3, 
"middle", 1.0), 3));
+               events.add(new StreamRecord<>(new Event(4, "foobar", 1.0), 4));
+               events.add(new StreamRecord<>(endEvent = new Event(5, "end", 
1.0), 11));
+               events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13));
 
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@@ -273,12 +275,12 @@ public class NFAITCase extends TestLogger {
                Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns 
= new HashSet<>();
                Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = 
new HashSet<>();
 
-               events.add(new StreamRecord<Event>(new Event(1, "start", 1.0), 
1));
-               events.add(new StreamRecord<Event>(new Event(2, "start", 1.0), 
2));
-               events.add(new StreamRecord<Event>(new Event(3, "middle", 1.0), 
3));
-               events.add(new StreamRecord<Event>(new Event(4, "foobar", 1.0), 
4));
-               events.add(new StreamRecord<Event>(new Event(5, "end", 1.0), 
11));
-               events.add(new StreamRecord<Event>(new Event(6, "end", 1.0), 
13));
+               events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1));
+               events.add(new StreamRecord<>(new Event(2, "start", 1.0), 2));
+               events.add(new StreamRecord<>(new Event(3, "middle", 1.0), 3));
+               events.add(new StreamRecord<>(new Event(4, "foobar", 1.0), 4));
+               events.add(new StreamRecord<>(new Event(5, "end", 1.0), 11));
+               events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13));
 
                Map<String, Event> timeoutPattern1 = new HashMap<>();
                timeoutPattern1.put("start", new Event(1, "start", 1.0));
@@ -306,14 +308,14 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
-3268741540234334074L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new SimpleCondition<Event>() {
+               }).followedByAny("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
-8995174172182138608L;
 
                        @Override
@@ -353,13 +355,13 @@ public class NFAITCase extends TestLogger {
                SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
                Event endEvent=  new Event(46, "end", 1.0);
 
-               inputEvents.add(new StreamRecord<Event>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
                inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
                inputEvents.add(new StreamRecord<Event>(middleEvent2, 4));
                inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
                inputEvents.add(new StreamRecord<Event>(nextOne1, 6));
                inputEvents.add(new StreamRecord<Event>(nextOne2, 7));
-               inputEvents.add(new StreamRecord<Event>(endEvent, 8));
+               inputEvents.add(new StreamRecord<>(endEvent, 8));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
@@ -368,21 +370,21 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle-first").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+               
}).followedByAny("middle-first").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
                        private static final long serialVersionUID = 
6215754202506583964L;
 
                        @Override
                        public boolean filter(SubEvent value) throws Exception {
                                return value.getVolume() > 5.0;
                        }
-               
}).followedBy("middle-second").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+               
}).followedByAny("middle-second").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
                        private static final long serialVersionUID = 
6215754202506583964L;
 
                        @Override
                        public boolean filter(SubEvent value) throws Exception {
                                return value.getName().equals("next-one");
                        }
-               }).followedBy("end").where(new SimpleCondition<Event>() {
+               }).followedByAny("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
7056763917392056548L;
 
                        @Override
@@ -449,28 +451,28 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               
}).oneOrMore().allowCombinations().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().optional().followedByAny("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
-               }).followedBy("end2").where(new SimpleCondition<Event>() {
+               }).followedByAny("end2").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
-               }).followedBy("end3").where(new SimpleCondition<Event>() {
+               }).followedByAny("end3").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -537,7 +539,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -555,27 +557,14 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(4, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1),
-                       Sets.newHashSet(startEvent, middleEvent1, end1),
-                       Sets.newHashSet(startEvent, middleEvent2, end1),
-                       Sets.newHashSet(startEvent, end1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end1),
+                       Lists.newArrayList(startEvent, middleEvent1, end1),
+                       Lists.newArrayList(startEvent, middleEvent2, end1),
+                       Lists.newArrayList(startEvent, end1)
+               ));
        }
 
        @Test
@@ -724,14 +713,14 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle-first").where(new 
SimpleCondition<Event>() {
+               }).followedByAny("middle-first").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               
}).oneOrMore().allowCombinations().optional().followedBy("middle-second").where(new
 SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().optional().followedByAny("middle-second").where(new
 SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -803,21 +792,21 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("branching").where(new SimpleCondition<Event>() {
+               }).followedByAny("branching").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).followedBy("merging").where(new SimpleCondition<Event>() {
+               }).followedByAny("merging").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("f");
                        }
-               }).followedBy("kleene").where(new SimpleCondition<Event>() {
+               }).followedByAny("kleene").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -940,7 +929,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -958,25 +947,12 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(2, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(start, middleEvent1, middleEvent2, end),
-                       Sets.newHashSet(start, middleEvent2, end)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(start, middleEvent1, middleEvent2, 
end),
+                       Lists.newArrayList(start, middleEvent2, end)
+               ));
        }
 
        @Test
@@ -1000,14 +976,14 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore().allowCombinations().followedBy("end1").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().followedByAny("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1183,14 +1159,14 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore().followedBy("end1").where(new 
SimpleCondition<Event>() {
+               }).oneOrMore().followedByAny("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1201,26 +1177,16 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
-
-               assertEquals(3, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end1),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1),
-                       Sets.newHashSet(startEvent, middleEvent1, end1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, end1),
+                       Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end1),
+                       Lists.newArrayList(startEvent, middleEvent2, 
middleEvent3, end1),
+                       Lists.newArrayList(startEvent, middleEvent3, end1),
+                       Lists.newArrayList(startEvent, middleEvent2, end1),
+                       Lists.newArrayList(startEvent, middleEvent1, end1)
+               ));
        }
 
        @Test
@@ -1322,25 +1288,12 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
-
-               assertEquals(2, allPatterns.size());
-               assertEquals(Sets.<Set<Event>>newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent3, 
end1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end1),
+                       Lists.newArrayList(startEvent, middleEvent1, 
middleEvent3, end1)
+               ));
        }
 
        @Test
@@ -1375,25 +1328,13 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(middleEvent1, middleEvent2, end1),
+                       Lists.newArrayList(middleEvent2, middleEvent3, end1)
+               ));
 
-               assertEquals(2, allPatterns.size());
-               assertEquals(Sets.<Set<Event>>newHashSet(
-                       Sets.newHashSet(middleEvent1, middleEvent2, end1),
-                       Sets.newHashSet(middleEvent2, middleEvent3, end1)
-               ), resultingPatterns);
        }
 
        @Test
@@ -1442,49 +1383,6 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testTimesStrictWithFollowedBy() {
-               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 2));
-               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
-               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
-
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("c");
-                       }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("a");
-                       }
-               
}).times(2).allowCombinations().consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("b");
-                       }
-               });
-
-               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
-
-               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent2, ConsecutiveData.end)
-               ));
-       }
-
-       @Test
        public void testTimesNotStrictWithFollowedByEager() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
@@ -1543,7 +1441,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1571,49 +1469,6 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testTimesStrictWithFollowedByNotEager() {
-               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 2));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
-               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
-
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("c");
-                       }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("a");
-                       }
-               
}).times(2).allowCombinations().consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("b");
-                       }
-               });
-
-               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
-
-               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
-                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
-               ));
-       }
-
-       @Test
        public void testTimesStrictWithNextAndConsecutive() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
@@ -1789,29 +1644,16 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
-
-               assertEquals(6, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(middleEvent1,  middleEvent2, 
middleEvent3),
-                       Sets.newHashSet(middleEvent1,  middleEvent2),
-                       Sets.newHashSet(middleEvent1),
-                       Sets.newHashSet(middleEvent2,  middleEvent3),
-                       Sets.newHashSet(middleEvent2),
-                       Sets.newHashSet(middleEvent3)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(middleEvent1,  middleEvent2, 
middleEvent3),
+                       Lists.newArrayList(middleEvent1,  middleEvent2),
+                       Lists.newArrayList(middleEvent1),
+                       Lists.newArrayList(middleEvent2,  middleEvent3),
+                       Lists.newArrayList(middleEvent2),
+                       Lists.newArrayList(middleEvent3)
+               ));
        }
 
        @Test
@@ -1895,26 +1737,13 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(3, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent,  middleEvent1, 
middleEvent2, middleEvent3),
-                       Sets.newHashSet(startEvent,  middleEvent1, 
middleEvent2),
-                       Sets.newHashSet(startEvent,  middleEvent1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(startEvent,  middleEvent1, 
middleEvent2, middleEvent3),
+                       Lists.newArrayList(startEvent,  middleEvent1, 
middleEvent2),
+                       Lists.newArrayList(startEvent,  middleEvent1)
+               ));
        }
 
        ///////////////////////////////         Optional           ////////////////////////////////////////
@@ -1978,7 +1807,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2025,7 +1854,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2071,7 +1900,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2098,6 +1927,54 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
+       public void testOneOrMoreStrictOptional() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               
}).oneOrMore().consecutive().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
+
+       @Test
        public void testTimesStrictOptional1() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
@@ -2201,94 +2078,92 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testStrictCombinationsOneOrMore() {
-               List<List<Event>> resultingPatterns = 
testStrictOneOrMore(false);
+       public void testStrictOneOrMore() {
+               List<List<Event>> resultingPatterns = 
testOneOrMore(Quantifier.ConsumingStrategy.STRICT);
 
                compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
-                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end),
-                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
-                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.end),
-                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
-                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent4, ConsecutiveData.end)
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testSkipTillNextOneOrMore() {
+               List<List<Event>> resultingPatterns = 
testOneOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_NEXT);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end)
                ));
        }
 
        @Test
-       public void testStrictEagerOneOrMore() {
-               List<List<Event>> resultingPatterns = testStrictOneOrMore(true);
+       public void testSkipTillAnyOneOrMore() {
+               List<List<Event>> resultingPatterns = 
testOneOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_ANY);
 
                compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, 
ConsecutiveData.middleEvent4, ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent4, ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent4, 
ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end)
                ));
        }
 
-       private List<List<Event>> testStrictOneOrMore(boolean eager) {
+       private List<List<Event>> testOneOrMore(Quantifier.ConsumingStrategy 
strategy) {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
                inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
                inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
                inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 5));
-               inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 6));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent4, 7));
-               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 8));
-
-               Pattern<Event, ?> pattern;
-               if (eager) {
-                       pattern = Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
-                               private static final long serialVersionUID = 
5726188262756267490L;
-
-                               @Override
-                               public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("c");
-                               }
-                       }).followedBy("middle").where(new 
SimpleCondition<Event>() {
-                               private static final long serialVersionUID = 
5726188262756267490L;
-
-                               @Override
-                               public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("a");
-                               }
-                       }).oneOrMore().consecutive()
-                                       .followedBy("end1").where(new 
SimpleCondition<Event>() {
-                                               private static final long 
serialVersionUID = 5726188262756267490L;
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 4));
+               inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent4, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
 
-                                               @Override
-                                               public boolean filter(Event 
value) throws Exception {
-                                                       return 
value.getName().equals("b");
-                                               }
-                                       });
-               } else {
-                       pattern = Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
-                               private static final long serialVersionUID = 
5726188262756267490L;
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
 
-                               @Override
-                               public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("c");
-                               }
-                       }).followedBy("middle").where(new 
SimpleCondition<Event>() {
-                               private static final long serialVersionUID = 
5726188262756267490L;
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
 
-                               @Override
-                               public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("a");
-                               }
-                       }).oneOrMore().allowCombinations().consecutive()
-                                       .followedBy("end1").where(new 
SimpleCondition<Event>() {
-                                               private static final long 
serialVersionUID = 5726188262756267490L;
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore();
 
-                                               @Override
-                                               public boolean filter(Event 
value) throws Exception {
-                                                       return 
value.getName().equals("b");
-                                               }
-                                       });
+               switch (strategy) {
+                       case STRICT:
+                               pattern = pattern.consecutive();
+                               break;
+                       case SKIP_TILL_NEXT:
+                               break;
+                       case SKIP_TILL_ANY:
+                               pattern = pattern.allowCombinations();
+                               break;
                }
 
+               pattern = pattern.followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
                return feedNFA(inputEvents, nfa);
@@ -2296,9 +2171,10 @@ public class NFAITCase extends TestLogger {
 
        @Test
        public void testStrictEagerZeroOrMore() {
-               List<List<Event>> resultingPatterns = 
testStrictZeroOrMore(true);
+               List<List<Event>> resultingPatterns = 
testZeroOrMore(Quantifier.ConsumingStrategy.STRICT);
 
                compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
@@ -2306,67 +2182,75 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
-       public void testStrictCombinationsZeroOrMore() {
-               List<List<Event>> resultingPatterns = 
testStrictZeroOrMore(false);
+       public void testSkipTillAnyZeroOrMore() {
+               List<List<Event>> resultingPatterns = 
testZeroOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_ANY);
 
                compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent4, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, 
ConsecutiveData.middleEvent4, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent4, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end),
-                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.end),
-                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
                        Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
                ));
        }
 
-       private List<List<Event>> testStrictZeroOrMore(boolean eager) {
+       @Test
+       public void testSkipTillNextZeroOrMore() {
+               List<List<Event>> resultingPatterns = 
testZeroOrMore(Quantifier.ConsumingStrategy.SKIP_TILL_NEXT);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent4, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
+
+       private List<List<Event>> testZeroOrMore(Quantifier.ConsumingStrategy 
strategy) {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
                inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
                inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
                inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
                inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 4));
                inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
-               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent4, 6));
                inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
 
-               Pattern<Event, ?> pattern = eager
-                               ? Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
-                                       private static final long 
serialVersionUID = 5726188262756267490L;
-
-                                       @Override
-                                       public boolean filter(Event value) 
throws Exception {
-                                               return 
value.getName().equals("c");
-                                       }
-                               }).followedBy("middle").where(new 
SimpleCondition<Event>() {
-                                       private static final long 
serialVersionUID = 5726188262756267490L;
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
 
-                                       @Override
-                                       public boolean filter(Event value) 
throws Exception {
-                                               return 
value.getName().equals("a");
-                                       }
-                               
}).oneOrMore().optional().consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
-                                       private static final long 
serialVersionUID = 5726188262756267490L;
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
 
-                                       @Override
-                                       public boolean filter(Event value) 
throws Exception {
-                                               return 
value.getName().equals("b");
-                                       }
-                               })
-                               : Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
-                                       private static final long 
serialVersionUID = 5726188262756267490L;
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore().optional();
 
-                                       @Override
-                                       public boolean filter(Event value) 
throws Exception {
-                                               return 
value.getName().equals("c");
-                                       }
-                               }).followedBy("middle").where(new 
SimpleCondition<Event>() {
-                                       private static final long 
serialVersionUID = 5726188262756267490L;
+               switch (strategy) {
+                       case STRICT:
+                               pattern = pattern.consecutive();
+                               break;
+                       case SKIP_TILL_NEXT:
+                               break;
+                       case SKIP_TILL_ANY:
+                               pattern = pattern.allowCombinations();
+                               break;
+               }
 
-                                       @Override
-                                       public boolean filter(Event value) 
throws Exception {
-                                               return 
value.getName().equals("a");
-                                       }
-                               
}).oneOrMore().allowCombinations().optional().consecutive().followedBy("end1").where(new
 SimpleCondition<Event>() {
+               pattern = pattern.followedBy("end1").where(new 
SimpleCondition<Event>() {
                                        private static final long 
serialVersionUID = 5726188262756267490L;
 
                                        @Override
@@ -2380,7 +2264,6 @@ public class NFAITCase extends TestLogger {
                return feedNFA(inputEvents, nfa);
        }
 
-
        @Test
        public void testTimesStrict() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
@@ -2400,14 +2283,14 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               
}).times(2).allowCombinations().consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
+               }).times(2).consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2444,7 +2327,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2733,16 +2616,10 @@ public class NFAITCase extends TestLogger {
                                Lists.<List<Event>>newArrayList(
                                                Lists.newArrayList(startEvent1, 
endEvent, middleEvent1, middleEvent2, middleEvent4),
                                                Lists.newArrayList(startEvent1, 
endEvent, middleEvent2, middleEvent1),
-                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent4, middleEvent3),
-                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent4, middleEvent2),
                                                Lists.newArrayList(startEvent1, 
endEvent, middleEvent3, middleEvent1),
                                                Lists.newArrayList(startEvent2, 
endEvent, middleEvent3, middleEvent4),
                                                Lists.newArrayList(startEvent1, 
endEvent, middleEvent4, middleEvent1),
-                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent4),
                                                Lists.newArrayList(startEvent1, 
endEvent, middleEvent1),
-                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent2),
-                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent3),
-                                               Lists.newArrayList(startEvent2, 
endEvent, middleEvent4),
                                                Lists.newArrayList(startEvent2, 
endEvent, middleEvent3)
                                )
                );
@@ -2798,9 +2675,7 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-               return resultingPatterns;
+               return feedNFA(inputEvents, nfa);
        }
 
        private static class MySubeventIterCondition extends 
IterativeCondition<SubEvent> {
@@ -2887,9 +2762,7 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-               return resultingPatterns;
+               return feedNFA(inputEvents, nfa);
        }
 
        private static class MyEventIterCondition extends 
IterativeCondition<Event> {
@@ -2978,7 +2851,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle1").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+               }).followedByAny("middle1").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
                        private static final long serialVersionUID = 
2178338526904474690L;
 
                        @Override
@@ -3043,14 +2916,14 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               
}).oneOrMore().followedBy("middle1").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+               
}).oneOrMore().followedByAny("middle1").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
                        private static final long serialVersionUID = 
2178338526904474690L;
 
                        @Override
                        public boolean filter(SubEvent value) throws Exception {
                                return value.getName().startsWith("foo");
                        }
-               }).followedBy("end").where(new IterativeCondition<Event>() {
+               }).followedByAny("end").where(new IterativeCondition<Event>() {
                        private static final long serialVersionUID = 
7056763917392056548L;
 
                        @Override
@@ -3084,6 +2957,131 @@ public class NFAITCase extends TestLogger {
                );
        }
 
+
+       ///////////////////////////////////////   Skip till next     /////////////////////////////
+
+       @Test
+       public void testBranchingPatternSkipTillNext() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event startEvent = new Event(40, "start", 1.0);
+               SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10.0);
+               SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
+               SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
+               SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
+               SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
+               Event endEvent=  new Event(46, "end", 1.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<Event>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<Event>(nextOne1, 6));
+               inputEvents.add(new StreamRecord<Event>(nextOne2, 7));
+               inputEvents.add(new StreamRecord<>(endEvent, 8));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               }).followedBy("middle-first").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getVolume() > 5.0;
+                       }
+               
}).followedBy("middle-second").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getName().equals("next-one");
+                       }
+               }).followedByAny("end").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               final List<List<Event>> patterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(patterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(startEvent, middleEvent1, nextOne1, 
endEvent)
+               ));
+       }
+
+       @Test
+       public void testBranchingPatternMixedFollowedBy() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event startEvent = new Event(40, "start", 1.0);
+               SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10.0);
+               SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
+               SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
+               SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
+               SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
+               Event endEvent=  new Event(46, "end", 1.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<Event>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<Event>(nextOne1, 6));
+               inputEvents.add(new StreamRecord<Event>(nextOne2, 7));
+               inputEvents.add(new StreamRecord<>(endEvent, 8));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               
}).followedByAny("middle-first").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getVolume() > 5.0;
+                       }
+               
}).followedBy("middle-second").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getName().equals("next-one");
+                       }
+               }).followedByAny("end").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               final List<List<Event>> patterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(patterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(startEvent, middleEvent1, nextOne1, 
endEvent),
+                       Lists.newArrayList(startEvent, middleEvent2, nextOne1, 
endEvent),
+                       Lists.newArrayList(startEvent, middleEvent3, nextOne1, 
endEvent)
+               ));
+       }
+
+
+       /////////////////////////////////////////    Utility      ////////////////////////////////
        private List<List<Event>> feedNFA(List<StreamRecord<Event>> 
inputEvents, NFA<Event> nfa) {
                List<List<Event>> resultingPatterns = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index da1f9c8..ced9efe 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -149,45 +149,6 @@ public class NFACompilerTest extends TestLogger {
                assertEquals(0, endingState.getStateTransitions().size());
        }
 
-       @Test
-       public void testNFACompilerWithKleeneStar() {
-
-               Pattern<Event, Event> pattern = 
Pattern.<Event>begin("start").where(startFilter)
-                       
.followedBy("middle").subtype(SubEvent.class).oneOrMore().optional()
-                       .followedBy("end").where(endFilter);
-
-               NFA<Event> nfa = NFACompiler.compile(pattern, serializer, 
false);
-
-               Set<State<Event>> states = nfa.getStates();
-               assertEquals(5, states.size());
-
-
-               Set<Tuple2<String, Set<Tuple2<String, StateTransitionAction>>>> 
stateMap = new HashSet<>();
-               for (State<Event> state : states) {
-                       stateMap.add(Tuple2.of(state.getName(), 
unfoldTransitions(state)));
-               }
-
-               assertEquals(stateMap, newHashSet(
-                       Tuple2.of("start", newHashSet(Tuple2.of("middle", 
StateTransitionAction.TAKE))),
-                       Tuple2.of("middle", newHashSet(
-                               Tuple2.of("middle", 
StateTransitionAction.IGNORE),
-                               Tuple2.of("middle", StateTransitionAction.TAKE)
-                       )),
-                   Tuple2.of("middle", newHashSet(
-                           Tuple2.of("middle", StateTransitionAction.IGNORE),
-                           Tuple2.of("middle", StateTransitionAction.TAKE),
-                           Tuple2.of("end", StateTransitionAction.PROCEED)
-                   )),
-                       Tuple2.of("end", newHashSet(
-                               Tuple2.of(NFACompiler.ENDING_STATE_NAME, 
StateTransitionAction.TAKE),
-                               Tuple2.of("end", StateTransitionAction.IGNORE)
-                       )),
-                   Tuple2.of(NFACompiler.ENDING_STATE_NAME, Sets.newHashSet())
-               ));
-
-       }
-
-
        private <T> Set<Tuple2<String, StateTransitionAction>> 
unfoldTransitions(final State<T> state) {
                final Set<Tuple2<String, StateTransitionAction>> transitions = 
new HashSet<>();
                for (StateTransition<T> transition : 
state.getStateTransitions()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index d599ec9..8465bc3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -208,6 +208,7 @@ public class CEPOperatorTest extends TestLogger {
         * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
         */
        @Test
+       @SuppressWarnings("unchecked")
        public void testKeyedAdvancingTimeWithoutElements() throws Exception {
                final KeySelector<Event, Integer> keySelector = new 
TestKeySelector();
 
@@ -522,7 +523,7 @@ public class CEPOperatorTest extends TestLogger {
                                                        return 
value.getName().equals("start");
                                                }
                                        })
-                                       
.followedBy("middle").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                                       
.followedByAny("middle").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
                                                private static final long 
serialVersionUID = 6215754202506583964L;
 
                                                @Override
@@ -530,7 +531,7 @@ public class CEPOperatorTest extends TestLogger {
                                                        return 
value.getVolume() > 5.0;
                                                }
                                        })
-                                       .followedBy("end").where(new 
SimpleCondition<Event>() {
+                                       .followedByAny("end").where(new 
SimpleCondition<Event>() {
                                                private static final long 
serialVersionUID = 7056763917392056548L;
 
                                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 8c4304a..3c1da2e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.pattern;
 
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
 import org.apache.flink.cep.pattern.conditions.OrCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
@@ -58,8 +59,8 @@ public class PatternTest extends TestLogger {
                assertNotNull(previous2 = previous.getPrevious());
                assertNull(previous2.getPrevious());
 
-               assertTrue(pattern instanceof FollowedByPattern);
-               assertTrue(previous instanceof FollowedByPattern);
+               assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
pattern.getQuantifier().getConsumingStrategy());
+               assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
previous.getQuantifier().getConsumingStrategy());
 
                assertEquals(pattern.getName(), "end");
                assertEquals(previous.getName(), "next");
@@ -137,7 +138,7 @@ public class PatternTest extends TestLogger {
                assertNotNull(previous2 = previous.getPrevious());
                assertNull(previous2.getPrevious());
 
-               assertTrue(pattern instanceof FollowedByPattern);
+               assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
pattern.getQuantifier().getConsumingStrategy());
                assertNotNull(previous.getCondition());
 
                assertEquals(pattern.getName(), "end");
@@ -177,7 +178,7 @@ public class PatternTest extends TestLogger {
                assertNotNull(previous2 = previous.getPrevious());
                assertNull(previous2.getPrevious());
 
-               assertTrue(pattern instanceof FollowedByPattern);
+               assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
pattern.getQuantifier().getConsumingStrategy());
                assertFalse(previous.getCondition() instanceof OrCondition);
                assertTrue(previous2.getCondition() instanceof OrCondition);
 

Reply via email to