http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
new file mode 100644
index 0000000..6aaa4bb
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link IterativeCondition condition} which combines two conditions with a 
logical
+ * {@code OR} and returns {@code true} if at least one is {@code true}.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class OrCondition<T> extends IterativeCondition<T> {
+
+       private static final long serialVersionUID = 2554610954278485106L;
+
+       private final IterativeCondition<T> left;
+       private final IterativeCondition<T> right;
+
+       public OrCondition(final IterativeCondition<T> left, final 
IterativeCondition<T> right) {
+               this.left = left;
+               this.right = right;
+       }
+
+       @Override
+       public boolean filter(T value, Context<T> ctx) throws Exception {
+               return left.filter(value, ctx) || right.filter(value, ctx);
+       }
+
+       /**
+        * @return One of the {@link IterativeCondition conditions} combined in 
this condition.
+        */
+       public IterativeCondition<T> getLeft() {
+               return left;
+       }
+
+       /**
+        * @return One of the {@link IterativeCondition conditions} combined in 
this condition.
+        */
+       public IterativeCondition<T> getRight() {
+               return right;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
new file mode 100644
index 0000000..9ca52c5
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ *  A user-defined condition that decides if an element should be accepted in 
the pattern or not.
+ * Accepting an element also signals a state transition for the corresponding 
{@link org.apache.flink.cep.nfa.NFA}.
+ *
+ * <p>Contrary to the {@link IterativeCondition}, conditions that extend this 
class do not have access to the
+ * previously accepted elements in the pattern. Conditions that extend this 
class are simple {@code filter(...)}
+ * functions that decide based on the properties of the element at hand.
+ */
+public abstract class SimpleCondition<T> extends IterativeCondition<T> 
implements FilterFunction<T> {
+
+       private static final long serialVersionUID = 4942618239408140245L;
+
+       @Override
+       public boolean filter(T value, Context<T> ctx) throws Exception {
+               return filter(value);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
new file mode 100644
index 0000000..91f6c21
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link IterativeCondition condition} which filters elements of the given 
type.
+ * An element is filtered out iff it is not assignable to the given subtype of 
{@code T}.
+ *
+ * @param <T> Type of the elements to be filtered
+ */
+public class SubtypeCondition<T> extends SimpleCondition<T> {
+       private static final long serialVersionUID = -2990017519957561355L;
+
+       /** The subtype to filter for. */
+       private final Class<? extends T> subtype;
+
+       public SubtypeCondition(final Class<? extends T> subtype) {
+               this.subtype = subtype;
+       }
+
+       @Override
+       public boolean filter(T value) throws Exception {
+               return subtype.isAssignableFrom(value.getClass());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 5887017..42117ee 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.cep;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -81,7 +81,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        new Event(8, "end", 1.0)
                );
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -89,7 +89,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        }
                })
                .followedBy("middle").subtype(SubEvent.class).where(
-                               new FilterFunction<SubEvent>() {
+                               new SimpleCondition<SubEvent>() {
 
                                        @Override
                                        public boolean filter(SubEvent value) 
throws Exception {
@@ -97,7 +97,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                                        }
                                }
                        )
-               .followedBy("end").where(new FilterFunction<Event>() {
+               .followedBy("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -156,7 +156,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        }
                });
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -164,7 +164,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        }
                })
                        .followedBy("middle").subtype(SubEvent.class).where(
-                               new FilterFunction<SubEvent>() {
+                               new SimpleCondition<SubEvent>() {
 
                                        @Override
                                        public boolean filter(SubEvent value) 
throws Exception {
@@ -172,7 +172,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                                        }
                                }
                        )
-                       .followedBy("end").where(new FilterFunction<Event>() {
+                       .followedBy("end").where(new SimpleCondition<Event>() {
 
                                @Override
                                public boolean filter(Event value) throws 
Exception {
@@ -236,19 +236,19 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        }
                });
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new FilterFunction<Event>() {
+               }).followedBy("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -325,19 +325,19 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        }
                });
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new FilterFunction<Event>() {
+               }).followedBy("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -378,7 +378,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
 
                Pattern<Tuple2<Integer, Integer>, ?> pattern =
                        Pattern.<Tuple2<Integer, Integer>>begin("start")
-                               .where(new FilterFunction<Tuple2<Integer, 
Integer>>() {
+                               .where(new SimpleCondition<Tuple2<Integer, 
Integer>>() {
                                        @Override
                                        public boolean filter(Tuple2<Integer, 
Integer> rec) throws Exception {
                                                return rec.f1 == 1;
@@ -456,19 +456,19 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        }
                });
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new FilterFunction<Event>() {
+               }).followedBy("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -524,26 +524,26 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                );
 
                Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
-                       .where(new FilterFunction<Event>() {
+                       .where(new SimpleCondition<Event>() {
                                @Override
                                public boolean filter(Event value) throws 
Exception {
                                        return value.getName().equals("start");
                                }
                        })
                        .followedBy("middle")
-                       .where(new FilterFunction<Event>() {
+                       .where(new SimpleCondition<Event>() {
                                @Override
                                public boolean filter(Event value) throws 
Exception {
                                        return value.getPrice() == 2.0;
                                }
                        })
-                       .or(new FilterFunction<Event>() {
+                       .or(new SimpleCondition<Event>() {
                                @Override
                                public boolean filter(Event value) throws 
Exception {
                                        return value.getPrice() == 5.0;
                                }
                        })
-                       .followedBy("end").where(new FilterFunction<Event>() {
+                       .followedBy("end").where(new SimpleCondition<Event>() {
 
                                @Override
                                public boolean filter(Event value) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 5b05f19..197767e 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -18,20 +18,26 @@
 
 package org.apache.flink.cep.nfa;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.FilterFunction;
+import com.google.common.primitives.Doubles;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -39,7 +45,6 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class NFAITCase extends TestLogger {
 
@@ -58,23 +63,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<Event>(new Event(43, "start", 
1.0), 4));
                inputEvents.add(new StreamRecord<Event>(endEvent, 5));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               })
-               .followedBy("middle").subtype(SubEvent.class).where(new 
FilterFunction<SubEvent>() {
-                               private static final long serialVersionUID = 
6215754202506583964L;
+               }).followedBy("middle").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
 
-                               @Override
-                               public boolean filter(SubEvent value) throws 
Exception {
-                                       return value.getVolume() > 5.0;
-                               }
-                       })
-               .followedBy("end").where(new FilterFunction<Event>() {
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getVolume() > 5.0;
+                       }
+               }).followedBy("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
7056763917392056548L;
 
                        @Override
@@ -113,14 +116,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent1, 3));
                inputEvents.add(new StreamRecord<>(end, 5));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).next("end").where(new FilterFunction<Event>() {
+               }).next("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -163,14 +166,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent2, 4));
                inputEvents.add(new StreamRecord<>(end, 5));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).next("end").where(new FilterFunction<Event>() {
+               }).next("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -217,21 +220,21 @@ public class NFAITCase extends TestLogger {
                events.add(new StreamRecord<Event>(new Event(6, "end", 1.0), 
13));
 
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
7907391379273505897L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
-3268741540234334074L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new FilterFunction<Event>() {
+               }).followedBy("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
-8995174172182138608L;
 
                        @Override
@@ -240,11 +243,12 @@ public class NFAITCase extends TestLogger {
                        }
                }).within(Time.milliseconds(10));
 
-
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
                for (StreamRecord<Event> event: events) {
-                       Collection<Map<String, Event>> patterns = 
nfa.process(event.getValue(), event.getTimestamp()).f0;
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                                       event.getValue(),
+                                       event.getTimestamp()).f0;
 
                        resultingPatterns.addAll(patterns);
                }
@@ -269,7 +273,6 @@ public class NFAITCase extends TestLogger {
                Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns 
= new HashSet<>();
                Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = 
new HashSet<>();
 
-
                events.add(new StreamRecord<Event>(new Event(1, "start", 1.0), 
1));
                events.add(new StreamRecord<Event>(new Event(2, "start", 1.0), 
2));
                events.add(new StreamRecord<Event>(new Event(3, "middle", 1.0), 
3));
@@ -296,21 +299,21 @@ public class NFAITCase extends TestLogger {
                expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern3, 11L));
                expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 13L));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
7907391379273505897L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
-3268741540234334074L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new FilterFunction<Event>() {
+               }).followedBy("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
-8995174172182138608L;
 
                        @Override
@@ -319,7 +322,6 @@ public class NFAITCase extends TestLogger {
                        }
                }).within(Time.milliseconds(10));
 
