Repository: flink
Updated Branches:
  refs/heads/master 545f50026 -> 0d856b34f


[FLINK-6248] [cep] Make the optional() available to all offered patterns.

Allow all patterns to be optional in a pattern sequence.
When optional() is applied, a singleton pattern becomes simply optional,
times(X) changes from "exactly X" to "0 or X", and "one or more" becomes
"zero or more".


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3de48d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3de48d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3de48d0

Branch: refs/heads/master
Commit: b3de48d0e2c1fec6ccab3a8dd0f02b711dc0ae0a
Parents: 545f500
Author: kl0u <[email protected]>
Authored: Tue Apr 4 17:45:34 2017 +0200
Committer: kl0u <[email protected]>
Committed: Wed Apr 26 12:15:51 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/scala/pattern/Pattern.scala       | 286 +++++-----
 .../flink/cep/nfa/compiler/NFACompiler.java     |  42 +-
 .../org/apache/flink/cep/pattern/Pattern.java   | 271 ++++------
 .../apache/flink/cep/pattern/Quantifier.java    |  90 ++--
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 521 +++++++++++++++----
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   2 +-
 .../apache/flink/cep/pattern/PatternTest.java   |   8 +-
 7 files changed, 762 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index c636029..3214d09 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -20,7 +20,7 @@ package org.apache.flink.cep.scala.pattern
 import org.apache.flink.cep
 import org.apache.flink.cep.pattern.conditions.IterativeCondition
 import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context
-import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern}
+import org.apache.flink.cep.pattern.{MalformedPatternException, Quantifier, 
Pattern => JPattern}
 import org.apache.flink.streaming.api.windowing.time.Time
 
 /**
@@ -31,9 +31,8 @@ import org.apache.flink.streaming.api.windowing.time.Time
   *
   * {{{
   * Pattern<T, F> pattern = Pattern.<T>begin("start")
-  * .next("middle").subtype(F.class)
-  * .followedBy("end").where(new MyFilterFunction());
-  * }
+  *   .next("middle").subtype(F.class)
+  *   .followedBy("end").where(new MyCondition());
   * }}}
   *
   * @param jPattern Underlying Java API Pattern
@@ -45,6 +44,13 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   private[flink] def wrappedPattern = jPattern
 
   /**
+    * @return The previous pattern
+    */
+  def getPrevious(): Option[Pattern[T, _ <: T]] = {
+    wrapPattern(jPattern.getPrevious())
+  }
+
+  /**
     *
     * @return Name of the pattern operator
     */
@@ -64,170 +70,170 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     */
   def getQuantifier: Quantifier = jPattern.getQuantifier
 
-  /**
-    *
-    * @return Filter condition for an event to be matched
-    */
   def getCondition(): Option[IterativeCondition[F]] = {
     Option(jPattern.getCondition())
   }
 
   /**
-    * Applies a subtype constraint on the current pattern operator. This means 
that an event has
-    * to be of the given subtype in order to be matched.
+    * Adds a condition that has to be satisfied by an event
+    * in order to be considered a match. If another condition has already been
+    * set, the new one is going to be combined with the previous with a
+    * logical {{{AND}}}. Otherwise, this is going to be the only
+    * condition.
     *
-    * @param clazz Class of the subtype
-    * @tparam S Type of the subtype
-    * @return The same pattern operator with the new subtype constraint
+    * @param condition The condition as an [[IterativeCondition]].
+    * @return The pattern with the new condition set.
     */
