[FLINK-3318] Add support for quantifiers to CEP's pattern API

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9001c4ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9001c4ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9001c4ef

Branch: refs/heads/table-retraction
Commit: 9001c4ef82a7a04d821252ac62bb7809a931c98a
Parents: d0695c0
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Sat Mar 18 20:53:00 2017 +0100
Committer: kl0u <kklou...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |   78 +-
 .../flink/cep/scala/pattern/Pattern.scala       |   81 +-
 .../apache/flink/cep/nfa/ComputationState.java  |   44 +-
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |   17 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  398 ++++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |   25 +-
 .../java/org/apache/flink/cep/nfa/State.java    |   32 +-
 .../apache/flink/cep/nfa/StateTransition.java   |   20 +-
 .../flink/cep/nfa/StateTransitionAction.java    |    4 +-
 .../nfa/compiler/MalformedPatternException.java |   32 -
 .../flink/cep/nfa/compiler/NFACompiler.java     |  317 ++++-
 .../flink/cep/pattern/FilterFunctions.java      |   44 +
 .../cep/pattern/MalformedPatternException.java  |   32 +
 .../flink/cep/pattern/NotFilterFunction.java    |   42 +
 .../org/apache/flink/cep/pattern/Pattern.java   |  115 ++
 .../apache/flink/cep/pattern/Quantifier.java    |   54 +
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 1338 +++++++++++++++++-
 .../java/org/apache/flink/cep/nfa/NFATest.java  |   90 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |  152 +-
 .../apache/flink/cep/pattern/PatternTest.java   |   54 +
 20 files changed, 2595 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 8047481..22cffbc 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -341,7 +341,45 @@ patternState.subtype(SubEvent.class);
 patternState.within(Time.seconds(10));
 {% endhighlight %}
           </td>
-      </tr>
+       </tr>
+       <tr>
+          <td><strong>ZeroOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or more 
times(kleene star). This means any number of events can be matched in this 
state.</p>
+              <p>If eagerness is enabled(by default) for a pattern A*B and 
sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 
B, A2 B and A1 A2 B.</p>
+      {% highlight java %}
+      patternState.zeroOrMore();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>OneOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur one or more 
times(kleene star). This means at least one and at most infinite number of 
events can be matched in this state.</p>
+              <p>If eagerness is enabled (by default) for a pattern A*B and 
sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 
B and A1 A2 B.</p>
+      {% highlight java %}
+      patternState.oneOrMore();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Optional</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or once.</p>
+      {% highlight java %}
+      patternState.optional();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Times</strong></td>
+          <td>
+              <p>Specifies exact number of times that this pattern should be 
matched.</p>
+      {% highlight java %}
+      patternState.times(2);
+      {% endhighlight %}
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>
@@ -419,6 +457,44 @@ patternState.within(Time.seconds(10))
 {% endhighlight %}
           </td>
       </tr>
+       <tr>
+          <td><strong>ZeroOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or more 
times(kleene star). This means any number of events can be matched in this 
state.</p>
+              <p>If eagerness is enabled(by default) for a pattern A*B and 
sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 
B, A2 B and A1 A2 B.</p>
+      {% highlight scala %}
+      patternState.zeroOrMore()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>OneOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur one or more 
times(kleene star). This means at least one and at most infinite number of 
events can be matched in this state.</p>
+              <p>If eagerness is enabled (by default) for a pattern A*B and 
sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 
B and A1 A2 B.</p>
+      {% highlight scala %}
+      patternState.oneOrMore()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Optional</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or once.</p>
+      {% highlight scala %}
+      patternState.optional()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Times</strong></td>
+          <td>
+              <p>Specifies exact number of times that this pattern should be 
matched.</p>
+      {% highlight scala %}
+      patternState.times(2)
+      {% endhighlight %}
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index cc3b03c..5baf780 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -19,7 +19,7 @@ package org.apache.flink.cep.scala.pattern
 
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.cep
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern}
 import org.apache.flink.streaming.api.windowing.time.Time
 
 /**
@@ -59,6 +59,12 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
 
   /**
     *
+    * @return currently applied quantifier to this pattern
+    */
+  def getQuantifier: Quantifier = jPattern.getQuantifier
+
+  /**
+    *
     * @return Filter condition for an event to be matched
     */
   def getFilterFunction(): Option[FilterFunction[F]] = {
@@ -160,6 +166,79 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     wrapPattern(jPattern.getPrevious())
   }
 