-
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), true);
 
                for (StreamRecord<Event> event: events) {
@@ -359,38 +361,35 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<Event>(nextOne2, 7));
                inputEvents.add(new StreamRecord<Event>(endEvent, 8));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               })
-                       
.followedBy("middle-first").subtype(SubEvent.class).where(new 
FilterFunction<SubEvent>() {
-                               private static final long serialVersionUID = 
6215754202506583964L;
+               }).followedBy("middle-first").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
 
-                               @Override
-                               public boolean filter(SubEvent value) throws 
Exception {
-                                       return value.getVolume() > 5.0;
-                               }
-                       })
-                       
.followedBy("middle-second").subtype(SubEvent.class).where(new 
FilterFunction<SubEvent>() {
-                               private static final long serialVersionUID = 
6215754202506583964L;
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getVolume() > 5.0;
+                       }
+               
}).followedBy("middle-second").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
 
-                               @Override
-                               public boolean filter(SubEvent value) throws 
Exception {
-                                       return 
value.getName().equals("next-one");
-                               }
-                       })
-                       .followedBy("end").where(new FilterFunction<Event>() {
-                               private static final long serialVersionUID = 
7056763917392056548L;
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getName().equals("next-one");
+                       }
+               }).followedBy("end").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
7056763917392056548L;
 
