[FLINK-6208] [cep] Implement skip till next match strategy

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c35dc0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c35dc0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c35dc0e

Branch: refs/heads/table-retraction
Commit: 7c35dc0e40a96f6b8d367b9a80a19e9a22525043
Parents: b527582
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Wed Apr 26 11:57:53 2017 +0200
Committer: kl0u <kklou...@gmail.com>
Committed: Fri Apr 28 14:28:37 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  19 +-
 .../cep/scala/pattern/FollowedByPattern.scala   |  44 --
 .../flink/cep/scala/pattern/Pattern.scala       |  31 +-
 .../flink/cep/scala/pattern/package.scala       |   3 +-
 .../flink/cep/scala/pattern/PatternTest.scala   |  52 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     | 192 ++---
 .../flink/cep/pattern/FollowedByPattern.java    |  33 -
 .../org/apache/flink/cep/pattern/Pattern.java   |  42 +-
 .../apache/flink/cep/pattern/Quantifier.java    |  82 +-
 .../java/org/apache/flink/cep/CEPITCase.java    |  30 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 760 +++++++++----------
 .../flink/cep/nfa/compiler/NFACompilerTest.java |  39 -
 .../flink/cep/operator/CEPOperatorTest.java     |   5 +-
 .../apache/flink/cep/pattern/PatternTest.java   |   9 +-
 14 files changed, 656 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 15afdf5..b379615 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -268,7 +268,7 @@ val strictNext: Pattern[Event, _] = start.next("middle")
 </div>
 
 Non-strict contiguity means that other events are allowed to occur in-between 
two matching events.
-A non-strict contiguity pattern state can be created via the `followedBy` 
method.
+A non-strict contiguity pattern state can be created via the `followedBy` or 
`followedByAny` method.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -282,6 +282,23 @@ Pattern<Event, ?> nonStrictNext = 
start.followedBy("middle");
 val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
 {% endhighlight %}
 </div>
+</div>
+For non-strict contiguity one can specify whether only the first succeeding 
matching event is matched (`followedBy`), or
+all of them (`followedByAny`). In the latter case multiple matches will be emitted for the same 
beginning.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> nonStrictNext = start.followedByAny("middle");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val nonStrictNext : Pattern[Event, _] = start.followedByAny("middle")
+{% endhighlight %}
+</div>
+
 </div>
 It is also possible to define a temporal constraint for the pattern to be 
valid.
 For example, one can define that a pattern should occur within 10 seconds via 
the `within` method. 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
deleted file mode 100644
index 4bda08f..0000000
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.cep.scala.pattern
-
-import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern}
-
-object FollowedByPattern {
-  /**
-    * Constructs a new Pattern by wrapping a given Java API Pattern
-    *
-    * @param jfbPattern Underlying Java API Pattern.
-    * @tparam T Base type of the elements appearing in the pattern
-    * @tparam F Subtype of T to which the current pattern operator is 
constrained
-    * @return New wrapping FollowedByPattern object
-    */
-  def apply[T, F <: T](jfbPattern: JFollowedByPattern[T, F]) =
-    new FollowedByPattern[T, F](jfbPattern)
-}
-
-/**
-  * Pattern operator which signifies that the there is a non-strict temporal 
contiguity between
-  * itself and its preceding pattern operator. This means that there might be 
events in between
-  * two matching events. These events are then simply ignored.
-  *
-  * @tparam T Base type of the events
-  * @tparam F Subtype of T to which the operator is currently constrained
-  */
-class FollowedByPattern[T, F <: T](jfbPattern: JFollowedByPattern[T, F])
-  extends Pattern[T, F](jfbPattern)

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 3bdbfcf..65b7ab0 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -46,22 +46,22 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   /**
     * @return The previous pattern
     */