-  def subtype[S <: F](clazz: Class[S]): Pattern[T, S] = {
-    jPattern.subtype(clazz)
-    this.asInstanceOf[Pattern[T, S]]
-  }
-
-  /**
-    * Defines the maximum time interval for a matching pattern. This means 
that the time gap
-    * between first and the last event must not be longer than the window time.
-    *
-    * @param windowTime Time of the matching window
-    * @return The same pattern operator with the new window length
-    */
-  def within(windowTime: Time): Pattern[T, F] = {
-    jPattern.within(windowTime)
+  def where(condition: IterativeCondition[F]): Pattern[T, F] = {
+    jPattern.where(condition)
     this
   }
 
   /**
-    * Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
-    * temporal contiguity. This means that the whole pattern only matches if 
an event which matches
-    * this operator directly follows the preceding matching event. Thus, there 
cannot be any
-    * events in between two matching events.
+    * Adds a condition that has to be satisfied by an event
+    * in order to be considered a match. If another condition has already been
+    * set, the new one is going to be combined with the previous with a
+    * logical {{{AND}}}. Otherwise, this is going to be the only
+    * condition.
     *
-    * @param name Name of the new pattern operator
-    * @return A new pattern operator which is appended to this pattern operator
+    * @param condition The condition to be set.
+    * @return The pattern with the new condition set.
     */
-  def next(name: String): Pattern[T, T] = {
-    Pattern[T, T](jPattern.next(name))
-  }
+  def where(condition: (F, Context[F]) => Boolean): Pattern[T, F] = {
+    val condFun = new IterativeCondition[F] {
+      val cleanCond = cep.scala.cleanClosure(condition)
 
-  /**
-    * Appends a new pattern operator to the existing one. The new pattern 
operator enforces
-    * non-strict temporal contiguity. This means that a matching event of this 
operator and the
-    * preceding matching event might be interleaved with other events which 
are ignored.
-    *
-    * @param name Name of the new pattern operator
-    * @return A new pattern operator which is appended to this pattern operator
-    */
-  def followedBy(name: String): FollowedByPattern[T, T] = {
-    FollowedByPattern(jPattern.followedBy(name))
+      override def filter(value: F, ctx: Context[F]): Boolean = 
cleanCond(value, ctx)
+    }
+    where(condFun)
   }
 
   /**
-    * Specifies a filter condition which has to be fulfilled by an event in 
order to be matched.
+    * Adds a condition that has to be satisfied by an event
+    * in order to be considered a match. If another condition has already been
+    * set, the new one is going to be combined with the previous with a
+    * logical {{{AND}}}. Otherwise, this is going to be the only
+    * condition.
     *
-    * @param filter Filter condition
-    * @return The same pattern operator where the new filter condition is set
+    * @param condition The condition to be set.
+    * @return The pattern with the new condition set.
     */
-  def where(filter: IterativeCondition[F]): Pattern[T, F] = {
-    jPattern.where(filter)
-    this
+  def where(condition: F => Boolean): Pattern[T, F] = {
+    val condFun = new IterativeCondition[F] {
+      val cleanCond = cep.scala.cleanClosure(condition)
+
+      override def filter(value: F, ctx: Context[F]): Boolean = 
cleanCond(value)
+    }
+    where(condFun)
   }
 
   /**
-    * Specifies a filter condition which is ORed with an existing filter 
function.
+    * Adds a condition that has to be satisfied by an event
+    * in order to be considered a match. If another condition has already been
+    * set, the new one is going to be combined with the previous with a
+    * logical {{{OR}}}. Otherwise, this is going to be the only
+    * condition.
     *
-    * @param filter Or filter function
-    * @return The same pattern operator where the new filter condition is set
+    * @param condition The condition as an [[IterativeCondition]].
+    * @return The pattern with the new condition set.
     */
-  def or(filter: IterativeCondition[F]): Pattern[T, F] = {
-    jPattern.or(filter)
+  def or(condition: IterativeCondition[F]): Pattern[T, F] = {
+    jPattern.or(condition)
     this
   }
 
   /**
-    * Specifies a filter condition which is ORed with an existing filter 
function.
+    * Adds a condition that has to be satisfied by an event
+    * in order to be considered a match. If another condition has already been
+    * set, the new one is going to be combined with the previous with a
+    * logical {{{OR}}}. Otherwise, this is going to be the only
+    * condition.
     *
-    * @param filterFun Or filter function
-    * @return The same pattern operator where the new filter condition is set
+    * @param condition The {{{OR}}} condition.
+    * @return The pattern with the new condition set.
     */
-  def or(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = {
-    val filter = new IterativeCondition[F] {
-      val cleanFilter = cep.scala.cleanClosure(filterFun)
+  def or(condition: (F, Context[F]) => Boolean): Pattern[T, F] = {
+    val condFun = new IterativeCondition[F] {
+      val cleanCond = cep.scala.cleanClosure(condition)
 
-      override def filter(value: F, ctx: Context[F]): Boolean = 
cleanFilter(value, ctx)
+      override def filter(value: F, ctx: Context[F]): Boolean = 
cleanCond(value, ctx)
     }
-    or(filter)
+    or(condFun)
   }
 
   /**
-    * Specifies a filter condition which has to be fulfilled by an event in 
order to be matched.
+    * Applies a subtype constraint on the current pattern. This means that an 
event has
+    * to be of the given subtype in order to be matched.
     *
-    * @param filterFun Filter condition
-    * @return The same pattern operator where the new filter condition is set
+    * @param clazz Class of the subtype
+    * @tparam S Type of the subtype
+    * @return The same pattern with the new subtype constraint
     */
-  def where(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = {
-    val filter = new IterativeCondition[F] {
-      val cleanFilter = cep.scala.cleanClosure(filterFun)
-
-      override def filter(value: F, ctx: Context[F]): Boolean = 
cleanFilter(value, ctx)
-    }
-    where(filter)
+  def subtype[S <: F](clazz: Class[S]): Pattern[T, S] = {
+    jPattern.subtype(clazz)
+    this.asInstanceOf[Pattern[T, S]]
   }
 
   /**
-    * Specifies a filter condition which has to be fulfilled by an event in 
order to be matched.
+    * Defines the maximum time interval in which a matching pattern has to be 
completed in
+    * order to be considered valid. This interval corresponds to the maximum 
time gap between first
+    * and the last event.
     *
-    * @param filterFun Filter condition
-    * @return The same pattern operator where the new filter condition is set
+    * @param windowTime Time of the matching window
+    * @return The same pattern operator with the new window length
     */
-  def where(filterFun: F => Boolean): Pattern[T, F] = {
-    val filter = new IterativeCondition[F] {
-      val cleanFilter = cep.scala.cleanClosure(filterFun)
-
-      override def filter(value: F, ctx: Context[F]): Boolean = 
cleanFilter(value)
-    }
-    where(filter)
+  def within(windowTime: Time): Pattern[T, F] = {
+    jPattern.within(windowTime)
+    this
   }
 
   /**
+    * Appends a new pattern to the existing one. The new pattern enforces 
strict
+    * temporal contiguity. This means that the whole pattern sequence matches 
only
+    * if an event which matches this pattern directly follows the preceding 
matching
+    * event. Thus, there cannot be any events in between two matching events.
     *
-    * @return The previous pattern operator
+    * @param name Name of the new pattern
+    * @return A new pattern which is appended to this one
     */
-  def getPrevious(): Option[Pattern[T, _ <: T]] = {
-    wrapPattern(jPattern.getPrevious())
+  def next(name: String): Pattern[T, T] = {
+    Pattern[T, T](jPattern.next(name))
   }
 
   /**
-    * Specifies that this pattern can occur zero or more times(kleene star).
-    * This means any number of events can be matched in this state.
+    * Appends a new pattern to the existing one. The new pattern enforces 
non-strict
+    * temporal contiguity. This means that a matching event of this pattern 
and the
+    * preceding matching event might be interleaved with other events which 
are ignored.
     *
-    * @return The same pattern with applied Kleene star operator
+    * @param name Name of the new pattern
+    * @return A new pattern which is appended to this one
     */
-  def zeroOrMore: Pattern[T, F] = {
-    jPattern.zeroOrMore()
-    this
-  }
+  def followedBy(name: String) = FollowedByPattern(jPattern.followedBy(name))
 
   /**
-    * Specifies that this pattern can occur zero or more times(kleene star).
-    * This means any number of events can be matched in this state.
-    *
-    * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will 
generate patterns:
-    * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+    * Specifies that this pattern is optional for a final match of the pattern
+    * sequence to happen.
     *
-    * @param eager if true the pattern always consumes earlier events
-    * @return The same pattern with applied Kleene star operator
+    * @return The same pattern as optional.
+    * @throws MalformedPatternException if the quantifier is not applicable to 
this pattern.
     */
-  def zeroOrMore(eager: Boolean): Pattern[T, F] = {
-    jPattern.zeroOrMore(eager)
+  def optional: Pattern[T, F] = {
+    jPattern.optional()
     this
   }
 
   /**
-    * Specifies that this pattern can occur one or more times(kleene star).
-    * This means at least one and at most infinite number of events can be 
matched in this state.
+    * Specifies that this pattern can occur {{{one or more}}} times.
+    * This means at least one and at most infinite number of events can
+    * be matched to this pattern.
+    *
+    * If this quantifier is enabled for a
+    * pattern {{{A.oneOrMore().followedBy(B)}}} and a sequence of events
+    * {{{A1 A2 B}}} appears, this will generate patterns:
+    * {{{A1 B}}} and {{{A1 A2 B}}}. See also {{{allowCombinations()}}}.
     *
-    * @return The same pattern with applied Kleene plus operator
+    * @return The same pattern with a [[Quantifier.ONE_OR_MORE()]] quantifier 
applied.
+    * @throws MalformedPatternException if the quantifier is not applicable to 
this pattern.
     */
   def oneOrMore: Pattern[T, F] = {
     jPattern.oneOrMore()
@@ -235,63 +241,51 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
-    * Specifies that this pattern can occur one or more times(kleene star).
-    * This means at least one and at most infinite number of events can be 
matched in this state.
-    *
-    * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will 
generate patterns:
-    * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+    * Specifies exact number of times that this pattern should be matched.
     *
-    * @param eager if true the pattern always consumes earlier events
-    * @return The same pattern with applied Kleene plus operator
+    * @param times number of times matching event must appear
+    * @return The same pattern with number of times applied
+    * @throws MalformedPatternException if the quantifier is not applicable to 
this pattern.
     */
-  def oneOrMore(eager: Boolean): Pattern[T, F] = {
-    jPattern.oneOrMore(eager)
+  def times(times: Int): Pattern[T, F] = {
+    jPattern.times(times)
     this
   }
 
   /**
-    * Specifies that this pattern can occur zero or once.
+    * Applicable only to [[Quantifier.ONE_OR_MORE()]] patterns, this option
+    * allows more flexibility to the matching events.
     *
-    * @return The same pattern with applied Kleene ? operator
-    */
-  def optional: Pattern[T, F] = {
-    jPattern.optional()
-    this
-  }
-
-  /**
-    * Specifies exact number of times that this pattern should be matched.
+    * If {{{allowCombinations()}}} is not applied for a
+    * pattern {{{A.oneOrMore().followedBy(B)}}} and a sequence of events
+    * {{{A1 A2 B}}} appears, this will generate patterns:
+    * {{{A1 B}}} and {{{A1 A2 B}}}. If this method is applied, we
+    * will have {{{A1 B}}}, {{{A2 B}}} and {{{A1 A2 B}}}.
     *
-    * @param times number of times matching event must appear
-    * @return The same pattern with number of times applied
+    * @return The same pattern with the updated quantifier.
+    * @throws MalformedPatternException if the quantifier is not applicable to 
this pattern.
     */
-  def times(times: Int): Pattern[T, F] = {
-    jPattern.times(times)
+  def allowCombinations(): Pattern[T, F] = {
+    jPattern.allowCombinations()
     this
   }
 
-
   /**
-    * Works in conjunction with 
[[org.apache.flink.cep.scala.pattern.Pattern#zeroOrMore()]],
-    * [[org.apache.flink.cep.scala.pattern.Pattern#oneOrMore()]] or
-    * [[org.apache.flink.cep.scala.pattern.Pattern#times(int)]].
+    * Works in conjunction with [[Pattern#oneOrMore()]] or 
[[Pattern#times(int)]].
     * Specifies that any not matching element breaks the loop.
     *
-    * <p>E.g. a pattern like:
+    * E.g. a pattern like:
     * {{{
     * Pattern.begin("start").where(_.getName().equals("c"))
-    *        
.followedBy("middle").where(_.getName().equals("a")).oneOrMore(true).consecutive()
+    *        
.followedBy("middle").where(_.getName().equals("a")).oneOrMore().consecutive()
     *        .followedBy("end1").where(_.getName().equals("b"));
     * }}}
     *
-    * <p>for a sequence: C D A1 A2 A3 D A4 B
-    *
-    * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
+    * For a sequence: C D A1 A2 A3 D A4 B
     *
-    * <p><b>NOTICE:</b> This operator can be applied only when either 
zeroOrMore,
-    * oneOrMore or times was previously applied!
+    * will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
     *
-    * <p>By default a relaxed continuity is applied.
+    * By default a relaxed continuity is applied.
     * @return pattern with continuity changed to strict
     */
   def consecutive(): Pattern[T, F] = {
@@ -314,12 +308,12 @@ object Pattern {
   def apply[T, F <: T](jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern)
 
   /**
-    * Starts a new pattern with the initial pattern operator whose name is 
provided. Furthermore,
-    * the base type of the event sequence is set.
+    * Starts a new pattern sequence. The provided name is the one of the 
initial pattern
+    * of the new sequence. Furthermore, the base type of the event sequence is 
set.
     *
-    * @param name Name of the new pattern operator
+    * @param name The name of starting pattern of the new pattern sequence
     * @tparam X Base type of the event pattern
-    * @return The first pattern operator of a pattern
+    * @return The first pattern of a pattern sequence
     */
   def begin[X](name: String): Pattern[X, X] = Pattern(JPattern.begin(name))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 5ab44c0..add1583 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -31,7 +31,6 @@ import org.apache.flink.cep.pattern.FollowedByPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -166,10 +165,10 @@ public class NFACompiler {
                                if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
                                        final State<T> looping = 
createLooping(lastSink);
 
-                                       if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+                                       if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
                                                lastSink = 
createFirstMandatoryStateOfLoop(looping);
                                        } else if (currentPattern instanceof 
FollowedByPattern &&
-                                                               
currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+                                                               
currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) {
                                                lastSink = 
createWaitingStateForZeroOrMore(looping, lastSink);
                                        } else {
                                                lastSink = looping;
@@ -237,7 +236,7 @@ public class NFACompiler {
                        final State<T> beginningState;
                        if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
                                final State<T> loopingState = 
createLooping(sinkState);
-                               if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+                               if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
                                        beginningState = 
createFirstMandatoryStateOfLoop(loopingState);
                                } else {
                                        beginningState = loopingState;
@@ -266,9 +265,29 @@ public class NFACompiler {
                        for (int i = 0; i < times - 1; i++) {
                                lastSink = createSingletonState(
                                        lastSink,
-                                       
!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT));
+                                       
!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE),
+                                       false);
                        }
-                       return createSingletonState(lastSink, currentPattern 
instanceof FollowedByPattern);
+
+                       // we created the intermediate states in the loop, now 
we create the start of the loop.
+                       if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
+                               return createSingletonState(lastSink, 
currentPattern instanceof FollowedByPattern, false);
+                       }
+
+                       final IterativeCondition<T> currentFilterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
+                       final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
+
+                       final State<T> singletonState = createNormalState();
+                       singletonState.addTake(lastSink, currentFilterFunction);
+                       singletonState.addProceed(sinkState, trueFunction);
+
+                       if (currentPattern instanceof FollowedByPattern) {
+                               State<T> ignoreState = createNormalState();
+                               ignoreState.addTake(lastSink, 
currentFilterFunction);
+                               
ignoreState.addIgnore(BooleanConditions.<T>trueFunction());
+                               singletonState.addIgnore(ignoreState, 
trueFunction);
+                       }
+                       return singletonState;
                }
 
                /**
@@ -281,7 +300,8 @@ public class NFACompiler {
                 */
                @SuppressWarnings("unchecked")
                private State<T> createSingletonState(final State<T> sinkState) 
{
-                       return createSingletonState(sinkState, currentPattern 
instanceof FollowedByPattern);
+                       return createSingletonState(sinkState, currentPattern 
instanceof FollowedByPattern,
+                                       
currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL));
                }
 
                /**
@@ -294,20 +314,20 @@ public class NFACompiler {
                 * @return the created state
                 */
                @SuppressWarnings("unchecked")
-               private State<T> createSingletonState(final State<T> sinkState, 
boolean addIgnore) {
+               private State<T> createSingletonState(final State<T> sinkState, 
boolean addIgnore, boolean isOptional) {
                        final IterativeCondition<T> currentFilterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
                        final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
 
                        final State<T> singletonState = createNormalState();
                        singletonState.addTake(sinkState, 
currentFilterFunction);
 
-                       if (currentPattern.getQuantifier() == 
Quantifier.OPTIONAL) {
+                       if (isOptional) {
                                singletonState.addProceed(sinkState, 
trueFunction);
                        }
 
                        if (addIgnore) {
                                final State<T> ignoreState;
-                               if (currentPattern.getQuantifier() == 
Quantifier.OPTIONAL) {
+                               if (isOptional) {
                                        ignoreState = createNormalState();
                                        ignoreState.addTake(sinkState, 
currentFilterFunction);
                                } else {
@@ -356,7 +376,7 @@ public class NFACompiler {
 
                        loopingState.addProceed(sinkState, trueFunction);
                        loopingState.addTake(filterFunction);
-                       if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+                       if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) {
                                final State<T> ignoreState = 
createNormalState();
 
                                final IterativeCondition<T> ignoreCondition = 
getIgnoreCondition(currentPattern);

http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 92b655e..f1d0d63 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -30,13 +30,12 @@ import org.apache.flink.util.Preconditions;
 /**
  * Base class for a pattern definition.
  * <p>
- * A pattern definition is used by {@link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
- * a {@link NFA}.
+ * A pattern definition is used by {@link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create a {@link NFA}.
  *
  * <pre>{@code
  * Pattern<T, F> pattern = Pattern.<T>begin("start")
  *   .next("middle").subtype(F.class)
- *   .followedBy("end").where(new MyFilterFunction());
+ *   .followedBy("end").where(new MyCondition());
  * }
  * </pre>
  *
@@ -45,20 +44,25 @@ import org.apache.flink.util.Preconditions;
  */
 public class Pattern<T, F extends T> {
 
-       // name of the pattern operator
+       /** Name of the pattern. */
        private final String name;
 
-       // previous pattern operator
+       /** Previous pattern. */
        private final Pattern<T, ? extends T> previous;
 
-       // filter condition for an event to be matched
+       /** The condition an event has to satisfy to be considered a match. */
        private IterativeCondition<F> condition;
 
-       // window length in which the pattern match has to occur
+       /** Window length in which the pattern match has to occur. */
        private Time windowTime;
 
-       private Quantifier quantifier = Quantifier.ONE;
+       /** A quantifier for the pattern. By default set to {@link 
Quantifier#ONE()}. */
+       private Quantifier quantifier = Quantifier.ONE();
 
+       /**
+        * Applicable to a {@code times} pattern, and holds
+        * the number of times it has to appear.
+        */
        private int times;
 
        protected Pattern(final String name, final Pattern<T, ? extends T> 
previous) {
@@ -66,16 +70,16 @@ public class Pattern<T, F extends T> {
                this.previous = previous;
        }
 
-       public String getName() {
-               return name;
-       }
-
        public Pattern<T, ? extends T> getPrevious() {
                return previous;
        }
 
-       public IterativeCondition<F> getCondition() {
-               return condition;
+       public int getTimes() {
+               return times;
+       }
+
+       public String getName() {
+               return name;
        }
 
        public Time getWindowTime() {
@@ -86,15 +90,31 @@ public class Pattern<T, F extends T> {
                return quantifier;
        }
 
-       public int getTimes() {
-               return times;
+       public IterativeCondition<F> getCondition() {
+               return condition;
        }
 
        /**
-        * Specifies a filter condition which has to be fulfilled by an event 
in order to be matched.
+        * Starts a new pattern sequence. The provided name is the one of the 
initial pattern
+        * of the new sequence. Furthermore, the base type of the event 
sequence is set.
         *
-        * @param condition Filter condition
-        * @return The same pattern operator where the new filter condition is 
set
+        * @param name The name of starting pattern of the new pattern sequence
+        * @param <X> Base type of the event pattern
+        * @return The first pattern of a pattern sequence
+        */
+       public static <X> Pattern<X, X> begin(final String name) {
+               return new Pattern<X, X>(name, null);
+       }
+
+       /**
+        * Adds a condition that has to be satisfied by an event
+        * in order to be considered a match. If another condition has already 
been
+        * set, the new one is going to be combined with the previous with a
+        * logical {@code AND}. Otherwise, this is going to be the only
+        * condition.
+        *
+        * @param condition The condition as an {@link IterativeCondition}.
+        * @return The pattern with the new condition set.
         */
        public Pattern<T, F> where(IterativeCondition<F> condition) {
                ClosureCleaner.clean(condition, true);
@@ -108,10 +128,14 @@ public class Pattern<T, F extends T> {
        }
 
        /**
-        * Specifies a filter condition which is OR'ed with an existing filter 
function.
+        * Adds a condition that has to be satisfied by an event
+        * in order to be considered a match. If another condition has already 
been
+        * set, the new one is going to be combined with the previous with a
+        * logical {@code OR}. Otherwise, this is going to be the only
+        * condition.
         *
-        * @param condition OR filter condition
-        * @return The same pattern operator where the new filter condition is 
set
+        * @param condition The condition as an {@link IterativeCondition}.
+        * @return The pattern with the new condition set.
         */
        public Pattern<T, F> or(IterativeCondition<F> condition) {
                ClosureCleaner.clean(condition, true);
@@ -125,19 +149,18 @@ public class Pattern<T, F extends T> {
        }
 
        /**
-        * Applies a subtype constraint on the current pattern operator. This 
means that an event has
+        * Applies a subtype constraint on the current pattern. This means that 
an event has
         * to be of the given subtype in order to be matched.
         *
         * @param subtypeClass Class of the subtype
         * @param <S> Type of the subtype
-        * @return The same pattern operator with the new subtype constraint
+        * @return The same pattern with the new subtype constraint
         */
        public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) 
{
                if (condition == null) {
                        this.condition = new SubtypeCondition<F>(subtypeClass);
                } else {
-                       this.condition = new AndCondition<>(this.condition,
-                                       new SubtypeCondition<F>(subtypeClass));
+                       this.condition = new AndCondition<>(condition, new 
SubtypeCondition<F>(subtypeClass));
                }
 
                @SuppressWarnings("unchecked")
@@ -147,8 +170,9 @@ public class Pattern<T, F extends T> {
        }
 
        /**
-        * Defines the maximum time interval for a matching pattern. This means 
that the time gap
-        * between first and the last event must not be longer than the window 
time.
+        * Defines the maximum time interval in which a matching pattern has to 
be completed in
+        * order to be considered valid. This interval corresponds to the 
maximum time gap between first
+        * and the last event.
         *
         * @param windowTime Time of the matching window
         * @return The same pattern operator with the new window length
@@ -162,130 +186,114 @@ public class Pattern<T, F extends T> {
        }
 
        /**
-        * Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
-        * temporal contiguity. This means that the whole pattern only matches 
if an event which matches
-        * this operator directly follows the preceding matching event. Thus, 
there cannot be any
-        * events in between two matching events.
+        * Appends a new pattern to the existing one. The new pattern enforces 
strict
+        * temporal contiguity. This means that the whole pattern sequence 
matches only
+        * if an event which matches this pattern directly follows the 
preceding matching
+        * event. Thus, there cannot be any events in between two matching 
events.
         *
-        * @param name Name of the new pattern operator
-        * @return A new pattern operator which is appended to this pattern 
operator
+        * @param name Name of the new pattern
+        * @return A new pattern which is appended to this one
         */
        public Pattern<T, T> next(final String name) {
                return new Pattern<T, T>(name, this);
        }
 
        /**
-        * Appends a new pattern operator to the existing one. The new pattern 
operator enforces
-        * non-strict temporal contiguity. This means that a matching event of 
this operator and the
+        * Appends a new pattern to the existing one. The new pattern enforces 
non-strict
+        * temporal contiguity. This means that a matching event of this 
pattern and the
         * preceding matching event might be interleaved with other events 
which are ignored.
         *
-        * @param name Name of the new pattern operator
-        * @return A new pattern operator which is appended to this pattern 
operator
+        * @param name Name of the new pattern
+        * @return A new pattern which is appended to this one
         */
        public FollowedByPattern<T, T> followedBy(final String name) {
                return new FollowedByPattern<T, T>(name, this);
        }
 
        /**
-        * Starts a new pattern with the initial pattern operator whose name is 
provided. Furthermore,
-        * the base type of the event sequence is set.
-        *
-        * @param name Name of the new pattern operator
-        * @param <X> Base type of the event pattern
-        * @return The first pattern operator of a pattern
-        */
-       public static <X> Pattern<X, X> begin(final String name) {
-               return new Pattern<X, X>(name, null);
-       }
-
-       /**
-        * Specifies that this pattern can occur zero or more times(kleene 
star).
-        * This means any number of events can be matched in this state.
+        * Specifies that this pattern is optional for a final match of the 
pattern
+        * sequence to happen.
         *
-        * @return The same pattern with applied Kleene star operator
-        *
-        * @throws MalformedPatternException if quantifier already applied
+        * @return The same pattern as optional.
+        * @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
         */
-       public Pattern<T, F> zeroOrMore() {
-               return zeroOrMore(true);
+       public Pattern<T, F> optional() {
+               quantifier.makeOptional();
+               return this;
        }
 
        /**
-        * Specifies that this pattern can occur zero or more times(kleene 
star).
-        * This means any number of events can be matched in this state.
-        *
-        * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will 
generate patterns:
-        * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+        * Specifies that this pattern can occur {@code one or more} times.
+        * This means at least one and at most infinite number of events can
+        * be matched to this pattern.
         *
-        * @param eager if true the pattern always consumes earlier events
-        * @return The same pattern with applied Kleene star operator
+        * <p>If this quantifier is enabled for a
+        * pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events
+        * {@code A1 A2 B} appears, this will generate patterns:
+        * {@code A1 B} and {@code A1 A2 B}. See also {@link 
#allowCombinations()}.
         *
-        * @throws MalformedPatternException if quantifier already applied
+        * @return The same pattern with a {@link Quantifier#ONE_OR_MORE()} 
quantifier applied.
+        * @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
         */
-       public Pattern<T, F> zeroOrMore(final boolean eager) {
+       public Pattern<T, F> oneOrMore() {
                checkIfQuantifierApplied();
-               if (eager) {
-                       this.quantifier = Quantifier.ZERO_OR_MORE_EAGER;
-               } else {
-                       this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS;
-               }
+               this.quantifier = Quantifier.ONE_OR_MORE();
                return this;
        }
 
        /**
-        * Specifies that this pattern can occur one or more times(kleene star).
-        * This means at least one and at most infinite number of events can be 
matched in this state.
+        * Specifies the exact number of times that this pattern should be matched.
         *
-        * @return The same pattern with applied Kleene plus operator
+        * @param times number of times matching event must appear
+        * @return The same pattern with number of times applied
         *
-        * @throws MalformedPatternException if quantifier already applied
+        * @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
         */
-       public Pattern<T, F> oneOrMore() {
-               return oneOrMore(true);
+       public Pattern<T, F> times(int times) {
+               checkIfQuantifierApplied();
+               Preconditions.checkArgument(times > 0, "You should give a 
positive number greater than 0.");
+               this.quantifier = Quantifier.TIMES();
+               this.times = times;
+               return this;
        }
 
        /**
-        * Specifies that this pattern can occur one or more times(kleene star).
-        * This means at least one and at most infinite number of events can be 
matched in this state.
+        * Applicable only to {@link Quantifier#ONE_OR_MORE()} patterns, this 
option
+        * allows more flexibility to the matching events.
         *
-        * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will 
generate patterns:
-        * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+        * <p>If {@code allowCombinations()} is not applied for a
+        * pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events
+        * {@code A1 A2 B} appears, this will generate patterns:
+        * {@code A1 B} and {@code A1 A2 B}. If this method is applied, we
+        * will have {@code A1 B}, {@code A2 B} and {@code A1 A2 B}.
         *
-        * @param eager if true the pattern always consumes earlier events
-        * @return The same pattern with applied Kleene plus operator
-        *
-        * @throws MalformedPatternException if quantifier already applied
+        * @return The same pattern with the updated quantifier.
+        * @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
         */
-       public Pattern<T, F> oneOrMore(final boolean eager) {
-               checkIfQuantifierApplied();
-               if (eager) {
-                       this.quantifier = Quantifier.ONE_OR_MORE_EAGER;
-               } else {
-                       this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS;
-               }
+       public Pattern<T, F> allowCombinations() {
+               quantifier.allowAllCombinations();
                return this;
        }
 
        /**
-        * Works in conjunction with {@link Pattern#zeroOrMore()}, {@link 
Pattern#oneOrMore()} or {@link Pattern#times(int)}.
+        * Works in conjunction with {@link Pattern#oneOrMore()} or {@link 
Pattern#times(int)}.
         * Specifies that any not matching element breaks the loop.
         *
         * <p>E.g. a pattern like:
         * <pre>{@code
-        * Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+        * Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
         *      @Override
         *      public boolean filter(Event value) throws Exception {
         *          return value.getName().equals("c");
         *      }
         * })
-        * .followedBy("middle").where(new FilterFunction<Event>() {
+        * .followedBy("middle").where(new SimpleCondition<Event>() {
         *      @Override
         *      public boolean filter(Event value) throws Exception {
         *          return value.getName().equals("a");
         *      }
-        * })
-        * }<b>.oneOrMore(true).consecutive()</b>{@code
-        * .followedBy("end1").where(new FilterFunction<Event>() {
+        * }).oneOrMore().consecutive()
+        * .followedBy("end1").where(new SimpleCondition<Event>() {
         *      @Override
         *      public boolean filter(Event value) throws Exception {
         *          return value.getName().equals("b");
@@ -297,78 +305,19 @@ public class Pattern<T, F extends T> {
         *
         * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
         *
-        * <p><b>NOTICE:</b> This operator can be applied only when either 
zeroOrMore,
-        * oneOrMore or times was previously applied!
-        *
         * <p>By default a relaxed continuity is applied.
         *
         * @return pattern with continuity changed to strict
         */
        public Pattern<T, F> consecutive() {
-               switch (this.quantifier) {
-
-                       case ZERO_OR_MORE_EAGER:
-                               this.quantifier = 
Quantifier.ZERO_OR_MORE_EAGER_STRICT;
-                               break;
-                       case ZERO_OR_MORE_COMBINATIONS:
-                               this.quantifier = 
Quantifier.ZERO_OR_MORE_COMBINATIONS_STRICT;
-                               break;
-                       case ONE_OR_MORE_EAGER:
-                               this.quantifier = 
Quantifier.ONE_OR_MORE_EAGER_STRICT;
-                               break;
-                       case ONE_OR_MORE_COMBINATIONS:
-                               this.quantifier = 
Quantifier.ONE_OR_MORE_COMBINATIONS_STRICT;
-                               break;
-                       case TIMES:
-                               this.quantifier = Quantifier.TIMES_STRICT;
-                               break;
-                       case ZERO_OR_MORE_COMBINATIONS_STRICT:
-                       case ONE_OR_MORE_EAGER_STRICT:
-                       case ONE_OR_MORE_COMBINATIONS_STRICT:
-                       case ZERO_OR_MORE_EAGER_STRICT:
-                       case TIMES_STRICT:
-                               throw new MalformedPatternException("Strict 
continuity already applied! consecutive() called twice.");
-                       case ONE:
-                       case OPTIONAL:
-                               throw new MalformedPatternException("Strict 
continuity cannot be applied to " + this.quantifier);
-               }
-
-               return this;
-       }
-
-       /**
-        * Specifies that this pattern can occur zero or once.
-        *
-        * @return The same pattern with applied Kleene ? operator
-        *
-        * @throws MalformedPatternException if quantifier already applied
-        */
-       public Pattern<T, F> optional() {
-               checkIfQuantifierApplied();
-               this.quantifier = Quantifier.OPTIONAL;
-               return this;
-       }
-
-       /**
-        * Specifies exact number of times that this pattern should be matched.
-        *
-        * @param times number of times matching event must appear
-        * @return The same pattern with number of times applied
-        *
-        * @throws MalformedPatternException if quantifier already applied
-        */
-       public Pattern<T, F> times(int times) {
-               checkIfQuantifierApplied();
-               Preconditions.checkArgument(times > 0, "You should give a 
positive number greater than 0.");
-               this.quantifier = Quantifier.TIMES;
-               this.times = times;
+               quantifier.makeConsecutive();
                return this;
        }
 
        private void checkIfQuantifierApplied() {
-               if (this.quantifier != Quantifier.ONE) {
-                       throw new MalformedPatternException("Already applied 
quantifier to this Pattern. Current quantifier is: " + this.quantifier);
+               if 
(!quantifier.hasProperty(Quantifier.QuantifierProperty.SINGLE)) {
+                       throw new MalformedPatternException("Already applied 
quantifier to this Pattern. " +
+                                       "Current quantifier is: " + quantifier);
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 9789072..9deacd4 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -18,51 +18,83 @@
 package org.apache.flink.cep.pattern;
 
 import java.util.EnumSet;
+import java.util.Objects;
 
-public enum Quantifier {
-       ONE,
-       ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER),
-       ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
-       ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, 
QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
-       ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, 
QuantifierProperty.LOOPING),
-       ONE_OR_MORE_EAGER(
-               QuantifierProperty.LOOPING,
-               QuantifierProperty.EAGER,
-               QuantifierProperty.AT_LEAST_ONE),
-       ONE_OR_MORE_EAGER_STRICT(
-               QuantifierProperty.STRICT,
-               QuantifierProperty.LOOPING,
-               QuantifierProperty.EAGER,
-               QuantifierProperty.AT_LEAST_ONE),
-       ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, 
QuantifierProperty.AT_LEAST_ONE),
-       ONE_OR_MORE_COMBINATIONS_STRICT(
-               QuantifierProperty.STRICT,
-               QuantifierProperty.LOOPING,
-               QuantifierProperty.AT_LEAST_ONE),
-       TIMES(QuantifierProperty.TIMES),
-       TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT),
-       OPTIONAL;
+public class Quantifier {
 
        private final EnumSet<QuantifierProperty> properties;
 
-       Quantifier(final QuantifierProperty first, final QuantifierProperty... 
rest) {
+       private Quantifier(final QuantifierProperty first, final 
QuantifierProperty... rest) {
                this.properties = EnumSet.of(first, rest);
        }
 
-       Quantifier() {
-               this.properties = EnumSet.noneOf(QuantifierProperty.class);
+       public static Quantifier ONE() {
+               return new Quantifier(QuantifierProperty.SINGLE);
+       }
+
+       public static Quantifier ONE_OR_MORE() {
+               return new Quantifier(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER);
+       }
+
+       public static Quantifier TIMES() {
+               return new Quantifier(QuantifierProperty.TIMES);
        }
 
        public boolean hasProperty(QuantifierProperty property) {
                return properties.contains(property);
        }
 
+       public void allowAllCombinations() {
+               if (!hasProperty(Quantifier.QuantifierProperty.EAGER)) {
+                       throw new MalformedPatternException("Combinations 
already allowed!");
+               }
+
+               if (hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
+                       properties.remove(Quantifier.QuantifierProperty.EAGER);
+               } else {
+                       throw new MalformedPatternException("Combinations not 
applicable to " + this + "!");
+               }
+       }
+
+       public void makeConsecutive() {
+               if (hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) {
+                       throw new MalformedPatternException("Strict continuity 
already applied!");
+               }
+
+               if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || 
hasProperty(Quantifier.QuantifierProperty.TIMES)) {
+                       
properties.add(Quantifier.QuantifierProperty.CONSECUTIVE);
+               } else {
+                       throw new MalformedPatternException("Strict continuity 
not applicable to " + this + "!");
+               }
+       }
+
+       public void makeOptional() {
+               if (hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+                       throw new MalformedPatternException("Optional already 
applied!");
+               }
+               properties.add(Quantifier.QuantifierProperty.OPTIONAL);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj instanceof Quantifier && 
properties.equals(((Quantifier)obj).properties);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hashCode(properties);
+       }
+
+       /**
+        * Properties that a {@link Quantifier} can have. Not all combinations 
are valid.
+        */
        public enum QuantifierProperty {
+               SINGLE,
                LOOPING,
+               TIMES,
                EAGER,
-               AT_LEAST_ONE,
-               STRICT,
-               TIMES
+               CONSECUTIVE,
+               OPTIONAL
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 7359ca8..cffb4e7 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -456,7 +456,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -544,7 +544,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -609,7 +609,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(true).followedBy("end1").where(new 
SimpleCondition<Event>() {
+               }).oneOrMore().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -664,7 +664,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore().followedBy("end").where(new 
SimpleCondition<Event>() {
+               }).oneOrMore().optional().followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -731,14 +731,14 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).followedBy("middle-second").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().optional().followedBy("middle-second").where(new
 SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
-               }).zeroOrMore(false).followedBy("end").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().optional().followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -824,7 +824,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
-               }).zeroOrMore(false).followedBy("end").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().optional().followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -892,7 +892,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore()
+               }).oneOrMore().optional()
                        .next("end").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
@@ -947,7 +947,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).next("end").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().optional().allowCombinations().next("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1007,7 +1007,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
+               }).oneOrMore().allowCombinations().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1061,7 +1061,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore(false).followedBy("end").where(new 
SimpleCondition<Event>() {
+               }).oneOrMore().allowCombinations().followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1129,7 +1129,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).zeroOrMore().consecutive().followedBy("end").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().optional().consecutive().followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
7056763917392056548L;
 
                        @Override
@@ -1190,7 +1190,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore(true).followedBy("end1").where(new 
SimpleCondition<Event>() {
+               }).oneOrMore().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -1602,7 +1602,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore();
+               }).oneOrMore().optional();
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -1656,7 +1656,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore();
+               }).oneOrMore().optional();
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -1788,6 +1788,228 @@ public class NFAITCase extends TestLogger {
                ), resultingPatterns);
        }
 
+       ///////////////////////////////         Optional           
////////////////////////////////////////
+
+       @Test
+       public void testTimesNonStrictOptional1() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Scenario: times(3).optional() on "middle" ("0 or 3" per this commit); the input holds no ConsecutiveData.middleEvent* record.
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+               // Pattern: "start" (name "c") followedBy "middle" (name "a", times(3).optional()) followedBy "end1" (name "b").
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(3).optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+               // Expected: exactly one match, which skips "middle" entirely (startEvent, end).
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testTimesNonStrictOptional2() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Scenario: times(2).optional() on "middle"; middleEvent1..3 arrive with Event(23, "f", 1.0) records before and after middleEvent1.
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+               // Pattern: "start" (name "c") followedBy "middle" (name "a", times(2).optional()) followedBy "end1" (name "b").
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(2).optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+               // Expected: each in-order pair drawn from the three middle events, plus the match that skips "middle".
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testTimesStrictOptional() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Scenario: times(2).consecutive().optional() on "middle"; only middleEvent2 and middleEvent3 are adjacent in the input.
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+               // Pattern: "start" (name "c") followedBy "middle" (name "a", times(2).consecutive().optional()) followedBy "end1" (name "b").
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               
}).times(2).consecutive().optional().followedBy("end1").where(new 
SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs 
optional()
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+               // Expected: the adjacent pair (middleEvent2, middleEvent3) or no "middle" at all; no pair containing middleEvent1 is expected.
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testTimesStrictOptional1() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Scenario: "middle" is attached with next() and quantified times(2).consecutive().optional(); the input has no Event(23, "f", 1.0) filler.
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+               // Pattern: "start" (name "c") next "middle" (name "a", times(2).consecutive().optional()) followedBy "end1" (name "b").
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).next("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               
}).times(2).consecutive().optional().followedBy("end1").where(new 
SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs 
optional()
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+               // Expected: the pair right after startEvent (middleEvent1, middleEvent2) or no "middle"; (middleEvent2, middleEvent3) is not expected.
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testOptionalTimesNonStrictWithNext() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               // Scenario: "middle" is attached with next() and quantified times(2).optional() (no consecutive()); middle events are separated by Event(23, "f", 1.0) records.
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+               // Pattern: "start" (name "c") next "middle" (name "a", times(2).optional()) followedBy "end1" (name "b").
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).next("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(2).optional().followedBy("end1").where(new 
SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs 
optional()
+                       private static final long serialVersionUID = 
5726188262756267490L;
+                       // NOTE(review): the TODO above mentions consecutive(), which this pattern never calls -- it looks carried over from the strict tests; confirm.
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+               // Expected: pairs that begin with middleEvent1 (the record right after startEvent), plus the match that skips "middle"; (middleEvent2, middleEvent3) is not expected.
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
 
        ///////////////////////////////         Consecutive           
////////////////////////////////////////
 
@@ -1841,29 +2063,56 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent4, 7));
                inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 8));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+               Pattern<Event, ?> pattern;
+               if (eager) {
+                       pattern = Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
+                               private static final long serialVersionUID = 
5726188262756267490L;
 
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("c");
-                       }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                               @Override
+                               public boolean filter(Event value) throws 
Exception {
+                                       return value.getName().equals("c");
+                               }
+                       }).followedBy("middle").where(new 
SimpleCondition<Event>() {
+                               private static final long serialVersionUID = 
5726188262756267490L;
 
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("a");
-                       }
-               }).oneOrMore(eager).consecutive()
-                       .followedBy("end1").where(new SimpleCondition<Event>() {
+                               @Override
+                               public boolean filter(Event value) throws 
Exception {
+                                       return value.getName().equals("a");
+                               }
+                       }).oneOrMore().consecutive()
+                                       .followedBy("end1").where(new 
SimpleCondition<Event>() {
+                                               private static final long 
serialVersionUID = 5726188262756267490L;
+
+                                               @Override
+                                               public boolean filter(Event 
value) throws Exception {
+                                                       return 
value.getName().equals("b");
+                                               }
+                                       });
+               } else {
+                       pattern = Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
+                               private static final long serialVersionUID = 
5726188262756267490L;
+
+                               @Override
+                               public boolean filter(Event value) throws 
Exception {
+                                       return value.getName().equals("c");
+                               }
+                       }).followedBy("middle").where(new 
SimpleCondition<Event>() {
                                private static final long serialVersionUID = 
5726188262756267490L;
 
                                @Override
                                public boolean filter(Event value) throws 
Exception {
-                                       return value.getName().equals("b");
+                                       return value.getName().equals("a");
                                }
-                       });
+                       }).oneOrMore().allowCombinations().consecutive()
+                                       .followedBy("end1").where(new 
SimpleCondition<Event>() {
+                                               private static final long 
serialVersionUID = 5726188262756267490L;
+
+                                               @Override
+                                               public boolean filter(Event 
value) throws Exception {
+                                                       return 
value.getName().equals("b");
+                                               }
+                                       });
+               }
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -1905,28 +2154,51 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
                inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+               Pattern<Event, ?> pattern = eager
+                               ? Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 5726188262756267490L;
 
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("c");
-                       }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("c");
+                                       }
+                               }).followedBy("middle").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 5726188262756267490L;
 
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("a");
-                       }
-               }).zeroOrMore(eager).consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("a");
+                                       }
+                               
}).oneOrMore().optional().consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 5726188262756267490L;
 
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("b");
-                       }
-               });
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("b");
+                                       }
+                               })
+                               : Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 5726188262756267490L;
+
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("c");
+                                       }
+                               }).followedBy("middle").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 5726188262756267490L;
+
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("a");
+                                       }
+                               
}).oneOrMore().allowCombinations().optional().consecutive().followedBy("end1").where(new
 SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 5726188262756267490L;
+
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("b");
+                                       }
+                               });
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -2033,7 +2305,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore().consecutive();
+               }).oneOrMore().optional().consecutive();
 
                testStartWithOneOrZeroOrMoreStrict(pattern);
        }