-                               @Override
-                               public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("end");
-                               }
-                       });
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -443,44 +442,42 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(end3, 8));
                inputEvents.add(new StreamRecord<>(end4, 9));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
-               })
-                       .followedBy("end2").where(new FilterFunction<Event>() {
-                               private static final long serialVersionUID = 
5726188262756267490L;
+               }).followedBy("end2").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
 
-                               @Override
-                               public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("d");
-                               }
-                       })
-                       .followedBy("end3").where(new FilterFunction<Event>() {
-                               private static final long serialVersionUID = 
5726188262756267490L;
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               }).followedBy("end3").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
 
-                               @Override
-                               public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("e");
-                               }
-                       });
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("e");
+                       }
+               });
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -533,21 +530,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent2, 4));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -597,21 +594,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent3, 5));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(true).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore(true).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -660,14 +657,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent3, 5));
                inputEvents.add(new StreamRecord<>(end, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore().followedBy("end").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore().followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -720,28 +717,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent3, 5));
                inputEvents.add(new StreamRecord<>(end, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle-first").where(new FilterFunction<Event>() 
{
+               }).followedBy("middle-first").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).followedBy("middle-second").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore(false).followedBy("middle-second").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
-               }).zeroOrMore(false).followedBy("end").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore(false).followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -799,35 +796,35 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(kleene2, 7));
                inputEvents.add(new StreamRecord<>(end, 8));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("branching").where(new FilterFunction<Event>() {
+               }).followedBy("branching").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).followedBy("merging").where(new FilterFunction<Event>() {
+               }).followedBy("merging").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("f");
                        }
-               }).followedBy("kleene").where(new FilterFunction<Event>() {
+               }).followedBy("kleene").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
-               }).zeroOrMore(false).followedBy("end").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore(false).followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -881,14 +878,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent3, 4));
                inputEvents.add(new StreamRecord<>(end, 5));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -896,7 +893,7 @@ public class NFAITCase extends TestLogger {
                                return value.getName().equals("a");
                        }
                }).zeroOrMore()
-                       .next("end").where(new FilterFunction<Event>() {
+                       .next("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -909,7 +906,6 @@ public class NFAITCase extends TestLogger {
 
                Set<Set<Event>> resultingPatterns = new HashSet<>();
 
-
                for (StreamRecord<Event> inputEvent : inputEvents) {
                        Collection<Map<String, Event>> patterns = nfa.process(
                                inputEvent.getValue(),
@@ -937,29 +933,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent2, 3));
                inputEvents.add(new StreamRecord<>(end, 5));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false)
-                       .next("end").where(new FilterFunction<Event>() {
-                               private static final long serialVersionUID = 
5726188262756267490L;
+               }).zeroOrMore(false).next("end").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
 
-                               @Override
-                               public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("b");
-                               }
-                       });
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -998,21 +993,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent2, 4));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).oneOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1059,14 +1054,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(startEvent3, 5));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore(false).followedBy("end").where(new 
FilterFunction<Event>() {
+               }).oneOrMore(false).followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1120,21 +1115,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent3, 5L));
                inputEvents.add(new StreamRecord<>(endEvent, 6L));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
6215754202506583964L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).next("middle").where(new FilterFunction<Event>() {
+               }).next("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
6215754202506583964L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).zeroOrMore(false).followedBy("end").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore(false).followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
7056763917392056548L;
 
                        @Override