-  def getPrevious(): Option[Pattern[T, _ <: T]] = {
-    wrapPattern(jPattern.getPrevious())
+  def getPrevious: Option[Pattern[T, _ <: T]] = {
+    wrapPattern(jPattern.getPrevious)
   }
 
   /**
     *
     * @return Name of the pattern operator
     */
-  def getName(): String = jPattern.getName()
+  def getName: String = jPattern.getName
 
   /**
     *
     * @return Window length in which the pattern match has to occur
     */
-  def getWindowTime(): Option[Time] = {
-    Option(jPattern.getWindowTime())
+  def getWindowTime: Option[Time] = {
+    Option(jPattern.getWindowTime)
   }
 
   /**
@@ -70,8 +70,8 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     */
   def getQuantifier: Quantifier = jPattern.getQuantifier
 
-  def getCondition(): Option[IterativeCondition[F]] = {
-    Option(jPattern.getCondition())
+  def getCondition: Option[IterativeCondition[F]] = {
+    Option(jPattern.getCondition)
   }
 
   /**
@@ -208,7 +208,22 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     * @param name Name of the new pattern
     * @return A new pattern which is appended to this one
     */
-  def followedBy(name: String) = FollowedByPattern(jPattern.followedBy(name))
+  def followedBy(name: String): Pattern[T, T] = {
+    Pattern[T, T](jPattern.followedBy(name))
+  }
+
+  /**
+    * Appends a new pattern to the existing one. The new pattern enforces 
non-strict
+    * temporal contiguity: events between the preceding match and a match of 
this pattern
+    * are ignored. Unlike followedBy, all succeeding matching events are 
matched, not only the first one.
+    *
+    * @param name Name of the new pattern
+    * @return A new pattern which is appended to this one
+    */
+  def followedByAny(name: String): Pattern[T, T] = {
+    Pattern[T, T](jPattern.followedByAny(name))
+  }
+
 
   /**
     * Specifies that this pattern is optional for a final match of the pattern

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
index 382c160..26355a5 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.cep.scala
 
-import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, 
Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
 
 package object pattern {
   /**
@@ -31,7 +31,6 @@ package object pattern {
     */
   private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F])
   : Option[Pattern[T, F]] = javaPattern match {
-    case f: JFollowedByPattern[T, F] => Some(FollowedByPattern[T, F](f))
     case p: JPattern[T, F] => Some(Pattern[T, F](p))
     case _ => None
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index a95dddd..d574513 100644
--- 
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -22,7 +22,7 @@ import org.junit.Assert._
 import org.junit.Test
 import org.apache.flink.cep.Event
 import org.apache.flink.cep.SubEvent
-import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context
+import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy
 import org.apache.flink.cep.pattern.conditions._
 
 class PatternTest {
@@ -33,7 +33,7 @@ class PatternTest {
     */
 
   @Test
-  def testStrictContiguity: Unit = {
+  def testStrictContiguity(): Unit = {
     val pattern = Pattern.begin[Event]("start").next("next").next("end")
     val jPattern = JPattern.begin[Event]("start").next("next").next("end")
 
@@ -56,7 +56,7 @@ class PatternTest {
 
 
   @Test
-  def testNonStrictContiguity: Unit = {
+  def testNonStrictContiguity(): Unit = {
     val pattern = 
Pattern.begin[Event]("start").followedBy("next").followedBy("end")
     val jPattern = 
JPattern.begin[Event]("start").followedBy("next").followedBy("end")
 
@@ -69,8 +69,8 @@ class PatternTest {
     assertTrue(previous.getPrevious.isDefined)
     assertFalse(preprevious.getPrevious.isDefined)
 
-    assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
-    assertTrue(previous.isInstanceOf[FollowedByPattern[_, _]])
+    assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
pattern.getQuantifier.getConsumingStrategy)
+    assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
previous.getQuantifier.getConsumingStrategy)
 
     assertEquals(pattern.getName, "end")
     assertEquals(previous.getName, "next")
@@ -78,25 +78,25 @@ class PatternTest {
   }
 
   @Test
-  def testStrictContiguityWithCondition: Unit = {
+  def testStrictContiguityWithCondition(): Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("next")
-      .where((value: Event, ctx: Context[Event]) => value.getName() == 
"foobar")
+      .where((value: Event, _) => value.getName == "foobar")
       .next("end")
-      .where((value: Event, ctx: Context[Event]) => value.getId() == 42)
+      .where((value: Event, _) => value.getId == 42)
 
     val jPattern = JPattern.begin[Event]("start")
       .next("next")
       .where(new SimpleCondition[Event]() {
         @throws[Exception]
         def filter(value: Event): Boolean = {
-          return value.getName() == "foobar"
+          value.getName == "foobar"
         }
       }).next("end")
       .where(new SimpleCondition[Event]() {
         @throws[Exception]
         def filter(value: Event): Boolean = {
-          return value.getId() == 42
+          value.getId == 42
         }
       })
 
@@ -120,7 +120,7 @@ class PatternTest {
   }
 
   @Test
-  def testPatternWithSubtyping: Unit = {
+  def testPatternWithSubtyping(): Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("subevent")
       .subtype(classOf[SubEvent])
@@ -150,11 +150,11 @@ class PatternTest {
   }
 
   @Test
-  def testPatternWithSubtypingAndFilter: Unit = {
+  def testPatternWithSubtypingAndFilter(): Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("subevent")
       .subtype(classOf[SubEvent])
-      .where((value: SubEvent) => false)
+      .where(_ => false)
       .followedBy("end")
 
     val jpattern = JPattern.begin[Event]("start")
@@ -163,7 +163,7 @@ class PatternTest {
       .where(new SimpleCondition[SubEvent]() {
         @throws[Exception]
         def filter(value: SubEvent): Boolean = {
-          return false
+          false
         }
       }).followedBy("end")
 
@@ -178,8 +178,8 @@ class PatternTest {
     assertTrue(previous.getPrevious.isDefined)
     assertFalse(preprevious.getPrevious.isDefined)
 
-    assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
-    assertTrue(previous.getCondition().isDefined)
+    assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
pattern.getQuantifier.getConsumingStrategy)
+    assertTrue(previous.getCondition.isDefined)
 
     assertEquals(pattern.getName, "end")
     assertEquals(previous.getName, "subevent")
@@ -194,25 +194,25 @@ class PatternTest {
       && threeWayEquals(
       pattern.getName,
       pattern.wrappedPattern.getName,
-      jPattern.getName())
+      jPattern.getName)
       //check equal time windows
       && threeWayEquals(
       pattern.getWindowTime.orNull,
       pattern.wrappedPattern.getWindowTime,
-      jPattern.getWindowTime())
+      jPattern.getWindowTime)
       //check congruent class names / types
       && threeWayEquals(
       pattern.getClass.getSimpleName,
       pattern.wrappedPattern.getClass.getSimpleName,
-      jPattern.getClass().getSimpleName())
+      jPattern.getClass.getSimpleName)
       //best effort to confirm congruent filter functions
       && compareFilterFunctions(
-      pattern.getCondition().orNull,
-      jPattern.getCondition())
+      pattern.getCondition.orNull,
+      jPattern.getCondition)
       //recursively check previous patterns
       && checkCongruentRepresentations(
       pattern.getPrevious.orNull,
-      jPattern.getPrevious()))
+      jPattern.getPrevious))
   }
 
   def threeWayEquals(a: AnyRef, b: AnyRef, c: AnyRef): Boolean = {
@@ -233,15 +233,15 @@ class PatternTest {
     (sFilter, jFilter) match {
       //matching types: and-filter; branch and recurse for inner filters
       case (saf: AndCondition[_], jaf: AndCondition[_])
-      => (compareFilterFunctions(saf.getLeft(), jaf.getLeft())
-        && compareFilterFunctions(saf.getRight(), jaf.getRight()))
+      => (compareFilterFunctions(saf.getLeft, jaf.getLeft)
+        && compareFilterFunctions(saf.getRight, jaf.getRight))
       //matching types: subtype-filter
-      case (saf: SubtypeCondition[_], jaf: SubtypeCondition[_]) => true
+      case (_: SubtypeCondition[_], _: SubtypeCondition[_]) => true
       //mismatch: one-sided and/subtype-filter
       case (_: AndCondition[_] | _: SubtypeCondition[_], _) => false
       case (_, _: AndCondition[_] | _: SubtypeCondition[_]) => false
       //from here we can only check mutual presence or absence of a function
-      case (s: IterativeCondition[_], j: IterativeCondition[_]) => true
+      case (_: IterativeCondition[_], _: IterativeCondition[_]) => true
       case (null, null) => true
       case _ => false
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b4e0557..0ca0e14 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -20,32 +20,30 @@ package org.apache.flink.cep.nfa.compiler;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
-import org.apache.flink.cep.pattern.conditions.BooleanConditions;
-import org.apache.flink.cep.pattern.FollowedByPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
-import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
-import javax.annotation.Nullable;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Compiler class containing methods to compile a {@link Pattern} into a 
{@link NFA} or a
  * {@link NFAFactory}.
@@ -159,25 +157,7 @@ public class NFACompiler {
 
                        State<T> lastSink = sinkState;
                        while (currentPattern.getPrevious() != null) {
-                               checkPatternNameUniqueness();
-                               usedNames.add(currentPattern.getName());
-
-                               if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-                                       final State<T> looping = 
createLooping(lastSink);
-
-                                       if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
-                                               lastSink = 
createFirstMandatoryStateOfLoop(looping);
-                                       } else if (currentPattern instanceof 
FollowedByPattern &&
-                                                               
currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) {
-                                               lastSink = 
createWaitingStateForZeroOrMore(looping, lastSink);
-                                       } else {
-                                               lastSink = looping;
-                                       }
-                               } else if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
-                                       lastSink = createTimesState(lastSink, 
currentPattern.getTimes());
-                               } else {
-                                       lastSink = 
createSingletonState(lastSink);
-                               }
+                               lastSink = convertPattern(lastSink);
                                currentPattern = currentPattern.getPrevious();
 
                                final Time currentWindowTime = 
currentPattern.getWindowTime();
@@ -190,6 +170,29 @@ public class NFACompiler {
                        return lastSink;
                }
 
+               private State<T> convertPattern(final State<T> sinkState) {
+                       final State<T> lastSink;
+                       checkPatternNameUniqueness();
+                       usedNames.add(currentPattern.getName());
+
+                       final Quantifier quantifier = 
currentPattern.getQuantifier();
+                       if 
(quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
+                               final State<T> looping = 
createLooping(sinkState);
+
+                               if 
(!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+                                       lastSink = 
createFirstMandatoryStateOfLoop(looping);
+                               } else {
+                                       lastSink = 
createWaitingStateForZeroOrMore(looping, sinkState);
+                               }
+                       } else if 
(quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) {
+                               lastSink = createTimesState(sinkState, 
currentPattern.getTimes());
+                       } else {
+                               lastSink = createSingletonState(sinkState);
+                       }
+
+                       return lastSink;
+               }
+
                /**
                 * Creates a pair of states that enables relaxed strictness 
before a zeroOrMore looping state.
                 *
@@ -197,19 +200,21 @@ public class NFACompiler {
                 * @param lastSink     the state that the looping one points to
                 * @return the newly created state
                 */
+               @SuppressWarnings("unchecked")
                private State<T> createWaitingStateForZeroOrMore(final State<T> 
loopingState, final State<T> lastSink) {
-                       final State<T> followByState = createNormalState();
-                       final State<T> followByStateWithoutProceed = 
createNormalState();
-
                        final IterativeCondition<T> currentFunction = 
(IterativeCondition<T>)currentPattern.getCondition();
-                       final IterativeCondition<T> ignoreFunction = 
getIgnoreCondition(currentPattern);
 
+                       final State<T> followByState = createNormalState();
                        followByState.addProceed(lastSink, 
BooleanConditions.<T>trueFunction());
-                       followByState.addIgnore(followByStateWithoutProceed, 
ignoreFunction);
                        followByState.addTake(loopingState, currentFunction);
 
-                       followByStateWithoutProceed.addIgnore(ignoreFunction);
-                       followByStateWithoutProceed.addTake(loopingState, 
currentFunction);
+                       final IterativeCondition<T> ignoreFunction = 
getIgnoreCondition(currentPattern);
+                       if (ignoreFunction != null) {
+                               final State<T> followByStateWithoutProceed = 
createNormalState();
+                               
followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
+                               
followByStateWithoutProceed.addIgnore(ignoreFunction);
+                               
followByStateWithoutProceed.addTake(loopingState, currentFunction);
+                       }
 
                        return followByState;
                }
@@ -230,23 +235,7 @@ public class NFACompiler {
                 */
                @SuppressWarnings("unchecked")
                private State<T> createStartState(State<T> sinkState) {
-                       checkPatternNameUniqueness();
-                       usedNames.add(currentPattern.getName());
-
-                       final State<T> beginningState;
-                       if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-                               final State<T> loopingState = 
createLooping(sinkState);
-                               if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
-                                       beginningState = 
createFirstMandatoryStateOfLoop(loopingState);
-                               } else {
-                                       beginningState = loopingState;
-                               }
-                       } else if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
-                               beginningState = createTimesState(sinkState, 
currentPattern.getTimes());
-                       } else {
-                               beginningState = 
createSingletonState(sinkState);
-                       }
-
+                       final State<T> beginningState = 
convertPattern(sinkState);
                        beginningState.makeStart();
 
                        return beginningState;
@@ -263,29 +252,26 @@ public class NFACompiler {
                private State<T> createTimesState(final State<T> sinkState, int 
times) {
                        State<T> lastSink = sinkState;
                        for (int i = 0; i < times - 1; i++) {
-                               lastSink = createSingletonState(
-                                       lastSink,
-                                       
!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE),
-                                       false);
+                               lastSink = createSingletonState(lastSink, 
getInnerIgnoreCondition(currentPattern), false);
                        }
 
+                       final IterativeCondition<T> currentFilterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
+                       final IterativeCondition<T> ignoreCondition = 
getIgnoreCondition(currentPattern);
+
                        // we created the intermediate states in the loop, now 
we create the start of the loop.
-                       if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
-                               return createSingletonState(lastSink, 
currentPattern instanceof FollowedByPattern, false);
+                       if 
(!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL))
 {
+                               return createSingletonState(lastSink, 
ignoreCondition, false);
                        }
 
-                       final IterativeCondition<T> currentFilterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
-                       final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
-
                        final State<T> singletonState = createNormalState();
                        singletonState.addTake(lastSink, currentFilterFunction);
-                       singletonState.addProceed(sinkState, trueFunction);
+                       singletonState.addProceed(sinkState, 
BooleanConditions.<T>trueFunction());
 
-                       if (currentPattern instanceof FollowedByPattern) {
+                       if (ignoreCondition != null) {
                                State<T> ignoreState = createNormalState();
                                ignoreState.addTake(lastSink, 
currentFilterFunction);
-                               ignoreState.addIgnore(trueFunction);
-                               singletonState.addIgnore(ignoreState, 
trueFunction);
+                               ignoreState.addIgnore(ignoreCondition);
+                               singletonState.addIgnore(ignoreState, 
ignoreCondition);
                        }
                        return singletonState;
                }
@@ -300,8 +286,10 @@ public class NFACompiler {
                 */
                @SuppressWarnings("unchecked")
                private State<T> createSingletonState(final State<T> sinkState) 
{
-                       return createSingletonState(sinkState, currentPattern 
instanceof FollowedByPattern,
-                                       
currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL));
+                       return createSingletonState(
+                               sinkState,
+                               getIgnoreCondition(currentPattern),
+                               
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
                }
 
                /**
@@ -309,12 +297,12 @@ public class NFACompiler {
                 * of a similar state without the PROCEED edge, so that for 
each PROCEED transition branches
                 * in computation state graph  can be created only once.
                 *
-                * @param addIgnore if any IGNORE should be added
+                * @param ignoreCondition condition that should be applied to 
the IGNORE transition; if {@code null}, no IGNORE transition is added
                 * @param sinkState state that the state being converted should 
point to
                 * @return the created state
                 */
                @SuppressWarnings("unchecked")
-               private State<T> createSingletonState(final State<T> sinkState, 
boolean addIgnore, boolean isOptional) {
+               private State<T> createSingletonState(final State<T> sinkState, 
final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
                        final IterativeCondition<T> currentFilterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
                        final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
 
@@ -325,7 +313,7 @@ public class NFACompiler {
                                singletonState.addProceed(sinkState, 
trueFunction);
                        }
 
-                       if (addIgnore) {
+                       if (ignoreCondition != null) {
                                final State<T> ignoreState;
                                if (isOptional) {
                                        ignoreState = createNormalState();
@@ -333,7 +321,7 @@ public class NFACompiler {
                                } else {
                                        ignoreState = singletonState;
                                }
-                               singletonState.addIgnore(ignoreState, 
getIgnoreCondition(currentPattern));
+                               singletonState.addIgnore(ignoreState, 
ignoreCondition);
                        }
                        return singletonState;
                }
@@ -352,8 +340,8 @@ public class NFACompiler {
                        final State<T> firstState = createNormalState();
 
                        firstState.addTake(sinkState, currentFilterFunction);
-                       if (currentPattern instanceof FollowedByPattern) {
-                               final IterativeCondition<T> ignoreCondition = 
getIgnoreCondition(currentPattern);
+                       final IterativeCondition<T> ignoreCondition = 
getIgnoreCondition(currentPattern);
+                       if (ignoreCondition != null) {
                                firstState.addIgnore(ignoreCondition);
                        }
                        return firstState;
@@ -369,18 +357,16 @@ public class NFACompiler {
                 */
                @SuppressWarnings("unchecked")
                private State<T> createLooping(final State<T> sinkState) {
-
-                       final State<T> loopingState = createNormalState();
                        final IterativeCondition<T> filterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
+                       final IterativeCondition<T> ignoreCondition = 
getInnerIgnoreCondition(currentPattern);
                        final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
 
+                       final State<T> loopingState = createNormalState();
                        loopingState.addProceed(sinkState, trueFunction);
                        loopingState.addTake(filterFunction);
-                       if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) {
-                               final State<T> ignoreState = 
createNormalState();
-
-                               final IterativeCondition<T> ignoreCondition = 
getIgnoreCondition(currentPattern);
 
+                       if (ignoreCondition != null) {
+                               final State<T> ignoreState = 
createNormalState();
                                ignoreState.addTake(loopingState, 
filterFunction);
                                ignoreState.addIgnore(ignoreCondition);
                                loopingState.addIgnore(ignoreState, 
ignoreCondition);
@@ -403,15 +389,37 @@ public class NFACompiler {
 
                /**
                 * @return The {@link IterativeCondition condition} for the 
{@code IGNORE} edge
-                * that corresponds to the specified {@link Pattern}. If the 
pattern is
-                * {@link QuantifierProperty#EAGER}, the negated user-specified 
condition is
-                * returned. In other case, a condition that always evaluated 
to {@code true} is
-                * returned.
+                * that corresponds to the specified {@link Pattern}, or {@code null} if its inner consuming strategy is
+                * {@code STRICT}. Applicable only to the inner states of a complex state such as looping or times.
                 */
+               @SuppressWarnings("unchecked")
+               private IterativeCondition<T> 
getInnerIgnoreCondition(Pattern<T, ?> pattern) {
+                       switch 
(pattern.getQuantifier().getInnerConsumingStrategy()) {
+                               case STRICT:
+                                       return null;
+                               case SKIP_TILL_NEXT:
+                                       return new 
NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+                               case SKIP_TILL_ANY:
+                                       return BooleanConditions.trueFunction();
+                       }
+                       return null;
+               }
+
+               /**
+                * @return The {@link IterativeCondition condition} for the 
{@code IGNORE} edge
+                * that corresponds to the specified {@link Pattern}, or {@code null} if its consuming strategy is {@code STRICT}. See {@link Quantifier} for the strategies.
+                */
+               @SuppressWarnings("unchecked")
                private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> 
pattern) {
-                       return 
pattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)
-                                       ? new 
NotCondition<>((IterativeCondition<T>) pattern.getCondition())
-                                       : BooleanConditions.<T>trueFunction();
+                       switch (pattern.getQuantifier().getConsumingStrategy()) 
{
+                               case STRICT:
+                                       return null;
+                               case SKIP_TILL_NEXT:
+                                       return new 
NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+                               case SKIP_TILL_ANY:
+                                       return BooleanConditions.trueFunction();
+                       }
+                       return null;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
deleted file mode 100644
index 266451c..0000000
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.pattern;
-
-/**
- * Pattern operator which signifies that the there is a non-strict temporal 
contiguity between
- * itself and its preceding pattern operator. This means that there might be 
events in between
- * two matching events. These events are then simply ignored.
- *
- * @param <T> Base type of the events
- * @param <F> Subtype of T to which the operator is currently constrained
- */
-public class FollowedByPattern<T, F extends T> extends Pattern<T, F> {
-       FollowedByPattern(final String name, Pattern<T, ?> previous) {
-               super(name, previous);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 3ce0c73..cef0f85 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.pattern;
 
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
 import org.apache.flink.cep.pattern.conditions.AndCondition;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.OrCondition;
@@ -56,8 +57,8 @@ public class Pattern<T, F extends T> {
        /** Window length in which the pattern match has to occur. */
        private Time windowTime;
 
-       /** A quantifier for the pattern. By default set to {@link 
Quantifier#ONE()}. */
-       private Quantifier quantifier = Quantifier.ONE();
+       /** A quantifier for the pattern. By default set to {@link 
Quantifier#ONE(ConsumingStrategy)}. */
+       private Quantifier quantifier = 
Quantifier.ONE(ConsumingStrategy.STRICT);
 
        /**
         * Applicable to a {@code times} pattern, and holds
@@ -70,6 +71,15 @@ public class Pattern<T, F extends T> {
                this.previous = previous;
        }
 
+       protected Pattern(
+                       final String name,
+                       final Pattern<T, ? extends T> previous,
+                       final ConsumingStrategy consumingStrategy) {
+               this.name = name;
+               this.previous = previous;
+               this.quantifier = Quantifier.ONE(consumingStrategy);
+       }
+
        public Pattern<T, ? extends T> getPrevious() {
                return previous;
        }
@@ -195,7 +205,19 @@ public class Pattern<T, F extends T> {
         * @return A new pattern which is appended to this one
         */
        public Pattern<T, T> next(final String name) {
-               return new Pattern<T, T>(name, this);
+               return new Pattern<T, T>(name, this, ConsumingStrategy.STRICT);
+       }
+
+       /**
+        * Appends a new pattern to the existing one. The new pattern enforces non-strict
+        * temporal contiguity: between the preceding matching event and a matching event of this
+        * pattern, events that do not match this pattern may occur and are ignored.
+        *
+        * @param name Name of the new pattern
+        * @return A new pattern which is appended to this one
+        */
+       public Pattern<T, T> followedBy(final String name) {
+               return new Pattern<>(name, this, 
ConsumingStrategy.SKIP_TILL_NEXT);
        }
 
        /**
@@ -206,8 +228,8 @@ public class Pattern<T, F extends T> {
         * @param name Name of the new pattern
         * @return A new pattern which is appended to this one
         */
-       public FollowedByPattern<T, T> followedBy(final String name) {
-               return new FollowedByPattern<T, T>(name, this);
+       public Pattern<T, T> followedByAny(final String name) {
+               return new Pattern<>(name, this, 
ConsumingStrategy.SKIP_TILL_ANY);
        }
 
        /**
@@ -232,12 +254,12 @@ public class Pattern<T, F extends T> {
         * {@code A1 A2 B} appears, this will generate patterns:
         * {@code A1 B} and {@code A1 A2 B}. See also {@link 
#allowCombinations()}.
         *
-        * @return The same pattern with a {@link Quantifier#ONE_OR_MORE()} 
quantifier applied.
+        * @return The same pattern with a {@link 
Quantifier#ONE_OR_MORE(ConsumingStrategy)} quantifier applied.
         * @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
         */
        public Pattern<T, F> oneOrMore() {
                checkIfQuantifierApplied();
-               this.quantifier = Quantifier.ONE_OR_MORE();
+               this.quantifier = 
Quantifier.ONE_OR_MORE(quantifier.getConsumingStrategy());
                return this;
        }
 
@@ -252,14 +274,14 @@ public class Pattern<T, F extends T> {
        public Pattern<T, F> times(int times) {
                checkIfQuantifierApplied();
                Preconditions.checkArgument(times > 0, "You should give a 
positive number greater than 0.");
-               this.quantifier = Quantifier.TIMES();
+               this.quantifier = 
Quantifier.TIMES(quantifier.getConsumingStrategy());
                this.times = times;
                return this;
        }
 
        /**
-        * Applicable only to {@link Quantifier#ONE_OR_MORE()} and {@link 
Quantifier#TIMES()} patterns,
-        * this option allows more flexibility to the matching events.
+        * Applicable only to {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} 
and
+        * {@link Quantifier#TIMES(ConsumingStrategy)} patterns, this option 
allows more flexibility to the matching events.
         *
         * <p>If {@code allowCombinations()} is not applied for a
         * pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 0332ed4..b0f882c 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -24,48 +24,62 @@ public class Quantifier {
 
        private final EnumSet<QuantifierProperty> properties;
 
-       private Quantifier(final QuantifierProperty first, final 
QuantifierProperty... rest) {
+       private final ConsumingStrategy consumingStrategy;
+
+       private ConsumingStrategy innerConsumingStrategy = 
ConsumingStrategy.SKIP_TILL_NEXT;
+
+       private Quantifier(
+                       final ConsumingStrategy consumingStrategy,
+                       final QuantifierProperty first,
+                       final QuantifierProperty... rest) {
                this.properties = EnumSet.of(first, rest);
+               this.consumingStrategy = consumingStrategy;
        }
 
-       public static Quantifier ONE() {
-               return new Quantifier(QuantifierProperty.SINGLE);
+       public static Quantifier ONE(final ConsumingStrategy consumingStrategy) 
{
+               return new Quantifier(consumingStrategy, 
QuantifierProperty.SINGLE);
        }
 
-       public static Quantifier ONE_OR_MORE() {
-               return new Quantifier(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER);
+       public static Quantifier ONE_OR_MORE(final ConsumingStrategy 
consumingStrategy) {
+               return new Quantifier(consumingStrategy, 
QuantifierProperty.LOOPING);
        }
 
-       public static Quantifier TIMES() {
-               return new Quantifier(QuantifierProperty.TIMES, 
QuantifierProperty.EAGER);
+       public static Quantifier TIMES(final ConsumingStrategy 
consumingStrategy) {
+               return new Quantifier(consumingStrategy, 
QuantifierProperty.TIMES);
        }
 
        public boolean hasProperty(QuantifierProperty property) {
                return properties.contains(property);
        }
 
-       public void combinations() {
-               if (!hasProperty(QuantifierProperty.SINGLE) && 
!hasProperty(Quantifier.QuantifierProperty.EAGER)) {
-                       throw new MalformedPatternException("Combinations 
already allowed!");
-               }
+       public ConsumingStrategy getConsumingStrategy() {
+               return consumingStrategy;
+       }
 
-               if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || 
hasProperty(Quantifier.QuantifierProperty.TIMES)) {
-                       properties.remove(Quantifier.QuantifierProperty.EAGER);
-               } else {
-                       throw new MalformedPatternException("Combinations not 
applicable to " + this + "!");
+       public ConsumingStrategy getInnerConsumingStrategy() {
+               return innerConsumingStrategy;
+       }
+
+       private static void checkPattern(boolean condition, Object 
errorMessage) {
+               if (!condition) {
+                       throw new 
MalformedPatternException(String.valueOf(errorMessage));
                }
        }
 
+       /**
+        * Switches the inner consuming strategy to {@code SKIP_TILL_ANY} (combinations allowed).
+        *
+        * @throws MalformedPatternException if the quantifier is {@code SINGLE}, if strict
+        *         continuity (consecutive) was already applied, or if combinations were already applied.
+        */
+       public void combinations() {
+               checkPattern(!hasProperty(QuantifierProperty.SINGLE), "Combinations not applicable to " + this + "!");
+               checkPattern(innerConsumingStrategy != ConsumingStrategy.STRICT, "You can apply either combinations or consecutive, not both!");
+               checkPattern(innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY, "Combinations already applied!");
+
+               innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_ANY;
+       }
+
+       /**
+        * Switches the inner consuming strategy to {@code STRICT} (strict continuity).
+        *
+        * @throws MalformedPatternException if the quantifier is neither {@code LOOPING} nor
+        *         {@code TIMES}, if combinations were already applied, or if strict continuity
+        *         was already applied.
+        */
        public void consecutive() {
-               if (!hasProperty(QuantifierProperty.SINGLE) && 
hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) {
-                       throw new MalformedPatternException("Strict continuity 
already applied!");
-               }
+               checkPattern(hasProperty(QuantifierProperty.LOOPING) || hasProperty(QuantifierProperty.TIMES), "Strict continuity not applicable to " + this + "!");
+               checkPattern(innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY, "You can apply either combinations or consecutive, not both!");
+               checkPattern(innerConsumingStrategy != ConsumingStrategy.STRICT, "Strict continuity already applied!");
 
-               if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || 
hasProperty(Quantifier.QuantifierProperty.TIMES)) {
-                       
properties.add(Quantifier.QuantifierProperty.CONSECUTIVE);
-               } else {
-                       throw new MalformedPatternException("Strict continuity 
not applicable to " + this + "!");
-               }
+               innerConsumingStrategy = ConsumingStrategy.STRICT;
        }
 
        public void optional() {
@@ -76,13 +90,21 @@ public class Quantifier {
        }
 
        @Override
-       public boolean equals(Object obj) {
-               return obj instanceof Quantifier && 
properties.equals(((Quantifier)obj).properties);
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               Quantifier that = (Quantifier) o;
+               // The inner strategy is part of the quantifier's identity: quantifiers that differ
+               // only by combinations()/consecutive() must not compare equal.
+               return Objects.equals(properties, that.properties) &&
+                               consumingStrategy == that.consumingStrategy &&
+                               innerConsumingStrategy == that.innerConsumingStrategy;
        }
 
        @Override
        public int hashCode() {
-               return Objects.hashCode(properties);
+               // Hash exactly the fields that equals() compares.
+               return Objects.hash(properties, consumingStrategy, innerConsumingStrategy);
        }
 
        /**
@@ -92,9 +114,13 @@ public class Quantifier {
                SINGLE,
                LOOPING,
                TIMES,
-               EAGER,
-               CONSECUTIVE,
                OPTIONAL
        }
 
+       public enum ConsumingStrategy {
+               STRICT,
+               SKIP_TILL_NEXT,
+               SKIP_TILL_ANY
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 3a32175..f62c686 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -96,7 +96,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                                return value.getName().equals("start");
                        }
                })
-               .followedBy("middle").subtype(SubEvent.class).where(
+               .followedByAny("middle").subtype(SubEvent.class).where(
                                new SimpleCondition<SubEvent>() {
 
                                        @Override
@@ -105,7 +105,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                                        }
                                }
                        )
-               .followedBy("end").where(new SimpleCondition<Event>() {
+               .followedByAny("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -171,7 +171,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                                return value.getName().equals("start");
                        }
                })
-                       .followedBy("middle").subtype(SubEvent.class).where(
+                       .followedByAny("middle").subtype(SubEvent.class).where(
                                new SimpleCondition<SubEvent>() {
 
                                        @Override
@@ -180,7 +180,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                                        }
                                }
                        )
-                       .followedBy("end").where(new SimpleCondition<Event>() {
+                       .followedByAny("end").where(new 
SimpleCondition<Event>() {
 
                                @Override
                                public boolean filter(Event value) throws 
Exception {
@@ -250,13 +250,13 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new SimpleCondition<Event>() {
+               }).followedByAny("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -339,13 +339,13 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new SimpleCondition<Event>() {
+               }).followedByAny("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -416,7 +416,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
 
                DataStream<Integer> input = env.fromElements(1, 2);
 
-               Pattern<Integer, ?> pattern = 
Pattern.<Integer>begin("start").followedBy("end").within(Time.days(1));
+               Pattern<Integer, ?> pattern = 
Pattern.<Integer>begin("start").followedByAny("end").within(Time.days(1));
 
                DataStream<Integer> result = CEP.pattern(input, 
pattern).select(new PatternSelectFunction<Integer, Integer>() {
                        @Override
@@ -470,13 +470,13 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new SimpleCondition<Event>() {
+               }).followedByAny("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -538,7 +538,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                                        return value.getName().equals("start");
                                }
                        })
-                       .followedBy("middle")
+                       .followedByAny("middle")
                        .where(new SimpleCondition<Event>() {
                                @Override
                                public boolean filter(Event value) throws 
Exception {
@@ -551,7 +551,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                                        return value.getPrice() == 5.0;
                                }
                        })
-                       .followedBy("end").where(new SimpleCondition<Event>() {
+                       .followedByAny("end").where(new 
SimpleCondition<Event>() {
 
                                @Override
                                public boolean filter(Event value) throws 
Exception {
@@ -623,13 +623,13 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
-               }).followedBy("middle").where(new SimpleCondition<Event>() {
+               }).followedByAny("middle").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).followedBy("end").where(new SimpleCondition<Event>() {
+               }).followedByAny("end").where(new SimpleCondition<Event>() {
 
                        @Override
                        public boolean filter(Event value) throws Exception {

Reply via email to