@@ -2183,7 +2455,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).oneOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
+               }).oneOrMore().allowCombinations().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2227,7 +2499,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).zeroOrMore(false).followedBy("end1").where(new 
SimpleCondition<Event>() {
+               
}).oneOrMore().allowCombinations().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2313,37 +2585,41 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(nextOne, 6));
                inputEvents.add(new StreamRecord<>(endEvent, 8));
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("start");
-                       }
-               }).followedBy("middle").subtype(SubEvent.class).where(new 
IterativeCondition<SubEvent>() {
-                       private static final long serialVersionUID = 
6215754202506583964L;
-
-                       @Override
-                       public boolean filter(SubEvent value, Context<SubEvent> 
ctx) throws Exception {
-                               if (!value.getName().startsWith("foo")) {
-                                       return false;
-                               }
+               Pattern<Event, ?> pattern = eager
+                               ? Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 5726188262756267490L;
 
-                               double sum = 0.0;
-                               for (Event event : 
ctx.getEventsForPattern("middle")) {
-                                       sum += event.getPrice();
-                               }
-                               sum += value.getPrice();
-                               return Double.compare(sum, 5.0) < 0;
-                       }
-               }).oneOrMore(eager).followedBy("end").where(new 
SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
7056763917392056548L;
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("start");
+                                       }
+                               })
+                               
.followedBy("middle").subtype(SubEvent.class).where(new 
MySubeventIterCondition()).oneOrMore()
+                               .followedBy("end").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 7056763917392056548L;
+
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("end");
+                                       }
+                               })
+                               : Pattern.<Event>begin("start").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 5726188262756267490L;
 
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("end");
-                       }
-               });
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("start");
+                                       }
+                               })
+                               
.followedBy("middle").subtype(SubEvent.class).where(new 
MySubeventIterCondition()).oneOrMore().allowCombinations()
+                               .followedBy("end").where(new 
SimpleCondition<Event>() {
+                                       private static final long 
serialVersionUID = 7056763917392056548L;
+
+                                       @Override
+                                       public boolean filter(Event value) 
throws Exception {
+                                               return 
value.getName().equals("end");
+                                       }
+                               });
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -2352,6 +2628,25 @@ public class NFAITCase extends TestLogger {
                return resultingPatterns;
        }
 