@@ -1181,21 +1176,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent3, 5));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore(true).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).oneOrMore(true).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1240,21 +1235,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent, 5));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).optional().followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1303,21 +1298,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent3, 4));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).next("middle").where(new FilterFunction<Event>() {
+               }).next("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).times(2).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).times(2).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1362,14 +1357,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent3, 4));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).times(2).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).times(2).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1411,14 +1406,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(startEvent, 1));
                inputEvents.add(new StreamRecord<>(end1, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).optional().followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1464,14 +1459,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent2, 4));
                inputEvents.add(new StreamRecord<>(middleEvent3, 5));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1525,7 +1520,7 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(end2, 6));
                inputEvents.add(new StreamRecord<>(end3, 6));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1571,14 +1566,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(startEvent, 1));
                inputEvents.add(new StreamRecord<>(middleEvent1, 3));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1624,14 +1619,14 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(middleEvent2, 4));
                inputEvents.add(new StreamRecord<>(middleEvent3, 5));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1676,21 +1671,21 @@ public class NFAITCase extends TestLogger {
                Event middleEvent3 = new Event(43, "a", 4.0);
                Event end1 = new Event(44, "b", 5.0);
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).next("middle").where(new FilterFunction<Event>() {
+               }).next("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).times(2).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).times(2).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1719,21 +1714,21 @@ public class NFAITCase extends TestLogger {
                Event middleEvent = new Event(43, "a", 4.0);
                Event end1 = new Event(44, "b", 5.0);
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).optional().followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1761,21 +1756,21 @@ public class NFAITCase extends TestLogger {
                Event middleEvent2 = new Event(42, "a", 3.0);
                Event end1 = new Event(44, "b", 5.0);
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).oneOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1805,21 +1800,21 @@ public class NFAITCase extends TestLogger {
                Event middleEvent2 = new Event(42, "a", 3.0);
                Event end1 = new Event(44, "b", 5.0);
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
-               }).followedBy("middle").where(new FilterFunction<Event>() {
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).followedBy("end1").where(new 
FilterFunction<Event>() {
+               }).zeroOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1841,4 +1836,459 @@ public class NFAITCase extends TestLogger {
                assertEquals(true, nfa.isEmpty());
        }
 
+
+       //////////////////////                  Iterative BooleanConditions     
                /////////////////////////
+
+       private final Event startEvent1 = new Event(40, "start", 1.0);
+       private final Event startEvent2 = new Event(40, "start", 2.0);
+       private final Event startEvent3 = new Event(40, "start", 3.0);
+       private final Event startEvent4 = new Event(40, "start", 4.0);
+       private final SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10);
+       private final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10);
+       private final SubEvent middleEvent3 = new SubEvent(43, "foo3", 3.0, 10);
+       private final SubEvent middleEvent4 = new SubEvent(43, "foo4", 1.0, 10);
+       private final Event nextOne = new Event(44, "next-one", 1.0);
+       private final Event endEvent = new Event(46, "end", 1.0);
+
+       @Test
+       public void testIterativeWithBranchingPatternEager() {
+               List<List<Event>> actual = 
testIterativeWithBranchingPattern(true);
+
+               compareMaps(actual,
+                               Lists.<List<Event>>newArrayList(
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent1, middleEvent2, middleEvent4),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent2, middleEvent1),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent1),
+                                               Lists.newArrayList(startEvent2, 
endEvent, middleEvent3, middleEvent4),
+                                               Lists.newArrayList(startEvent2, 
endEvent, middleEvent3)
+                               )
+               );
+       }
+
+       @Test
+       public void testIterativeWithBranchingPatternCombinations() {
+               List<List<Event>> actual = 
testIterativeWithBranchingPattern(false);
+
+               compareMaps(actual,
+                               Lists.<List<Event>>newArrayList(
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent1, middleEvent2, middleEvent4),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent2, middleEvent1),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent4, middleEvent3),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent4, middleEvent2),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent3, middleEvent1),
+                                               Lists.newArrayList(startEvent2, 
endEvent, middleEvent3, middleEvent4),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent4, middleEvent1),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent4),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent1),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent2),
+                                               Lists.newArrayList(startEvent1, 
endEvent, middleEvent3),
+                                               Lists.newArrayList(startEvent2, 
endEvent, middleEvent4),
+                                               Lists.newArrayList(startEvent2, 
endEvent, middleEvent3)
+                               )
+               );
+       }
+
+       private List<List<Event>> testIterativeWithBranchingPattern(boolean 
eager) {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(startEvent1, 1));
+               inputEvents.add(new StreamRecord<Event>(middleEvent1, 2));
+               inputEvents.add(new StreamRecord<Event>(middleEvent2, 3));
+               inputEvents.add(new StreamRecord<>(startEvent2, 4));
+               inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<Event>(middleEvent4, 5));
+               inputEvents.add(new StreamRecord<>(nextOne, 6));
+               inputEvents.add(new StreamRecord<>(endEvent, 8));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               }).followedBy("middle").subtype(SubEvent.class).where(new 
IterativeCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(SubEvent value, Context<SubEvent> 
ctx) throws Exception {
+                               if (!value.getName().startsWith("foo")) {
+                                       return false;
+                               }
+
+                               double sum = 0.0;
+                               for (Event event : 
ctx.getEventsForPattern("middle")) {
+                                       sum += event.getPrice();
+                               }
+                               sum += value.getPrice();
+                               return Double.compare(sum, 5.0) < 0;
+                       }
+               }).oneOrMore(eager).followedBy("end").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                                       inputEvent.getValue(),
+                                       inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> p: patterns) {
+                               resultingPatterns.add(new 
ArrayList<>(p.values()));
+                       }
+               }
+
+               return resultingPatterns;
+       }
+
+       @Test
+       public void testIterativeWithLoopingStartingEager() {
+               List<List<Event>> actual = 
testIterativeWithLoopingStarting(true);
+
+               compareMaps(actual,
+                               Lists.<List<Event>>newArrayList(
+                                               Lists.newArrayList(startEvent1, 
startEvent2, endEvent),
+                                               Lists.newArrayList(startEvent1, 
endEvent),
+                                               Lists.newArrayList(startEvent2, 
endEvent),
+                                               Lists.newArrayList(startEvent3, 
endEvent),
+                                               Lists.newArrayList(endEvent)
+                               )
+               );
+       }
+
+       @Test
+       public void testIterativeWithLoopingStartingCombination() {
+               List<List<Event>> actual = 
testIterativeWithLoopingStarting(false);
+
+               compareMaps(actual,
+                               Lists.<List<Event>>newArrayList(
+                                               Lists.newArrayList(startEvent1, 
startEvent2, endEvent),
+                                               Lists.newArrayList(startEvent1, 
startEvent3, endEvent),
+                                               Lists.newArrayList(startEvent1, 
endEvent),
+                                               Lists.newArrayList(startEvent2, 
endEvent),
+                                               Lists.newArrayList(startEvent3, 
endEvent),
+                                               Lists.newArrayList(endEvent)
+                               )
+               );
+       }
+
+       private List<List<Event>> testIterativeWithLoopingStarting(boolean 
eager) {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+               inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+               inputEvents.add(new StreamRecord<>(startEvent3, 3L));
+               inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+               // for now, a pattern inherits its continuity property from the 
followedBy() or next(), and the default
+               // behavior (which is the one applied in the case that the 
pattern graph starts with such a pattern)
+               // of a looping pattern is with relaxed continuity (as in 
followedBy).
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(Event value, Context<Event> ctx) 
throws Exception {
+                               if (!value.getName().equals("start")) {
+                                       return false;
+                               }
+
+                               double sum = 0.0;
+                               for (Event event : 
ctx.getEventsForPattern("start")) {
+                                       sum += event.getPrice();
+                               }
+                               sum += value.getPrice();
+                               return Double.compare(sum, 5.0) < 0;
+                       }
+               }).zeroOrMore(eager).followedBy("end").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                                       inputEvent.getValue(),
+                                       inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> p: patterns) {
+                               resultingPatterns.add(new 
ArrayList<>(p.values()));
+                       }
+               }
+
+               return resultingPatterns;
+       }
+
+       @Test
+       public void testIterativeWithPrevPatternDependency() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+               inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+               inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               }).oneOrMore().followedBy("end").where(new 
IterativeCondition<Event>() {
+                       private static final long serialVersionUID = 
7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value, Context<Event> ctx) 
throws Exception {
+                               if (!value.getName().equals("end")) {
+                                       return false;
+                               }
+
+                               double sum = 0.0;
+                               for (Event event : 
ctx.getEventsForPattern("start")) {
+                                       sum += event.getPrice();
+                               }
+                               return Double.compare(sum, 2.0) >= 0;
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                                       inputEvent.getValue(),
+                                       inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> p: patterns) {
+                               resultingPatterns.add(new 
ArrayList<>(p.values()));
+                       }
+               }
+
+               compareMaps(resultingPatterns,
+                               Lists.<List<Event>>newArrayList(
+                                               Lists.newArrayList(startEvent1, 
startEvent2, endEvent),
+                                               Lists.newArrayList(startEvent2, 
endEvent)
+                               )
+               );
+       }
+
+       @Test
+       public void testIterativeWithABACPattern() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(startEvent1, 1L)); //1
+               inputEvents.add(new StreamRecord<Event>(middleEvent1, 2L)); //1
+
+               inputEvents.add(new StreamRecord<>(startEvent2, 2L)); //2
+               inputEvents.add(new StreamRecord<>(startEvent3, 2L)); //3
+               inputEvents.add(new StreamRecord<Event>(middleEvent2, 2L)); //2
+
+               inputEvents.add(new StreamRecord<>(startEvent4, 2L)); //4
+               inputEvents.add(new StreamRecord<Event>(middleEvent3, 2L)); //3
+               inputEvents.add(new StreamRecord<Event>(middleEvent4, 2L)); //1
+               inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               }).followedBy("middle1").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
2178338526904474690L;
+
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getName().startsWith("foo");
+                       }
+               }).followedBy("middle2").where(new IterativeCondition<Event>() {
+                       private static final long serialVersionUID = 
-1223388426808292695L;
+
+                       @Override
+                       public boolean filter(Event value, Context<Event> ctx) 
throws Exception {
+                               if (!value.getName().equals("start")) {
+                                       return false;
+                               }
+
+                               double sum = 0.0;
+                               for (Event e: 
ctx.getEventsForPattern("middle2")) {
+                                       sum += e.getPrice();
+                               }
+                               sum += value.getPrice();
+                               return Double.compare(sum, 5.0) <= 0;
+                       }
+               }).oneOrMore().followedBy("end").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
562590474115118323L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                                       inputEvent.getValue(),
+                                       inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> p: patterns) {
+                               resultingPatterns.add(new 
ArrayList<>(p.values()));
+                       }
+               }
+
+               compareMaps(resultingPatterns,
+                               Lists.<List<Event>>newArrayList(
+                                               Lists.newArrayList(startEvent1, 
startEvent2, startEvent3, middleEvent1, endEvent),
+                                               Lists.newArrayList(startEvent1, 
middleEvent1, startEvent2, endEvent),
+                                               Lists.newArrayList(startEvent1, 
middleEvent2, startEvent4, endEvent),
+                                               Lists.newArrayList(startEvent2, 
middleEvent2, startEvent4, endEvent),
+                                               Lists.newArrayList(startEvent3, 
middleEvent2, startEvent4, endEvent)
+                               )
+               );
+       }
+
+       @Test
+       public void testIterativeWithPrevPatternDependencyAfterBranching() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+               inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+               inputEvents.add(new StreamRecord<Event>(middleEvent1, 4L));
+               inputEvents.add(new StreamRecord<>(startEvent3, 5L));
+               inputEvents.add(new StreamRecord<Event>(middleEvent2, 6L));
+               inputEvents.add(new StreamRecord<>(endEvent, 7L));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
6215754202506583964L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               
}).oneOrMore().followedBy("middle1").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
+                       private static final long serialVersionUID = 
2178338526904474690L;
+
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getName().startsWith("foo");
+                       }
+               }).followedBy("end").where(new IterativeCondition<Event>() {
+                       private static final long serialVersionUID = 
7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value, Context<Event> ctx) 
throws Exception {
+                               if (!value.getName().equals("end")) {
+                                       return false;
+                               }
+
+                               double sum = 0.0;
+                               for (Event event : 
ctx.getEventsForPattern("start")) {
+                                       sum += event.getPrice();
+                               }
+                               return Double.compare(sum, 2.0) >= 0;
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                                       inputEvent.getValue(),
+                                       inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> p: patterns) {
+                               resultingPatterns.add(new 
ArrayList<>(p.values()));
+                       }
+               }
+
+               compareMaps(resultingPatterns,
+                               Lists.<List<Event>>newArrayList(
+                                               Lists.newArrayList(startEvent1, 
startEvent2, middleEvent1, endEvent),
+                                               Lists.newArrayList(startEvent2, 
middleEvent1, endEvent),
+                                               Lists.newArrayList(startEvent1, 
startEvent2, middleEvent2, endEvent),
+                                               Lists.newArrayList(startEvent1, 
startEvent2, startEvent3, middleEvent2, endEvent),
+                                               Lists.newArrayList(startEvent2, 
startEvent3, middleEvent2, endEvent),
+                                               Lists.newArrayList(startEvent2, 
middleEvent2, endEvent),
+                                               Lists.newArrayList(startEvent3, 
middleEvent2, endEvent)
+                               )
+               );
+       }
+
+       private void compareMaps(List<List<Event>> actual, List<List<Event>> 
expected) {
+               Assert.assertEquals(expected.size(), actual.size());
+
+               for (List<Event> p: actual) {
+                       Collections.sort(p, new EventComparator());
+               }
+
+               for (List<Event> p: expected) {
+                       Collections.sort(p, new EventComparator());
+               }
+
+               Collections.sort(actual, new ListEventComparator());
+               Collections.sort(expected, new ListEventComparator());
+               Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+       }
+
+       private class ListEventComparator implements Comparator<List<Event>> {
+
+               @Override
+               public int compare(List<Event> o1, List<Event> o2) {
+                       int sizeComp = Integer.compare(o1.size(), o2.size());
+                       if (sizeComp == 0) {
+                               EventComparator comp = new EventComparator();
+                               for (int i = 0; i < o1.size(); i++) {
+                                       int eventComp = comp.compare(o1.get(i), 
o2.get(i));
+                                       if (eventComp != 0) {
+                                               return eventComp;
+                                       }
+                               }
+                               return 0;
+                       } else {
+                               return sizeComp;
+                       }
+               }
+       }
+
+       private class EventComparator implements Comparator<Event> {
+
+               @Override
+               public int compare(Event o1, Event o2) {
+                       int nameComp = o1.getName().compareTo(o2.getName());
+                       int priceComp = Doubles.compare(o1.getPrice(), 
o2.getPrice());
+                       int idComp = Integer.compare(o1.getId(), o2.getId());
+                       if (nameComp == 0) {
+                               if (priceComp == 0) {
+                                       return idComp;
+                               } else {
+                                       return priceComp;
+                               }
+                       } else {
+                               return nameComp;
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 40a0e7e..d2e392b 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -19,9 +19,9 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.pattern.FilterFunctions;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -58,7 +58,7 @@ public class NFATest extends TestLogger {
 
                startState.addTake(
                        endState,
-                       new FilterFunction<Event>() {
+                       new SimpleCondition<Event>() {
                                private static final long serialVersionUID = 
-4869589195918650396L;
 
                                @Override
@@ -68,7 +68,7 @@ public class NFATest extends TestLogger {
                        });
                endState.addTake(
                        endingState,
-                       new FilterFunction<Event>() {
+                       new SimpleCondition<Event>() {
                                private static final long serialVersionUID = 
2979804163709590673L;
 
                                @Override
@@ -76,7 +76,7 @@ public class NFATest extends TestLogger {
                                        return value.getName().equals("end");
                                }
                        });
-               endState.addIgnore(FilterFunctions.<Event>trueFunction());
+               endState.addIgnore(BooleanConditions.<Event>trueFunction());
 
                nfa.addState(startState);
                nfa.addState(endState);
@@ -241,7 +241,7 @@ public class NFATest extends TestLogger {
 
                startState.addTake(
                        endState,
-                       new FilterFunction<Event>() {
+                       new SimpleCondition<Event>() {
                                private static final long serialVersionUID = 
-4869589195918650396L;
 
                                @Override
@@ -251,7 +251,7 @@ public class NFATest extends TestLogger {
                        });
                endState.addTake(
                        endingState,
-                       new FilterFunction<Event>() {
+                       new SimpleCondition<Event>() {
                                private static final long serialVersionUID = 
2979804163709590673L;
 
                                @Override
@@ -259,7 +259,7 @@ public class NFATest extends TestLogger {
                                        return value.getName().equals("end");
                                }
                        });
-               endState.addIgnore(FilterFunctions.<Event>trueFunction());
+               endState.addIgnore(BooleanConditions.<Event>trueFunction());
 
                nfa.addState(startState);
                nfa.addState(endState);
@@ -268,7 +268,7 @@ public class NFATest extends TestLogger {
                return nfa;
        }
 
-       private static class NameFilter implements FilterFunction<Event> {
+       private static class NameFilter extends SimpleCondition<Event> {
 
                private static final long serialVersionUID = 
7472112494752423802L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index 25618d5..f0a25d2 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -84,11 +84,16 @@ public class SharedBufferTest extends TestLogger {
                sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], 
timestamp, DeweyNumber.fromString("1.1.0"));
 
                Collection<LinkedHashMultimap<String, Event>> patterns3 = 
sharedBuffer.extractPatterns("b", events[7], timestamp, 
DeweyNumber.fromString("1.1.0"));
-               sharedBuffer.remove("b", events[7], timestamp);
+               sharedBuffer.release("b", events[7], timestamp);
                Collection<LinkedHashMultimap<String, Event>> patterns4 = 
sharedBuffer.extractPatterns("b", events[7], timestamp, 
DeweyNumber.fromString("1.1.0"));
                Collection<LinkedHashMultimap<String, Event>> patterns1 = 
sharedBuffer.extractPatterns("b", events[5], timestamp, 
DeweyNumber.fromString("2.0.0"));
                Collection<LinkedHashMultimap<String, Event>> patterns2 = 
sharedBuffer.extractPatterns("b", events[5], timestamp, 
DeweyNumber.fromString("1.0.0"));
-               sharedBuffer.remove("b", events[5], timestamp);
+               sharedBuffer.release("b", events[5], timestamp);
+
+               assertEquals(1L, patterns3.size());
+               assertEquals(0L, patterns4.size());
+               assertEquals(1L, patterns1.size());
+               assertEquals(1L, patterns2.size());
 
                assertTrue(sharedBuffer.isEmpty());
                assertTrue(patterns4.isEmpty());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 93d78cc..80b1bcb 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.cep.nfa.compiler;
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -32,6 +31,7 @@ import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
 import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertTrue;
 
 public class NFACompilerTest extends TestLogger {
 
-       private static final FilterFunction<Event> startFilter = new 
FilterFunction<Event>() {
+       private static final SimpleCondition<Event> startFilter = new 
SimpleCondition<Event>() {
                private static final long serialVersionUID = 
3314714776170474221L;
 
                @Override
@@ -57,7 +57,7 @@ public class NFACompilerTest extends TestLogger {
                }
        };
 
-       private static final FilterFunction<Event> endFilter = new 
FilterFunction<Event>() {
+       private static final SimpleCondition<Event> endFilter = new 
SimpleCondition<Event>() {
                private static final long serialVersionUID = 
3990995859716364087L;
 
                @Override
@@ -91,7 +91,7 @@ public class NFACompilerTest extends TestLogger {
         * A filter implementation to test invalid pattern specification with
         * duplicate pattern names. Check {@link 
#testNFACompilerUniquePatternName()}.
         */
-       private static class TestFilter implements FilterFunction<Event> {
+       private static class TestFilter extends SimpleCondition<Event> {
 
                private static final long serialVersionUID = 
-3863103355752267133L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 5a3e623..b83eb3c 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.operator;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -29,6 +28,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -239,7 +239,7 @@ public class CEPMigration11to13Test {
                }
        }
 
-       private static class StartFilter implements FilterFunction<Event> {
+       private static class StartFilter extends SimpleCondition<Event> {
                private static final long serialVersionUID = 
5726188262756267490L;
 
                @Override
@@ -248,7 +248,7 @@ public class CEPMigration11to13Test {
                }
        }
 
-       private static class MiddleFilter implements FilterFunction<SubEvent> {
+       private static class MiddleFilter extends SimpleCondition<SubEvent> {
                private static final long serialVersionUID = 
6215754202506583964L;
 
                @Override
@@ -257,7 +257,7 @@ public class CEPMigration11to13Test {
                }
        }
 
-       private static class EndFilter implements FilterFunction<Event> {
+       private static class EndFilter extends SimpleCondition<Event> {
                private static final long serialVersionUID = 
7056763917392056548L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
index 65fa733..f230bbc 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.cep.operator;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -26,6 +25,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -448,7 +448,7 @@ public class CEPMigration12to13Test {
                }
        }
 
-       private static class StartFilter implements FilterFunction<Event> {
+       private static class StartFilter extends SimpleCondition<Event> {
                private static final long serialVersionUID = 
5726188262756267490L;
 
                @Override
@@ -457,7 +457,7 @@ public class CEPMigration12to13Test {
                }
        }
 
-       private static class MiddleFilter implements FilterFunction<SubEvent> {
+       private static class MiddleFilter extends SimpleCondition<SubEvent> {
                private static final long serialVersionUID = 
6215754202506583964L;
 
                @Override
@@ -466,7 +466,7 @@ public class CEPMigration12to13Test {
                }
        }
 
-       private static class EndFilter implements FilterFunction<Event> {
+       private static class EndFilter extends SimpleCondition<Event> {
                private static final long serialVersionUID = 
7056763917392056548L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index a99db05..726c8b8 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -30,6 +29,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -434,7 +434,7 @@ public class CEPOperatorTest extends TestLogger {
 
                harness.close();
        }
-
+       
        private void verifyWatermark(Object outputObject, long timestamp) {
                assertTrue(outputObject instanceof Watermark);
                assertEquals(timestamp, ((Watermark) 
outputObject).getTimestamp());
@@ -512,7 +512,7 @@ public class CEPOperatorTest extends TestLogger {
                @Override
                public NFA<Event> createNFA() {
 
-                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                                                private static final long 
serialVersionUID = 5726188262756267490L;
 
                                                @Override
@@ -520,7 +520,7 @@ public class CEPOperatorTest extends TestLogger {
                                                        return 
value.getName().equals("start");
                                                }
                                        })
-                                       
.followedBy("middle").subtype(SubEvent.class).where(new 
FilterFunction<SubEvent>() {
+                                       
.followedBy("middle").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
                                                private static final long 
serialVersionUID = 6215754202506583964L;
 
                                                @Override
@@ -528,7 +528,7 @@ public class CEPOperatorTest extends TestLogger {
                                                        return 
value.getVolume() > 5.0;
                                                }
                                        })
-                                       .followedBy("end").where(new 
FilterFunction<Event>() {
+                                       .followedBy("end").where(new 
SimpleCondition<Event>() {
                                                private static final long 
serialVersionUID = 7056763917392056548L;
 
                                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbdc100/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 399662a..2c86648 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.Event;
@@ -27,6 +26,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -371,7 +371,7 @@ public class CEPRescalingTest {
                @Override
                public NFA<Event> createNFA() {
 
-                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                                private static final long serialVersionUID = 
5726188262756267490L;
 
                                @Override
@@ -379,7 +379,7 @@ public class CEPRescalingTest {
                                        return value.getName().equals("start");
                                }
                        })
-                               
.followedBy("middle").subtype(SubEvent.class).where(new 
FilterFunction<SubEvent>() {
+                               
.followedBy("middle").subtype(SubEvent.class).where(new 
SimpleCondition<SubEvent>() {
                                        private static final long 
serialVersionUID = 6215754202506583964L;
 
                                        @Override
@@ -387,7 +387,7 @@ public class CEPRescalingTest {
                                                return value.getVolume() > 5.0;
                                        }
                                })
-                               .followedBy("end").where(new 
FilterFunction<Event>() {
+                               .followedBy("end").where(new 
SimpleCondition<Event>() {
                                        private static final long 
serialVersionUID = 7056763917392056548L;
 
                                        @Override

Reply via email to