+  /**
+    * Specifies that this pattern can occur zero or more times(kleene star).
+    * This means any number of events can be matched in this state.
+    *
+    * @return The same pattern with applied Kleene star operator
+    */
+  def zeroOrMore: Pattern[T, F] = {
+    jPattern.zeroOrMore()
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur zero or more times(kleene star).
+    * This means any number of events can be matched in this state.
+    *
+    * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will 
generate patterns:
+    * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+    *
+    * @param eager if true the pattern always consumes earlier events
+    * @return The same pattern with applied Kleene star operator
+    */
+  def zeroOrMore(eager: Boolean): Pattern[T, F] = {
+    jPattern.zeroOrMore(eager)
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur one or more times(kleene star).
+    * This means at least one and at most infinite number of events can be 
matched in this state.
+    *
+    * @return The same pattern with applied Kleene plus operator
+    */
+  def oneOrMore: Pattern[T, F] = {
+    jPattern.oneOrMore()
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur one or more times(kleene star).
+    * This means at least one and at most infinite number of events can be 
matched in this state.
+    *
+    * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will 
generate patterns:
+    * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+    *
+    * @param eager if true the pattern always consumes earlier events
+    * @return The same pattern with applied Kleene plus operator
+    */
+  def oneOrMore(eager: Boolean): Pattern[T, F] = {
+    jPattern.oneOrMore(eager)
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur zero or once.
+    *
+    * @return The same pattern with applied Kleene ? operator
+    */
+  def optional: Pattern[T, F] = {
+    jPattern.optional()
+    this
+  }
+
+  /**
+    * Specifies exact number of times that this pattern should be matched.
+    *
+    * @param times number of times matching event must appear
+    * @return The same pattern with number of times applied
+    */
+  def times(times: Int): Pattern[T, F] = {
+    jPattern.times(times)
+    this
+  }
+
 }
 
 object Pattern {

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 3f44fba..445d038 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * Helper class which encapsulates the state of the NFA computation. It points 
to the current state,
  * the last taken event, its occurrence timestamp, the current version and the 
starting timestamp
@@ -41,17 +43,21 @@ public class ComputationState<T> {
        // Timestamp of the first element in the pattern
        private final long startTimestamp;
 
-       public ComputationState(
-               final State<T> currentState,
-               final T event,
-               final long timestamp,
-               final DeweyNumber version,
-               final long startTimestamp) {
+       private final State<T> previousState;
+
+       private ComputationState(
+                       final State<T> currentState,
+                       final State<T> previousState,
+                       final T event,
+                       final long timestamp,
+                       final DeweyNumber version,
+                       final long startTimestamp) {
                this.state = currentState;
                this.event = event;
                this.timestamp = timestamp;
                this.version = version;
                this.startTimestamp = startTimestamp;
+               this.previousState = previousState;
        }
 
        public boolean isFinalState() {
@@ -59,7 +65,7 @@ public class ComputationState<T> {
        }
 
        public boolean isStartState() {
-               return state.isStart();
+               return state.isStart() && event == null;
        }
 
        public long getTimestamp() {
@@ -74,6 +80,10 @@ public class ComputationState<T> {
                return state;
        }
 
+       public State<T> getPreviousState() {
+               return previousState;
+       }
+
        public T getEvent() {
                return event;
        }
@@ -81,4 +91,24 @@ public class ComputationState<T> {
        public DeweyNumber getVersion() {
                return version;
        }
+
+       public static <T> ComputationState<T> createStartState(final State<T> 
state) {
+               Preconditions.checkArgument(state.isStart());
+               return new ComputationState<>(state, null, null, -1L, new 
DeweyNumber(1), -1L);
+       }
+
+       public static <T> ComputationState<T> createStartState(final State<T> 
state, final DeweyNumber version) {
+               Preconditions.checkArgument(state.isStart());
+               return new ComputationState<>(state, null, null, -1L, version, 
-1L);
+       }
+
+       public static <T> ComputationState<T> createState(
+                       final State<T> currentState,
+                       final State<T> previousState,
+                       final T event,
+                       final long timestamp,
+                       final DeweyNumber version,
+                       final long startTimestamp) {
+               return new ComputationState<>(currentState, previousState, 
event, timestamp, version, startTimestamp);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index bb9039d..fd3fafa 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -44,6 +44,10 @@ public class DeweyNumber implements Serializable {
                this.deweyNumber = deweyNumber;
        }
 
+       public DeweyNumber(DeweyNumber number) {
+               this.deweyNumber = Arrays.copyOf(number.deweyNumber, 
number.deweyNumber.length);
+       }
+
        /**
         * Checks whether this dewey number is compatible to the other dewey 
number.
         *
@@ -90,8 +94,19 @@ public class DeweyNumber implements Serializable {
         * @return A new dewey number derived from this whose last digit is 
increased by one
         */
        public DeweyNumber increase() {
+               return increase(1);
+       }
+
+       /**
+        * Creates a new dewey number from this such that its last digit is 
increased by the supplied
+        * number
+        *
+        * @param times how many times to increase the Dewey number
+        * @return A new dewey number derived from this whose last digit is 
increased by given number
+        */
+       public DeweyNumber increase(int times) {
                int[] newDeweyNumber = Arrays.copyOf(deweyNumber, 
deweyNumber.length);
-               newDeweyNumber[deweyNumber.length - 1]++;
+               newDeweyNumber[deweyNumber.length - 1] += times;
 
                return new DeweyNumber(newDeweyNumber);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 257418a..3d42248 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -39,7 +40,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -87,7 +87,7 @@ public class NFA<T> implements Serializable {
        /**
         *      Buffer used to store the matched events.
         */
-       private final SharedBuffer<State<T>, T> sharedBuffer;
+       private final SharedBuffer<String, T> sharedBuffer;
 
        /**
         * A set of all the valid NFA states, as returned by the
@@ -98,7 +98,7 @@ public class NFA<T> implements Serializable {
 
        /**
         * The length of a windowed pattern, as specified using the
-        * {@link org.apache.flink.cep.pattern.Pattern#within(Time) 
Pattern.within(Time)}
+        * {@link org.apache.flink.cep.pattern.Pattern#within(Time)}  
Pattern.within(Time)}
         * method.
         */
        private final long windowTime;
@@ -109,9 +109,6 @@ public class NFA<T> implements Serializable {
         */
        private final boolean handleTimeout;
 
-       // Current starting index for the next dewey version number
-       private int startEventCounter;
-
        /**
         * Current set of {@link ComputationState computation states} within 
the state machine.
         * These are the "active" intermediate states that are waiting for new 
matching
@@ -119,8 +116,6 @@ public class NFA<T> implements Serializable {
         */
        private transient Queue<ComputationState<T>> computationStates;
 
-       private StateTransitionComparator<T>  stateTransitionComparator;
-
        public NFA(
                        final TypeSerializer<T> eventSerializer,
                        final long windowTime,
@@ -129,11 +124,10 @@ public class NFA<T> implements Serializable {
                this.nonDuplicatingTypeSerializer = new 
NonDuplicatingTypeSerializer<>(eventSerializer);
                this.windowTime = windowTime;
                this.handleTimeout = handleTimeout;
-               this.sharedBuffer = new 
SharedBuffer<>(nonDuplicatingTypeSerializer);
-               this.computationStates = new LinkedList<>();
-               this.states = new HashSet<>();
-               this.startEventCounter = 1;
-               this.stateTransitionComparator =  new 
StateTransitionComparator<>();
+               sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+               computationStates = new LinkedList<>();
+
+               states = new HashSet<>();
        }
 
        public Set<State<T>> getStates() {
@@ -150,7 +144,7 @@ public class NFA<T> implements Serializable {
                states.add(state);
 
                if (state.isStart()) {
-                       computationStates.add(new ComputationState<>(state, 
null, -1L, null, -1L));
+                       
computationStates.add(ComputationState.createStartState(state));
                }
        }
 
@@ -201,8 +195,8 @@ public class NFA<T> implements Serializable {
                                }
 
                                // remove computation state which has exceeded 
the window length
-                               
sharedBuffer.release(computationState.getState(), computationState.getEvent(), 
computationState.getTimestamp());
-                               
sharedBuffer.remove(computationState.getState(), computationState.getEvent(), 
computationState.getTimestamp());
+                               
sharedBuffer.release(computationState.getState().getName(), 
computationState.getEvent(), computationState.getTimestamp());
+                               
sharedBuffer.remove(computationState.getState().getName(), 
computationState.getEvent(), computationState.getTimestamp());
 
                                newComputationStates = Collections.emptyList();
                        } else if (event != null) {
@@ -218,8 +212,8 @@ public class NFA<T> implements Serializable {
                                        result.addAll(matches);
 
                                        // remove found patterns because they 
are no longer needed
-                                       
sharedBuffer.release(newComputationState.getState(), 
newComputationState.getEvent(), newComputationState.getTimestamp());
-                                       
sharedBuffer.remove(newComputationState.getState(), 
newComputationState.getEvent(), newComputationState.getTimestamp());
+                                       
sharedBuffer.release(newComputationState.getPreviousState().getName(), 
newComputationState.getEvent(), newComputationState.getTimestamp());
+                                       
sharedBuffer.remove(newComputationState.getPreviousState().getName(), 
newComputationState.getEvent(), newComputationState.getTimestamp());
                                } else {
                                        // add new computation state; it will 
be processed once the next event arrives
                                        
computationStates.add(newComputationState);
@@ -252,8 +246,7 @@ public class NFA<T> implements Serializable {
                        return 
nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
                                sharedBuffer.equals(other.sharedBuffer) &&
                                states.equals(other.states) &&
-                               windowTime == other.windowTime &&
-                               startEventCounter == other.startEventCounter;
+                               windowTime == other.windowTime;
                } else {
                        return false;
                }
@@ -261,12 +254,80 @@ public class NFA<T> implements Serializable {
 
        @Override
        public int hashCode() {
-               return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, 
states, windowTime, startEventCounter);
+               return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, 
states, windowTime);
+       }
+
+       private static <T> boolean isEquivalentState(final State<T> s1, final 
State<T> s2) {
+               return s1.getName().equals(s2.getName());
        }
 
        /**
+        * Class for storing resolved transitions. It counts at insert time the 
number of
+        * branching transitions both for IGNORE and TAKE actions.
+        */
+       private static class OutgoingEdges<T> {
+               private List<StateTransition<T>> edges = new ArrayList<>();
+
+               private final State<T> currentState;
+
+               private int totalTakeBranches = 0;
+               private int totalIgnoreBranches = 0;
+
+               OutgoingEdges(final State<T> currentState) {
+                       this.currentState = currentState;
+               }
+
+               void add(StateTransition<T> edge) {
+
+                       if (!isSelfIgnore(edge)) {
+                               if (edge.getAction() == 
StateTransitionAction.IGNORE) {
+                                       totalIgnoreBranches++;
+                               } else if (edge.getAction() == 
StateTransitionAction.TAKE) {
+                                       totalTakeBranches++;
+                               }
+                       }
+
+                       edges.add(edge);
+               }
+
+               int getTotalIgnoreBranches() {
+                       return totalIgnoreBranches;
+               }
+               int getTotalTakeBranches() {
+                       return totalTakeBranches;
+               }
+
+               List<StateTransition<T>> getEdges() {
+                       return edges;
+               }
+
+               private boolean isSelfIgnore(final StateTransition<T> edge) {
+                       return isEquivalentState(edge.getTargetState(), 
currentState) &&
+                               edge.getAction() == 
StateTransitionAction.IGNORE;
+               }
+       }
+
+
+       /**
         * Computes the next computation states based on the given computation 
state, the current event,
-        * its timestamp and the internal state machine.
+        * its timestamp and the internal state machine. The algorithm is:
+        *
+        * 1. Decide on valid transitions and number of branching paths. See 
{@link OutgoingEdges}
+        * 2. Perform transitions:
+        *      a) IGNORE (links in {@link SharedBuffer} will still point to 
the previous event)
+        *          - do not perform for Start State - special case
+        *          - if stays in the same state increase the current stage for 
future use with number of
+        *            outgoing edges
+        *          - if after PROCEED increase current stage and add new stage 
(as we change the state)
+        *          - lock the entry in {@link SharedBuffer} as it is needed in 
the created branch
+        *      b) TAKE (links in {@link SharedBuffer} will point to the 
current event)
+        *          - add entry to the shared buffer with version of the 
current computation state
+        *          - add stage and then increase with number of takes for the 
future computation states
+        *          - peek to the next state if it has PROCEED path to a Final 
State, if true create
+        *            Final ComputationState to emit results
+        * 3. Handle the Start State, as it always have to remain
+        * 4. Release the corresponding entries in {@link SharedBuffer}.
+        *
         *
         * @param computationState Current computation state
         * @param event Current event which is processed
@@ -277,31 +338,179 @@ public class NFA<T> implements Serializable {
                        final ComputationState<T> computationState,
                        final T event,
                        final long timestamp) {
-               Stack<State<T>> states = new Stack<>();
-               List<ComputationState<T>> resultingComputationStates = new 
ArrayList<>();
-               State<T> state = computationState.getState();
 
-               states.push(state);
+               final OutgoingEdges<T> outgoingEdges = 
createDecisionGraph(computationState, event);
+
+               // Create the computing version based on the previously 
computed edges
+               // We need to defer the creation of computation states until we 
know how many edges start
+               // at this computation state so that we can assign proper 
version
+               final List<StateTransition<T>> edges = outgoingEdges.getEdges();
+               int takeBranchesToVisit = Math.max(0, 
outgoingEdges.getTotalTakeBranches() - 1);
+               int ignoreBranchesToVisit = 
outgoingEdges.getTotalIgnoreBranches();
+
+               final List<ComputationState<T>> resultingComputationStates = 
new ArrayList<>();
+               for (StateTransition<T> edge : edges) {
+                       switch (edge.getAction()) {
+                               case IGNORE: {
+                                       if (!computationState.isStartState()) {
+                                               final DeweyNumber version;
+                                               if 
(isEquivalentState(edge.getTargetState(), computationState.getState())) {
+                                                       //Stay in the same 
state (it can be either looping one or singleton)
+                                                       final int toIncrease = 
calculateIncreasingSelfState(
+                                                               
outgoingEdges.getTotalIgnoreBranches(),
+                                                               
outgoingEdges.getTotalTakeBranches());
+                                                       version = 
computationState.getVersion().increase(toIncrease);
+                                               } else {
+                                                       //IGNORE after PROCEED
+                                                       version = 
computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+                                                       ignoreBranchesToVisit--;
+                                               }
 
-               boolean branched = false;
-               while (!states.isEmpty()) {
-                       State<T> currentState = states.pop();
-                       final List<StateTransition<T>> stateTransitions = new 
ArrayList<>(currentState.getStateTransitions());
+                                               resultingComputationStates.add(
+                                                       
ComputationState.createState(
+                                                               
edge.getTargetState(),
+                                                               
computationState.getPreviousState(),
+                                                               
computationState.getEvent(),
+                                                               
computationState.getTimestamp(),
+                                                               version,
+                                                               
computationState.getStartTimestamp()
+                                                       )
+                                               );
+                                               sharedBuffer.lock(
+                                                       
computationState.getPreviousState().getName(),
+                                                       
computationState.getEvent(),
+                                                       
computationState.getTimestamp());
+                                       }
+                               }
+                               break;
+                               case TAKE:
+                                       final State<T> newState = 
edge.getTargetState();
+                                       final State<T> consumingState = 
edge.getSourceState();
+                                       final State<T> previousEventState = 
computationState.getPreviousState();
+
+                                       final T previousEvent = 
computationState.getEvent();
+                                       final DeweyNumber currentVersion = 
computationState.getVersion();
+
+                                       final DeweyNumber 
newComputationStateVersion = new 
DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+                                       takeBranchesToVisit--;
+
+                                       final long startTimestamp;
+                                       if (computationState.isStartState()) {
+                                               startTimestamp = timestamp;
+                                               sharedBuffer.put(
+                                                       
consumingState.getName(),
+                                                       event,
+                                                       timestamp,
+                                                       currentVersion);
+                                       } else {
+                                               startTimestamp = 
computationState.getStartTimestamp();
+                                               sharedBuffer.put(
+                                                       
consumingState.getName(),
+                                                       event,
+                                                       timestamp,
+                                                       
previousEventState.getName(),
+                                                       previousEvent,
+                                                       
computationState.getTimestamp(),
+                                                       currentVersion);
+                                       }
 
-                       // this is for when we restore from legacy. In that 
case, the comparator is null
-                       // as it did not exist in the previous Flink versions, 
so we have to initialize it here.
+                                       // a new computation state is referring 
to the shared entry
+                                       
sharedBuffer.lock(consumingState.getName(), event, timestamp);
+
+                                       
resultingComputationStates.add(ComputationState.createState(
+                                               newState,
+                                               consumingState,
+                                               event,
+                                               timestamp,
+                                               newComputationStateVersion,
+                                               startTimestamp
+                                       ));
+
+                                       //check if newly created state is 
optional (have a PROCEED path to Final state)
+                                       final State<T> finalState = 
findFinalStateAfterProceed(newState, event);
+                                       if (finalState != null) {
+                                               
sharedBuffer.lock(consumingState.getName(), event, timestamp);
+                                               
resultingComputationStates.add(ComputationState.createState(
+                                                       finalState,
+                                                       consumingState,
+                                                       event,
+                                                       timestamp,
+                                                       
newComputationStateVersion,
+                                                       startTimestamp));
+                                       }
+                                       break;
+                       }
+               }
 
-                       if (stateTransitionComparator == null) {
-                               stateTransitionComparator = new 
StateTransitionComparator();
+               if (computationState.isStartState()) {
+                       final int totalBranches = 
calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), 
outgoingEdges.getTotalTakeBranches());
+                       final ComputationState<T> startState = 
createStartState(computationState, totalBranches);
+                       resultingComputationStates.add(startState);
+               }
+
+               if (computationState.getEvent() != null) {
+                       // release the shared entry referenced by the current 
computation state.
+                       sharedBuffer.release(
+                               computationState.getPreviousState().getName(),
+                               computationState.getEvent(),
+                               computationState.getTimestamp());
+                       // try to remove unnecessary shared buffer entries
+                       sharedBuffer.remove(
+                               computationState.getPreviousState().getName(),
+                               computationState.getEvent(),
+                               computationState.getTimestamp());
+               }
+
+               return resultingComputationStates;
+       }
+
+       private State<T> findFinalStateAfterProceed(State<T> state, T event) {
+               final Stack<State<T>> statesToCheck = new Stack<>();
+               statesToCheck.push(state);
+
+               try {
+                       while (!statesToCheck.isEmpty()) {
+                               final State<T> currentState = 
statesToCheck.pop();
+                               for (StateTransition<T> transition : 
currentState.getStateTransitions()) {
+                                       if (transition.getAction() == 
StateTransitionAction.PROCEED &&
+                                               
checkFilterCondition(transition.getCondition(), event)) {
+                                               if 
(transition.getTargetState().isFinal()) {
+                                                       return 
transition.getTargetState();
+                                               } else {
+                                                       
statesToCheck.push(transition.getTargetState());
+                                               }
+                                       }
+                               }
                        }
+               } catch (Exception e) {
+                       throw new RuntimeException("Failure happened in filter 
function.", e);
+               }
+
+               return null;
+       }
+
+       private int calculateIncreasingSelfState(int ignoreBranches, int 
takeBranches) {
+               return takeBranches == 0 && ignoreBranches == 0 ? 0 : 
ignoreBranches + 1;
+       }
+
+       private ComputationState<T> createStartState(final ComputationState<T> 
computationState, final int totalBranches) {
+               final DeweyNumber startVersion = 
computationState.getVersion().increase(totalBranches);
+               return 
ComputationState.createStartState(computationState.getState(), startVersion);
+       }
 
-                       // impose the IGNORE will be processed last
-                       Collections.sort(stateTransitions, 
stateTransitionComparator);
+       private OutgoingEdges<T> createDecisionGraph(ComputationState<T> 
computationState, T event) {
+               final Stack<State<T>> states = new Stack<>();
+               states.push(computationState.getState());
+               final OutgoingEdges<T> outgoingEdges = new 
OutgoingEdges<>(computationState.getState());
+               //First create all outgoing edges, so to be able to reason 
about the Dewey version
+               while (!states.isEmpty()) {
+                       State<T> currentState = states.pop();
+                       Collection<StateTransition<T>> stateTransitions = 
currentState.getStateTransitions();
 
                        // check all state transitions for each state
-                       for (StateTransition<T> stateTransition: 
stateTransitions) {
+                       for (StateTransition<T> stateTransition : 
stateTransitions) {
                                try {
-                                       if (stateTransition.getCondition() == 
null || stateTransition.getCondition().filter(event)) {
+                                       if 
(checkFilterCondition(stateTransition.getCondition(), event)) {
                                                // filter condition is true
                                                switch 
(stateTransition.getAction()) {
                                                        case PROCEED:
@@ -310,73 +519,8 @@ public class NFA<T> implements Serializable {
                                                                
states.push(stateTransition.getTargetState());
                                                                break;
                                                        case IGNORE:
-                                                               final 
DeweyNumber version;
-                                                               if (branched) {
-                                                                       version 
= computationState.getVersion().increase();
-                                                               } else {
-                                                                       version 
= computationState.getVersion();
-                                                               }
-                                                               
resultingComputationStates.add(new ComputationState<T>(
-                                                                       
computationState.getState(),
-                                                                       
computationState.getEvent(),
-                                                                       
computationState.getTimestamp(),
-                                                                       version,
-                                                                       
computationState.getStartTimestamp()));
-
-                                                               // we have a 
new computation state referring to the same the shared entry
-                                                               // the lock of 
the current computation is released later on
-                                                               
sharedBuffer.lock(computationState.getState(), computationState.getEvent(), 
computationState.getTimestamp());
-                                                               break;
                                                        case TAKE:
-                                                               final State<T> 
newState = stateTransition.getTargetState();
-                                                               final 
DeweyNumber oldVersion;
-                                                               final 
DeweyNumber newComputationStateVersion;
-                                                               final State<T> 
previousState = computationState.getState();
-                                                               final T 
previousEvent = computationState.getEvent();
-                                                               final long 
previousTimestamp;
-                                                               final long 
startTimestamp;
-
-                                                               if 
(computationState.isStartState()) {
-                                                                       
oldVersion = new DeweyNumber(startEventCounter++);
-                                                                       
newComputationStateVersion = oldVersion.addStage();
-                                                                       
startTimestamp = timestamp;
-                                                                       
previousTimestamp = -1L;
-
-                                                               } else {
-                                                                       
startTimestamp = computationState.getStartTimestamp();
-                                                                       
previousTimestamp = computationState.getTimestamp();
-                                                                       
oldVersion = computationState.getVersion();
-
-                                                                       
branched = true;
-                                                                       
newComputationStateVersion = oldVersion.addStage();
-                                                               }
-
-                                                               if 
(previousState.isStart()) {
-                                                                       
sharedBuffer.put(
-                                                                               
newState,
-                                                                               
event,
-                                                                               
timestamp,
-                                                                               
oldVersion);
-                                                               } else {
-                                                                       
sharedBuffer.put(
-                                                                               
newState,
-                                                                               
event,
-                                                                               
timestamp,
-                                                                               
previousState,
-                                                                               
previousEvent,
-                                                                               
previousTimestamp,
-                                                                               
oldVersion);
-                                                               }
-
-                                                               // a new 
computation state is referring to the shared entry
-                                                               
sharedBuffer.lock(newState, event, timestamp);
-
-                                                               
resultingComputationStates.add(new ComputationState<T>(
-                                                                       
newState,
-                                                                       event,
-                                                                       
timestamp,
-                                                                       
newComputationStateVersion,
-                                                                       
startTimestamp));
+                                                               
outgoingEdges.add(stateTransition);
                                                                break;
                                                }
                                        }
@@ -385,19 +529,12 @@ public class NFA<T> implements Serializable {
                                }
                        }
                }
+               return outgoingEdges;
+       }
 
-               if (computationState.isStartState()) {
-                       // a computation state is always kept if it refers to a 
starting state because every
-                       // new element can start a new pattern
-                       resultingComputationStates.add(computationState);
-               } else {
-                       // release the shared entry referenced by the current 
computation state.
-                       sharedBuffer.release(computationState.getState(), 
computationState.getEvent(), computationState.getTimestamp());
-                       // try to remove unnecessary shared buffer entries
-                       sharedBuffer.remove(computationState.getState(), 
computationState.getEvent(), computationState.getTimestamp());
-               }
 
-               return resultingComputationStates;
+       private boolean checkFilterCondition(FilterFunction<T> condition, T 
event) throws Exception {
+               return condition == null || condition.filter(event);
        }
 
        /**
@@ -409,8 +546,8 @@ public class NFA<T> implements Serializable {
         * @return Collection of event sequences which end in the given 
computation state
         */
        private Collection<Map<String, T>> extractPatternMatches(final 
ComputationState<T> computationState) {
-               Collection<LinkedHashMultimap<State<T>, T>> paths = 
sharedBuffer.extractPatterns(
-                       computationState.getState(),
+               Collection<LinkedHashMultimap<String, T>> paths = 
sharedBuffer.extractPatterns(
+                       computationState.getPreviousState().getName(),
                        computationState.getEvent(),
                        computationState.getTimestamp(),
                        computationState.getVersion());
@@ -420,19 +557,20 @@ public class NFA<T> implements Serializable {
                TypeSerializer<T> serializer = 
nonDuplicatingTypeSerializer.getTypeSerializer();
 
                // generate the correct names from the collection of 
LinkedHashMultimaps
-               for (LinkedHashMultimap<State<T>, T> path: paths) {
+               for (LinkedHashMultimap<String, T> path: paths) {
                        Map<String, T> resultPath = new HashMap<>();
-                       for (State<T> key: path.keySet()) {
+                       for (String key: path.keySet()) {
                                int counter = 0;
                                Set<T> events = path.get(key);
 
                                // we iterate over the elements in insertion 
order
                                for (T event: events) {
                                        resultPath.put(
-                                               events.size() > 1 ? 
generateStateName(key.getName(), counter): key.getName(),
+                                               events.size() > 1 ? 
generateStateName(key, counter): key,
                                                // copy the element so that the 
user can change it
                                                serializer.isImmutableType() ? 
event : serializer.copy(event)
                                        );
+                                       counter++;
                                }
                        }
 
@@ -472,6 +610,7 @@ public class NFA<T> implements Serializable {
 
        private void writeComputationState(final ComputationState<T> 
computationState, final ObjectOutputStream oos) throws IOException {
                oos.writeObject(computationState.getState());
+               oos.writeObject(computationState.getPreviousState());
                oos.writeLong(computationState.getTimestamp());
                oos.writeObject(computationState.getVersion());
                oos.writeLong(computationState.getStartTimestamp());
@@ -490,6 +629,7 @@ public class NFA<T> implements Serializable {
        @SuppressWarnings("unchecked")
        private ComputationState<T> readComputationState(ObjectInputStream ois) 
throws IOException, ClassNotFoundException {
                final State<T> state = (State<T>)ois.readObject();
+               final State<T> previousState = (State<T>)ois.readObject();
                final long timestamp = ois.readLong();
                final DeweyNumber version = (DeweyNumber)ois.readObject();
                final long startTimestamp = ois.readLong();
@@ -504,7 +644,7 @@ public class NFA<T> implements Serializable {
                        event = null;
                }
 
-               return new ComputationState<>(state, event, timestamp, version, 
startTimestamp);
+               return ComputationState.createState(state, previousState, 
event, timestamp, version, startTimestamp);
        }
 
        /**
@@ -629,20 +769,4 @@ public class NFA<T> implements Serializable {
                        return getClass().hashCode();
                }
        }
-
-       /**
-        * Comparator used for imposing the assumption that IGNORE is always 
the last StateTransition in a state.
-        */
-       private static final class StateTransitionComparator<T> implements 
Serializable, Comparator<StateTransition<T>> {
-
-               private static final long serialVersionUID = 
-2775474935413622278L;
-
-               @Override
-               public int compare(final StateTransition<T> o1, final 
StateTransition<T> o2) {
-                       if (o1.getAction() == o2.getAction()) {
-                               return 0;
-                       }
-                       return o1.getAction() == StateTransitionAction.IGNORE ? 
1 : -1;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index b7e288b..e6a8c75 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -212,28 +212,27 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                SharedBufferEntry<K, V> entry = get(key, value, timestamp);
 
                if (entry != null) {
-                       extractionStates.add(new ExtractionState<K, V>(entry, 
version, new Stack<SharedBufferEntry<K, V>>()));
+                       extractionStates.add(new ExtractionState<>(entry, 
version, new Stack<SharedBufferEntry<K, V>>()));
 
                        // use a depth first search to reconstruct the previous 
relations
                        while (!extractionStates.isEmpty()) {
-                               ExtractionState<K, V> extractionState = 
extractionStates.pop();
-                               DeweyNumber currentVersion = 
extractionState.getVersion();
+                               final ExtractionState<K, V> extractionState = 
extractionStates.pop();
                                // current path of the depth first search
-                               Stack<SharedBufferEntry<K, V>> currentPath = 
extractionState.getPath();
+                               final Stack<SharedBufferEntry<K, V>> 
currentPath = extractionState.getPath();
+                               final SharedBufferEntry<K, V> currentEntry = 
extractionState.getEntry();
 
                                // termination criterion
-                               if (currentVersion.length() == 1) {
-                                       LinkedHashMultimap<K, V> completePath = 
LinkedHashMultimap.create();
+                               if (currentEntry == null) {
+                                       final LinkedHashMultimap<K, V> 
completePath = LinkedHashMultimap.create();
 
                                        while(!currentPath.isEmpty()) {
-                                               SharedBufferEntry<K, V> 
currentEntry = currentPath.pop();
+                                               final SharedBufferEntry<K, V> 
currentPathEntry = currentPath.pop();
 
-                                               
completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
+                                               
completePath.put(currentPathEntry.getKey(), 
currentPathEntry.getValueTime().getValue());
                                        }
 
                                        result.add(completePath);
                                } else {
-                                       SharedBufferEntry<K, V> currentEntry = 
extractionState.getEntry();
 
                                        // append state to the path
                                        currentPath.push(currentEntry);
@@ -242,17 +241,18 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                                        for (SharedBufferEdge<K, V> edge : 
currentEntry.getEdges()) {
                                                // we can only proceed if the 
current version is compatible to the version
                                                // of this previous relation
+                                               final DeweyNumber 
currentVersion = extractionState.getVersion();
                                                if 
(currentVersion.isCompatibleWith(edge.getVersion())) {
                                                        if (firstMatch) {
                                                                // for the 
first match we don't have to copy the current path
-                                                               
extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), 
edge.getVersion(), currentPath));
+                                                               
extractionStates.push(new ExtractionState<>(edge.getTarget(), 
edge.getVersion(), currentPath));
                                                                firstMatch = 
false;
                                                        } else {
-                                                               
Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
+                                                               final 
Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
                                                                
copy.addAll(currentPath);
 
                                                                
extractionStates.push(
-                                                                       new 
ExtractionState<K, V>(
+                                                                       new 
ExtractionState<>(
                                                                                
edge.getTarget(),
                                                                                
edge.getVersion(),
                                                                                
copy));
@@ -260,6 +260,7 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
                                                }
                                        }
                                }
+
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 50b2cf3..7bcb6ea 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,7 +45,7 @@ public class State<T> implements Serializable {
                this.name = name;
                this.stateType = stateType;
 
-               stateTransitions = new ArrayList<StateTransition<T>>();
+               stateTransitions = new ArrayList<>();
        }
 
        public boolean isFinal() {
@@ -60,8 +62,32 @@ public class State<T> implements Serializable {
                return stateTransitions;
        }
 
-       public void addStateTransition(final StateTransition<T> 
stateTransition) {
-               stateTransitions.add(stateTransition);
+
+       private void addStateTransition(
+               final StateTransitionAction action,
+               final State<T> targetState,
+               final FilterFunction<T> condition) {
+               stateTransitions.add(new StateTransition<T>(this, action, 
targetState, condition));
+       }
+
+       public void addIgnore(final FilterFunction<T> condition) {
+               addStateTransition(StateTransitionAction.IGNORE, this, 
condition);
+       }
+
+       public void addIgnore(final State<T> targetState,final 
FilterFunction<T> condition) {
+               addStateTransition(StateTransitionAction.IGNORE, targetState, 
condition);
+       }
+
+       public void addTake(final State<T> targetState, final FilterFunction<T> 
condition) {
+               addStateTransition(StateTransitionAction.TAKE, targetState, 
condition);
+       }
+
+       public void addProceed(final State<T> targetState, final 
FilterFunction<T> condition) {
+               addStateTransition(StateTransitionAction.PROCEED, targetState, 
condition);
+       }
+
+       public void addTake(final FilterFunction<T> condition) {
+               addStateTransition(StateTransitionAction.TAKE, this, condition);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index 479f28a..e3c7b7a 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -27,12 +27,18 @@ public class StateTransition<T> implements Serializable {
        private static final long serialVersionUID = -4825345749997891838L;
 
        private final StateTransitionAction action;
+       private final State<T> sourceState;
        private final State<T> targetState;
        private final FilterFunction<T> condition;
 
-       public StateTransition(final StateTransitionAction action, final 
State<T> targetState, final FilterFunction<T> condition) {
+       public StateTransition(
+               final State<T> sourceState,
+               final StateTransitionAction action,
+               final State<T> targetState,
+               final FilterFunction<T> condition) {
                this.action = action;
                this.targetState = targetState;
+               this.sourceState = sourceState;
                this.condition = condition;
        }
 
@@ -44,6 +50,10 @@ public class StateTransition<T> implements Serializable {
                return targetState;
        }
 
+       public State<T> getSourceState() {
+               return sourceState;
+       }
+
        public FilterFunction<T> getCondition() {
                return condition;
        }
@@ -55,6 +65,7 @@ public class StateTransition<T> implements Serializable {
                        StateTransition<T> other = (StateTransition<T>) obj;
 
                        return action == other.action &&
+                               
sourceState.getName().equals(other.sourceState.getName()) &&
                                
targetState.getName().equals(other.targetState.getName());
                } else {
                        return false;
@@ -64,14 +75,17 @@ public class StateTransition<T> implements Serializable {
        @Override
        public int hashCode() {
                // we have to take the name of targetState because the 
transition might be reflexive
-               return Objects.hash(action, targetState.getName());
+               return Objects.hash(action, targetState.getName(), 
sourceState.getName());
        }
 
        @Override
        public String toString() {
                StringBuilder builder = new StringBuilder();
 
-               builder.append("StateTransition(").append(action).append(", 
").append(targetState.getName());
+               builder.append("StateTransition(")
+                       .append(action).append(", ")
+                       .append(sourceState.getName()).append(", ")
+                       .append(targetState.getName());
 
                if (condition != null) {
                        builder.append(", with filter)");

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
index 70fc7fb..b8ca4e8 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
@@ -22,7 +22,7 @@ package org.apache.flink.cep.nfa;
  * Set of actions when doing a state transition from a {@link State} to 
another.
  */
 public enum StateTransitionAction {
-       TAKE, // take the current event and assign it to the new state
-       IGNORE, // ignore the current event and do the state transition
+       TAKE, // take the current event and assign it to the current state
+       IGNORE, // ignore the current event
        PROCEED // do the state transition and keep the current event for 
further processing (epsilon transition)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
deleted file mode 100644
index a3bb5f4..0000000
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa.compiler;
-
-/**
- * An exception used to indicate that a {@link 
org.apache.flink.cep.pattern.Pattern}
- * was not specified correctly.
- */
-public class MalformedPatternException extends RuntimeException {
-
-       private static final long serialVersionUID = 7751134834983361543L;
-
-       public MalformedPatternException(String message) {
-               super(message);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 18ed21f..b476c49 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -22,18 +22,21 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.StateTransition;
-import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.cep.pattern.FollowedByPattern;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.NotFilterFunction;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -42,7 +45,7 @@ import java.util.Set;
  */
 public class NFACompiler {
 
-       protected final static String BEGINNING_STATE_NAME = "$beginningState$";
+       protected static final String ENDING_STATE_NAME = "$endState$";
 
        /**
         * Compiles the given pattern into a {@link NFA}.
@@ -74,88 +77,288 @@ public class NFACompiler {
         */
        @SuppressWarnings("unchecked")
        public static <T> NFAFactory<T> compileFactory(
-               Pattern<T, ?> pattern,
-               TypeSerializer<T> inputTypeSerializer,
+               final Pattern<T, ?> pattern,
+               final TypeSerializer<T> inputTypeSerializer,
                boolean timeoutHandling) {
                if (pattern == null) {
                        // return a factory for empty NFAs
-                       return new NFAFactoryImpl<T>(inputTypeSerializer, 0, 
Collections.<State<T>>emptyList(), timeoutHandling);
+                       return new NFAFactoryImpl<>(inputTypeSerializer, 0, 
Collections.<State<T>>emptyList(), timeoutHandling);
                } else {
-                       // set of all generated states
-                       Map<String, State<T>> states = new HashMap<>();
-                       long windowTime;
+                       final NFAFactoryCompiler<T> nfaFactoryCompiler = new 
NFAFactoryCompiler<>(pattern);
+                       nfaFactoryCompiler.compileFactory();
+                       return new NFAFactoryImpl<>(inputTypeSerializer, 
nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), 
timeoutHandling);
+               }
+       }
+
+       /**
+        * Converts a {@link Pattern} into graph of {@link State}. It enables 
sharing of
+        * compilation state across methods.
+        *
+        * @param <T>
+        */
+       private static class NFAFactoryCompiler<T> {
+
+               private final Set<String> usedNames = new HashSet<>();
+               private final List<State<T>> states = new ArrayList<>();
 
-                       // this is used to enforse pattern name uniqueness.
-                       Set<String> patternNames = new HashSet<>();
+               private long windowTime = 0;
+               private Pattern<T, ?> currentPattern;
 
-                       Pattern<T, ?> succeedingPattern;
-                       State<T> succeedingState;
-                       Pattern<T, ?> currentPattern = pattern;
+               NFAFactoryCompiler(final Pattern<T, ?> pattern) {
+                       this.currentPattern = pattern;
+               }
 
+               /**
+                * Compiles the given pattern into a {@link NFAFactory}. The 
NFA factory can be used to create
+                * multiple NFAs.
+                */
+               void compileFactory() {
                        // we're traversing the pattern from the end to the 
beginning --> the first state is the final state
-                       State<T> currentState = new 
State<>(currentPattern.getName(), State.StateType.Final);
-                       patternNames.add(currentPattern.getName());
+                       State<T> sinkState = createEndingState();
+                       // add all the normal states
+                       sinkState = createMiddleStates(sinkState);
+                       // add the beginning state
+                       createStartState(sinkState);
+               }
 
-                       states.put(currentPattern.getName(), currentState);
+               List<State<T>> getStates() {
+                       return states;
+               }
+
+               long getWindowTime() {
+                       return windowTime;
+               }
+
+               /**
+                * Creates the dummy Final {@link State} of the NFA graph.
+                * @return dummy Final state
+                */
+               private State<T> createEndingState() {
+                       State<T> endState = new State<>(ENDING_STATE_NAME, 
State.StateType.Final);
+                       states.add(endState);
+                       usedNames.add(ENDING_STATE_NAME);
 
                        windowTime = currentPattern.getWindowTime() != null ? 
currentPattern.getWindowTime().toMilliseconds() : 0L;
+                       return endState;
+               }
 
-                       while (currentPattern.getPrevious() != null) {
-                               succeedingPattern = currentPattern;
-                               succeedingState = currentState;
-                               currentPattern = currentPattern.getPrevious();
+               /**
+                * Creates all the states between Start and Final state.
+                * @param sinkState the state that last state should point to 
(always the Final state)
+                * @return the next state after Start in the resulting graph
+                */
+               private State<T> createMiddleStates(final State<T> sinkState) {
 
-                               if 
(!patternNames.add(currentPattern.getName())) {
-                                       throw new 
MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() 
+ ". " +
-                                               "Pattern names must be 
unique.");
+                       State<T> lastSink = sinkState;
+                       while (currentPattern.getPrevious() != null) {
+                               checkPatternNameUniqueness();
+
+                               State<T> sourceState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
+                               states.add(sourceState);
+                               usedNames.add(sourceState.getName());
+
+                               if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+                                       convertToLooping(sourceState, lastSink);
+
+                                       if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+                                               sourceState = 
createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal);
+                                               states.add(sourceState);
+                                               
usedNames.add(sourceState.getName());
+                                       }
+                               } else if (currentPattern.getQuantifier() == 
Quantifier.TIMES) {
+                                       sourceState = 
convertToTimesState(sourceState, lastSink, currentPattern.getTimes());
+                               } else {
+                                       convertToSingletonState(sourceState, 
lastSink);
                                }
 
-                               Time currentWindowTime = 
currentPattern.getWindowTime();
+                               currentPattern = currentPattern.getPrevious();
+                               lastSink = sourceState;
 
+                               final Time currentWindowTime = 
currentPattern.getWindowTime();
                                if (currentWindowTime != null && 
currentWindowTime.toMilliseconds() < windowTime) {
                                        // the window time is the global 
minimum of all window times of each state
                                        windowTime = 
currentWindowTime.toMilliseconds();
                                }
+                       }
+
+                       return lastSink;
+               }
+
+               private void checkPatternNameUniqueness() {
+                       if (usedNames.contains(currentPattern.getName())) {
+                               throw new MalformedPatternException(
+                                       "Duplicate pattern name: " + 
currentPattern.getName() + ". " +
+                                       "Pattern names must be unique.");
+                       }
+               }
+
+               /**
+                * Creates the Start {@link State} of the resulting NFA graph.
+                * @param sinkState the state that Start state should point to 
(alwyas first state of middle states)
+                * @return created state
+                */
+               @SuppressWarnings("unchecked")
+               private State<T> createStartState(State<T> sinkState) {
+                       checkPatternNameUniqueness();
+
+                       final State<T> beginningState;
+                       if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+                               final State<T> loopingState;
+                               if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+                                       loopingState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
+                                       beginningState = 
createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
+                                       states.add(loopingState);
+                               } else {
+                                       loopingState = new 
State<>(currentPattern.getName(), State.StateType.Start);
+                                       beginningState = loopingState;
+                               }
+                               convertToLooping(loopingState, sinkState, true);
+                       } else  {
+                               if (currentPattern.getQuantifier() == 
Quantifier.TIMES && currentPattern.getTimes() > 1) {
+                                       final State<T> timesState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
+                                       states.add(timesState);
+                                       sinkState = 
convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1);
+                               }
 
-                               if 
(states.containsKey(currentPattern.getName())) {
-                                       currentState = 
states.get(currentPattern.getName());
+                               beginningState = new 
State<>(currentPattern.getName(), State.StateType.Start);
+                               convertToSingletonState(beginningState, 
sinkState);
+                       }
+
+                       states.add(beginningState);
+                       usedNames.add(beginningState.getName());
+
+                       return beginningState;
+               }
+
+               /**
+                * Converts the given state into a "complex" state consisting 
of given number of states with
+                * same {@link FilterFunction}
+                *
+                * @param sourceState the state to be converted
+                * @param sinkState the state that the converted state should 
point to
+                * @param times number of times the state should be copied
+                * @return the first state of the "complex" state, next state 
should point to it
+                */
+               private State<T> convertToTimesState(final State<T> 
sourceState, final State<T> sinkState, int times) {
+                       convertToSingletonState(sourceState, sinkState);
+                       State<T> lastSink;
+                       State<T> firstState = sourceState;
+                       for (int i = 0; i < times - 1; i++) {
+                               lastSink = firstState;
+                               firstState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
+                               states.add(firstState);
+                               convertToSingletonState(firstState, lastSink);
+                       }
+                       return firstState;
+               }
+
+               /**
+                * Converts the given state into a simple single state. For an 
OPTIONAL state it also consists
+                * of a similar state without the PROCEED edge, so that for 
each PROCEED transition branches
+                * in computation state graph  can be created only once.
+                *
+                * @param sourceState the state to be converted
+                * @param sinkState state that the state being converted should 
point to
+                */
+               @SuppressWarnings("unchecked")
+               private void convertToSingletonState(final State<T> 
sourceState, final State<T> sinkState) {
+
+                       final FilterFunction<T> currentFilterFunction = 
(FilterFunction<T>) currentPattern.getFilterFunction();
+                       final FilterFunction<T> trueFunction = 
FilterFunctions.trueFunction();
+                       sourceState.addTake(sinkState, currentFilterFunction);
+
+                       if (currentPattern.getQuantifier() == 
Quantifier.OPTIONAL) {
+                               sourceState.addProceed(sinkState, trueFunction);
+                       }
+
+                       if (currentPattern instanceof FollowedByPattern) {
+                               final State<T> ignoreState;
+                               if (currentPattern.getQuantifier() == 
Quantifier.OPTIONAL) {
+                                       ignoreState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
+                                       ignoreState.addTake(sinkState, 
currentFilterFunction);
+                                       states.add(ignoreState);
                                } else {
-                                       currentState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
-                                       states.put(currentState.getName(), 
currentState);
+                                       ignoreState = sourceState;
                                }
+                               sourceState.addIgnore(ignoreState, 
trueFunction);
+                       }
+               }
 
-                               currentState.addStateTransition(new 
StateTransition<T>(
-                                       StateTransitionAction.TAKE,
-                                       succeedingState,
-                                       (FilterFunction<T>) 
succeedingPattern.getFilterFunction()));
-
-                               if (succeedingPattern instanceof 
FollowedByPattern) {
-                                       // the followed by pattern entails a 
reflexive ignore transition
-                                       currentState.addStateTransition(new 
StateTransition<T>(
-                                               StateTransitionAction.IGNORE,
-                                               currentState,
-                                               null
-                                       ));
+               /**
+                * Patterns with quantifiers AT_LEAST_ONE_* are converted into 
pair of states: a singleton state and
+                * looping state. This method creates the first of the two.
+                *
+                * @param sinkState the state the newly created state should 
point to, it should be a looping state
+                * @param stateType the type of the created state, as the NFA 
graph can also start wit AT_LEAST_ONE_*
+                * @return the newly created state
+                */
+               @SuppressWarnings("unchecked")
+               private State<T> createFirstMandatoryStateOfLoop(final State<T> 
sinkState, final State.StateType stateType) {
+
+                       final FilterFunction<T> currentFilterFunction = 
(FilterFunction<T>) currentPattern.getFilterFunction();
+                       final State<T> firstState = new 
State<>(currentPattern.getName(), stateType);
+
+                       firstState.addTake(sinkState, currentFilterFunction);
+                       if (currentPattern instanceof FollowedByPattern) {
+                               if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+                                       firstState.addIgnore(new 
NotFilterFunction<>(currentFilterFunction));
+                               } else {
+                                       
firstState.addIgnore(FilterFunctions.<T>trueFunction());
                                }
                        }
+                       return firstState;
+               }
 
-                       // add the beginning state
-                       final State<T> beginningState;
+               /**
+                * Converts the given state into looping one. Looping state is 
one with TAKE edge to itself and
+                * PROCEED edge to the sinkState. It also consists of a similar 
state without the PROCEED edge, so that
+                * for each PROCEED transition branches in computation state 
graph  can be created only once.
+                *
+                * <p>If this looping state is first of a graph we should treat 
the {@link Pattern} as {@link FollowedByPattern}
+                * to enable combinations.
+                *
+                * @param sourceState  the state to converted
+                * @param sinkState    the state that the converted state 
should point to
+                * @param isFirstState if the looping state is first of a graph
+                */
+               @SuppressWarnings("unchecked")
+               private void convertToLooping(final State<T> sourceState, final 
State<T> sinkState, boolean isFirstState) {
+
+                       final FilterFunction<T> filterFunction = 
(FilterFunction<T>) currentPattern.getFilterFunction();
+                       final FilterFunction<T> trueFunction = 
FilterFunctions.<T>trueFunction();
+
+                       sourceState.addProceed(sinkState, trueFunction);
+                       sourceState.addTake(filterFunction);
+                       if (currentPattern instanceof FollowedByPattern || 
isFirstState) {
+                               final State<T> ignoreState = new State<>(
+                                       currentPattern.getName(),
+                                       State.StateType.Normal);
+
+
+                               final FilterFunction<T> ignoreCondition;
+                               if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+                                       ignoreCondition = new 
NotFilterFunction<>(filterFunction);
+                               } else {
+                                       ignoreCondition = trueFunction;
+                               }
 
-                       if (states.containsKey(BEGINNING_STATE_NAME)) {
-                               beginningState = 
states.get(BEGINNING_STATE_NAME);
-                       } else {
-                               beginningState = new 
State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-                               states.put(BEGINNING_STATE_NAME, 
beginningState);
+                               sourceState.addIgnore(ignoreState, 
ignoreCondition);
+                               ignoreState.addTake(sourceState, 
filterFunction);
+                               ignoreState.addIgnore(ignoreState, 
ignoreCondition);
+                               states.add(ignoreState);
                        }
+               }
 
-                       beginningState.addStateTransition(new 
StateTransition<T>(
-                               StateTransitionAction.TAKE,
-                               currentState,
-                               (FilterFunction<T>) 
currentPattern.getFilterFunction()
-                       ));
-
-                       return new NFAFactoryImpl<T>(inputTypeSerializer, 
windowTime, new HashSet<>(states.values()), timeoutHandling);
+               /**
+                * Converts the given state into looping one. Looping state is 
one with TAKE edge to itself and
+                * PROCEED edge to the sinkState. It also consists of a similar 
state without the PROCEED edge, so that
+                * for each PROCEED transition branches in computation state 
graph  can be created only once.
+                *
+                * @param sourceState the state to converted
+                * @param sinkState   the state that the converted state should 
point to
+                */
+               private void convertToLooping(final State<T> sourceState, final 
State<T> sinkState) {
+                       convertToLooping(sourceState, sinkState, false);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
new file mode 100644
index 0000000..12e58ba
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class FilterFunctions<T> {
+
+       private FilterFunctions() {
+       }
+
+       public static <T> FilterFunction<T> trueFunction()  {
+               return new FilterFunction<T>() {
+                       @Override
+                       public boolean filter(T value) throws Exception {
+                               return true;
+                       }
+               };
+       }
+
+       public static <T> FilterFunction<T> falseFunction()  {
+               return new FilterFunction<T>() {
+                       @Override
+                       public boolean filter(T value) throws Exception {
+                               return false;
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
new file mode 100644
index 0000000..c85f3be
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+/**
+ * An exception used to indicate that a {@link 
org.apache.flink.cep.pattern.Pattern}
+ * was not specified correctly.
+ */
+public class MalformedPatternException extends RuntimeException {
+
+       private static final long serialVersionUID = 7751134834983361543L;
+
+       public MalformedPatternException(String message) {
+               super(message);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
new file mode 100644
index 0000000..a4fc8f5
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which negates filter function.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class NotFilterFunction<T> implements FilterFunction<T> {
+       private static final long serialVersionUID = -2109562093871155005L;
+
+       private final FilterFunction<T> original;
+
+       public NotFilterFunction(final FilterFunction<T> original) {
+               this.original = original;
+       }
+
+       @Override
+       public boolean filter(T value) throws Exception {
+               return !original.filter(value);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 7ea675f..7b4d9c7 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Base class for a pattern definition.
@@ -53,6 +54,10 @@ public class Pattern<T, F extends T> {
        // window length in which the pattern match has to occur
        private Time windowTime;
 
+       private Quantifier quantifier = Quantifier.ONE;
+
+       private int times;
+
        protected Pattern(final String name, final Pattern<T, ? extends T> 
previous) {
                this.name = name;
                this.previous = previous;
@@ -74,6 +79,14 @@ public class Pattern<T, F extends T> {
                return windowTime;
        }
 
+       public Quantifier getQuantifier() {
+               return quantifier;
+       }
+
+       public int getTimes() {
+               return times;
+       }
+
        /**
         * Specifies a filter condition which has to be fulfilled by an event 
in order to be matched.
         *
@@ -183,4 +196,106 @@ public class Pattern<T, F extends T> {
                return new Pattern<X, X>(name, null);
        }
 
+       /**
+        * Specifies that this pattern can occur zero or more times(kleene 
star).
+        * This means any number of events can be matched in this state.
+        *
+        * @return The same pattern with applied Kleene star operator
+        *
+        * @throws MalformedPatternException if quantifier already applied
+        */
+       public Pattern<T, F> zeroOrMore() {
+               return zeroOrMore(true);
+       }
+
+       /**
+        * Specifies that this pattern can occur zero or more times(kleene 
star).
+        * This means any number of events can be matched in this state.
+        *
+        * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will 
generate patterns:
+        * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+        *
+        * @param eager if true the pattern always consumes earlier events
+        * @return The same pattern with applied Kleene star operator
+        *
+        * @throws MalformedPatternException if quantifier already applied
+        */
+       public Pattern<T, F> zeroOrMore(final boolean eager) {
+               checkIfQuantifierApplied();
+               if (eager) {
+                       this.quantifier = Quantifier.ZERO_OR_MORE_EAGER;
+               } else {
+                       this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS;
+               }
+               return this;
+       }
+
+       /**
+        * Specifies that this pattern can occur one or more times(kleene star).
+        * This means at least one and at most infinite number of events can be 
matched in this state.
+        *
+        * @return The same pattern with applied Kleene plus operator
+        *
+        * @throws MalformedPatternException if quantifier already applied
+        */
+       public Pattern<T, F> oneOrMore() {
+               return oneOrMore(true);
+       }
+
+       /**
+        * Specifies that this pattern can occur one or more times(kleene star).
+        * This means at least one and at most infinite number of events can be 
matched in this state.
+        *
+        * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will 
generate patterns:
+        * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+        *
+        * @param eager if true the pattern always consumes earlier events
+        * @return The same pattern with applied Kleene plus operator
+        *
+        * @throws MalformedPatternException if quantifier already applied
+        */
+       public Pattern<T, F> oneOrMore(final boolean eager) {
+               checkIfQuantifierApplied();
+               if (eager) {
+                       this.quantifier = Quantifier.ONE_OR_MORE_EAGER;
+               } else {
+                       this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS;
+               }
+               return this;
+       }
+
+       /**
+        * Specifies that this pattern can occur zero or once.
+        *
+        * @return The same pattern with applied Kleene ? operator
+        *
+        * @throws MalformedPatternException if quantifier already applied
+        */
+       public Pattern<T, F> optional() {
+               checkIfQuantifierApplied();
+               this.quantifier = Quantifier.OPTIONAL;
+               return this;
+       }
+
+       /**
+        * Specifies exact number of times that this pattern should be matched.
+        *
+        * @param times number of times matching event must appear
+        * @return The same pattern with number of times applied
+        *
+        * @throws MalformedPatternException if quantifier already applied
+        */
+       public Pattern<T, F> times(int times) {
+               checkIfQuantifierApplied();
+               Preconditions.checkArgument(times > 0, "You should give a 
positive number greater than 0.");
+               this.quantifier = Quantifier.TIMES;
+               this.times = times;
+               return this;
+       }
+
+       private void checkIfQuantifierApplied() {
+               if (this.quantifier != Quantifier.ONE) {
+                       throw new MalformedPatternException("Already applied 
quantifier to this Pattern. Current quantifier is: " + this.quantifier);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
new file mode 100644
index 0000000..7abe9bd
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import java.util.EnumSet;
+
+public enum Quantifier {
+       ONE,
+       ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER),
+       ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
+       ONE_OR_MORE_EAGER(
+               QuantifierProperty.LOOPING,
+               QuantifierProperty.EAGER,
+               QuantifierProperty.AT_LEAST_ONE),
+       ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, 
QuantifierProperty.AT_LEAST_ONE),
+       TIMES,
+       OPTIONAL;
+
+       private final EnumSet<QuantifierProperty> properties;
+
+       Quantifier(final QuantifierProperty first, final QuantifierProperty... 
rest) {
+               this.properties = EnumSet.of(first, rest);
+       }
+
+       Quantifier() {
+               this.properties = EnumSet.noneOf(QuantifierProperty.class);
+       }
+
+       public boolean hasProperty(QuantifierProperty property) {
+               return properties.contains(property);
+       }
+
+       public enum QuantifierProperty {
+               LOOPING,
+               EAGER,
+               AT_LEAST_ONE
+       }
+
+}

Reply via email to