+       private static class MySubeventIterCondition extends IterativeCondition<SubEvent> {
+               // Condition used for the "middle" stage: accepts a SubEvent named "foo*" while the running price total stays below 5.0.
+               private static final long serialVersionUID = 6215754202506583964L;
+
+               @Override
+               public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
+                       if (!value.getName().startsWith("foo")) {
+                               return false;
+                       }
+                       // Total = prices of the events returned by ctx.getEventsForPattern("middle") plus the candidate's own price.
+                       double sum = 0.0;
+                       for (Event event : ctx.getEventsForPattern("middle")) {
+                               sum += event.getPrice();
+                       }
+                       sum += value.getPrice();
+                       return Double.compare(sum, 5.0) < 0;
+               }
+       }
+
        @Test
        public void testIterativeWithLoopingStartingEager() {
                List<List<Event>> actual = 
testIterativeWithLoopingStarting(true);
@@ -2395,30 +2690,25 @@ public class NFAITCase extends TestLogger {
                // behavior (which is the one applied in the case that the 
pattern graph starts with such a pattern)
                // of a looping pattern is with relaxed continuity (as in 
followedBy).
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
-                       private static final long serialVersionUID = 
6215754202506583964L;
-
-                       @Override
-                       public boolean filter(Event value, Context<Event> ctx) 
throws Exception {
-                               if (!value.getName().equals("start")) {
-                                       return false;
-                               }
-
-                               double sum = 0.0;
-                               for (Event event : 
ctx.getEventsForPattern("start")) {
-                                       sum += event.getPrice();
-                               }
-                               sum += value.getPrice();
-                               return Double.compare(sum, 5.0) < 0;
-                       }
-               }).zeroOrMore(eager).followedBy("end").where(new 
SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
7056763917392056548L;
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("end");
-                       }
-               });
+               Pattern<Event, ?> pattern = eager
+                               ? Pattern.<Event>begin("start").where(new 
MyEventIterCondition()).oneOrMore().optional()
+                                       .followedBy("end").where(new 
SimpleCondition<Event>() {
+                                               private static final long 
serialVersionUID = 7056763917392056548L;
+
+                                               @Override
+                                               public boolean filter(Event 
value) throws Exception {
+                                                       return 
value.getName().equals("end");
+                                               }
+                                       })
+                               : Pattern.<Event>begin("start").where(new 
MyEventIterCondition()).oneOrMore().allowCombinations().optional()
+                                       .followedBy("end").where(new 
SimpleCondition<Event>() {
+                                               private static final long 
serialVersionUID = 7056763917392056548L;
+
+                                               @Override
+                                               public boolean filter(Event 
value) throws Exception {
+                                                       return 
value.getName().equals("end");
+                                               }
+                                       });
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
@@ -2427,6 +2717,25 @@ public class NFAITCase extends TestLogger {
                return resultingPatterns;
        }
 
+       private static class MyEventIterCondition extends 
IterativeCondition<Event> {
+
+               private static final long serialVersionUID = 
6215754202506583964L;
+
+               @Override
+               public boolean filter(Event value, Context<Event> ctx) throws 
Exception {
+                       if (!value.getName().equals("start")) {
+                               return false;
+                       }
+
+                       double sum = 0.0;
+                       for (Event event : ctx.getEventsForPattern("start")) {
+                               sum += event.getPrice();
+                       }
+                       sum += value.getPrice();
+                       return Double.compare(sum, 5.0) < 0;
+               }
+       }
+
        @Test
        public void testIterativeWithPrevPatternDependency() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 80b1bcb..da1f9c8 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -153,7 +153,7 @@ public class NFACompilerTest extends TestLogger {
        public void testNFACompilerWithKleeneStar() {
 
                Pattern<Event, Event> pattern = 
Pattern.<Event>begin("start").where(startFilter)
-                       
.followedBy("middle").subtype(SubEvent.class).zeroOrMore()
+                       
.followedBy("middle").subtype(SubEvent.class).oneOrMore().optional()
                        .followedBy("end").where(endFilter);
 
                NFA<Event> nfa = NFACompiler.compile(pattern, serializer, 
false);

http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index e9aa7d2..8c4304a 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -196,7 +196,7 @@ public class PatternTest extends TestLogger {
                        public boolean filter(Object value) throws Exception {
                                return true;
                        }
-               }).oneOrMore().zeroOrMore();
+               }).oneOrMore().oneOrMore().optional();
        }
 
        @Test(expected = MalformedPatternException.class)
@@ -209,7 +209,7 @@ public class PatternTest extends TestLogger {
                        public boolean filter(Object value) throws Exception {
                                return true;
                        }
-               }).zeroOrMore().times(1);
+               }).oneOrMore().optional().times(1);
        }
 
        @Test(expected = MalformedPatternException.class)
@@ -235,7 +235,7 @@ public class PatternTest extends TestLogger {
                        public boolean filter(Object value) throws Exception {
                                return true;
                        }
-               }).oneOrMore().oneOrMore(true);
+               }).oneOrMore().oneOrMore();
        }
 
        @Test(expected = MalformedPatternException.class)
@@ -248,6 +248,6 @@ public class PatternTest extends TestLogger {
                        public boolean filter(Object value) throws Exception {
                                return true;
                        }
-               }).oneOrMore(true).zeroOrMore(true);
+               }).oneOrMore().oneOrMore().optional();
        }
 }

Reply via email to