[FLINK-6244] [cep] Emit timeouted Patterns as Side Output

This closes #4320


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ed5815e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ed5815e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ed5815e

Branch: refs/heads/master
Commit: 6ed5815e8b3f06b95470d1a4598fede0a9be9280
Parents: 9995588
Author: Dawid Wysakowicz <dwysakow...@apache.org>
Authored: Fri Aug 11 11:56:28 2017 +0200
Committer: Dawid Wysakowicz <dwysakow...@apache.org>
Committed: Wed Aug 23 08:35:29 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  54 ++-
 .../apache/flink/cep/scala/PatternStream.scala  | 231 ++++++++---
 ...StreamScalaJavaAPIInteroperabilityTest.scala |  45 +-
 .../org/apache/flink/cep/PatternStream.java     | 412 +++++++++++--------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |   3 +-
 .../AbstractKeyedCEPPatternOperator.java        |  45 +-
 .../flink/cep/operator/CEPOperatorUtils.java    | 302 ++++++++++----
 .../cep/operator/FlatSelectCepOperator.java     |  67 +++
 .../operator/FlatSelectTimeoutCepOperator.java  | 130 ++++++
 .../cep/operator/KeyedCEPPatternOperator.java   |  83 ----
 .../flink/cep/operator/SelectCepOperator.java   |  56 +++
 .../cep/operator/SelectTimeoutCepOperator.java  | 119 ++++++
 .../TimeoutKeyedCEPPatternOperator.java         |  92 -----
 .../TimestampedSideOutputCollector.java         |  82 ++++
 .../cep/operator/CEPMigration11to13Test.java    |  61 +--
 .../flink/cep/operator/CEPMigrationTest.java    |  66 +--
 .../flink/cep/operator/CEPOperatorTest.java     | 285 +++++--------
 .../flink/cep/operator/CEPRescalingTest.java    |   8 +-
 .../cep/operator/CepOperatorTestUtilities.java  | 113 +++++
 19 files changed, 1441 insertions(+), 813 deletions(-)
----------------------------------------------------------------------
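
Editor's note: the sketch below is not part of the commit and does not use any Flink API. It is a minimal plain-Java model, under stated assumptions, of what this change does: complete matches go to the main output, and timed-out partial matches go to a separate output identified by a tag. The names `SideOutputSketch`, `Candidate`, `Result` and the string tag are hypothetical stand-ins for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Toy model of "matches on the main output, timeouts on a tagged side output". Not Flink code. */
final class SideOutputSketch {

    /** Hypothetical stand-in for a match: the events seen so far and whether the pattern completed. */
    static final class Candidate {
        final List<String> events;
        final boolean complete;
        final long timeoutTimestamp;

        Candidate(List<String> events, boolean complete, long timeoutTimestamp) {
            this.events = events;
            this.complete = complete;
            this.timeoutTimestamp = timeoutTimestamp;
        }
    }

    /** Main output plus side outputs keyed by a tag id (all side outputs share type L in this toy). */
    static final class Result<L, R> {
        final List<R> main = new ArrayList<>();
        final Map<String, List<L>> side = new HashMap<>();

        List<L> getSideOutput(String tag) {
            return side.getOrDefault(tag, new ArrayList<>());
        }
    }

    /** Routes complete matches through selectFn and timed-out partial matches through timeoutFn. */
    static <L, R> Result<L, R> select(
            List<Candidate> candidates,
            String timeoutTag,
            BiFunction<List<String>, Long, L> timeoutFn,
            Function<List<String>, R> selectFn) {
        Result<L, R> result = new Result<>();
        for (Candidate c : candidates) {
            if (c.complete) {
                result.main.add(selectFn.apply(c.events));
            } else {
                result.side
                    .computeIfAbsent(timeoutTag, k -> new ArrayList<>())
                    .add(timeoutFn.apply(c.events, c.timeoutTimestamp));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
            new Candidate(Arrays.asList("start", "end"), true, 0L),
            new Candidate(Arrays.asList("start"), false, 42L));
        Result<String, String> result = select(
            candidates,
            "timed-out",
            (events, ts) -> "timeout@" + ts,
            events -> "match:" + events.size());
        System.out.println(result.main);
        System.out.println(result.getSideOutput("timed-out"));
    }
}
```

The main output no longer needs an `Either` wrapper, because the two result types travel on separate outputs. That is the same motivation the diff below gives for deprecating the `Either`-returning overloads.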


http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index bddb9b2..4b13bb3 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1316,63 +1316,75 @@ and `flatSelect` API calls allow a timeout handler to be specified. This timeout
 partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and
 the timestamp when the timeout was detected.
 
+In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as
+parameters
+
+ * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction`
+ * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the side output in which the timeouted matches will be returned
+ * and the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as
-the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as second parameter the known
-`PatternSelectFunction`/`PatternFlatSelectFunction`. The return type of the timeout function can be different from the
-select function. The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively
-so that the resulting data stream is of type `org.apache.flink.types.Either`.
 
-{% highlight java %}
+~~~java
 PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
-DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
+OutputTag<TimeoutEvent> outputTag = new OutputTag<TimeoutEvent>("side-output"){};
+
+SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
     new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
+    outputTag,
     new PatternSelectFunction<Event, ComplexEvent>() {...}
 );
 
-DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
+DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
+
+SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
     new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
+    outputTag,
     new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
 );
-{% endhighlight %}
+
+DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
+~~~
 
 </div>
 
 <div data-lang="scala" markdown="1">
-In order to treat partial patterns, the `select` API call offers an overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
-The timeout function is called with a map of string-event pairs of the partial match which has timed out and a long indicating when the timeout occurred.
-The string is defined by the name of the pattern to which the event has been matched.
-The timeout function returns exactly one result per call.
-The return type of the timeout function can be different from the select function.
-The timeout event and the select event are wrapped in `Left` and `Right` respectively so that the resulting data stream is of type `Either`.
 
-{% highlight scala %}
+~~~scala
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
-DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.select{
+val outputTag = OutputTag[TimeoutEvent]("side-output")
+
+val result: DataStream[ComplexEvent] = patternStream.select(outputTag){
     (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
 } {
     pattern: Map[String, Iterable[Event]] => ComplexEvent()
 }
-{% endhighlight %}
+
+val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)
+~~~
 
 The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
 In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`.
 The collector can be used to emit an arbitrary number of events.
 
-{% highlight scala %}
+~~~scala
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
-DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect{
+val outputTag = OutputTag[TimeoutEvent]("side-output")
+
+val result: DataStream[ComplexEvent] = patternStream.flatSelect(outputTag){
     (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
         out.collect(TimeoutEvent())
 } {
     (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
         out.collect(ComplexEvent())
 }
-{% endhighlight %}
+
+val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)
+~~~
 
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index d270ef7..eacfa87 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -17,19 +17,14 @@
  */
 package org.apache.flink.cep.scala
 
-import java.util.{List => JList, Map => JMap}
+import java.util.{UUID, List => JList, Map => JMap}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
 import org.apache.flink.util.Collector
-import org.apache.flink.types.{Either => FEither}
-import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
-import java.lang.{Long => JLong}
-
-import org.apache.flink.cep.operator.CEPOperatorUtils
-import org.apache.flink.cep.scala.pattern.Pattern
 
 import scala.collection.Map
 
@@ -84,37 +79,54 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     *                               pattern sequence.
     * @tparam L Type of the resulting timeout event
     * @tparam R Type of the resulting event
+    * @deprecated Use the version that returns timeouted events as a side-output
     * @return Data stream of either type which contains the resulting events and resulting timeout
     *         events.
     */
+  @deprecated
   def select[L: TypeInformation, R: TypeInformation](
     patternTimeoutFunction: PatternTimeoutFunction[T, L],
     patternSelectFunction: PatternSelectFunction[T, R])
   : DataStream[Either[L, R]] = {
+    val outputTag = OutputTag[L](UUID.randomUUID().toString)
+    val mainStream = select(outputTag, patternTimeoutFunction, patternSelectFunction)
+    mainStream.connect(mainStream.getSideOutput[L](outputTag)).map(r => Right(r), l => Left(l))
+  }
 
-    val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
-      jPatternStream.getInputStream,
-      jPatternStream.getPattern,
-      jPatternStream.getComparator)
-
+  /**
+    * Applies a select function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
+    * exactly one resulting element.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
+    * timeout function has to produce exactly one resulting timeout event.
+    *
+    * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the
+    * [[DataStream]] resulting from the windowed operation with the same [[OutputTag]].
+    *
+    * @param outputTag [[OutputTag]] that identifies side output with timeouted patterns
+    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
+    *                               pattern sequence which has timed out.
+    * @param patternSelectFunction  The pattern select function which is called for each detected
+    *                               pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream which contains the resulting elements with the resulting timeout elements
+    *         in a side output.
+    */
+  def select[L: TypeInformation, R: TypeInformation](
+    outputTag: OutputTag[L],
+    patternTimeoutFunction: PatternTimeoutFunction[T, L],
+    patternSelectFunction: PatternSelectFunction[T, R])
+  : DataStream[R] = {
     val cleanedSelect = cleanClosure(patternSelectFunction)
     val cleanedTimeout = cleanClosure(patternTimeoutFunction)
 
-    implicit val eitherTypeInfo = createTypeInformation[Either[L, R]]
-
-    asScalaStream(patternStream).map[Either[L, R]] {
-     input: FEither[FTuple2[JMap[String, JList[T]], JLong], JMap[String, JList[T]]] =>
-       if (input.isLeft) {
-         val timeout = input.left()
-         val timeoutEvent = cleanedTimeout.timeout(timeout.f0, timeout.f1)
-         val t = Left[L, R](timeoutEvent)
-         t
-       } else {
-         val event = cleanedSelect.select(input.right())
-         val t = Right[L, R](event)
-         t
-       }
-    }
+    asScalaStream(
+      jPatternStream
+      .select(outputTag, cleanedTimeout, implicitly[TypeInformation[R]], cleanedSelect)
+    )
   }
 
   /**
@@ -151,44 +163,58 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     *                                   detected pattern sequence.
     * @tparam L Type of the resulting timeout event
     * @tparam R Type of the resulting event
+    * @deprecated Use the version that returns timeouted events as a side-output
     * @return Data stream of either type which contains the resulting events and the resulting
     *         timeout events wrapped in a [[Either]] type.
     */
+  @deprecated
   def flatSelect[L: TypeInformation, R: TypeInformation](
     patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
     patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
   : DataStream[Either[L, R]] = {
-    val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
-      jPatternStream.getInputStream,
-      jPatternStream.getPattern,
-      jPatternStream.getComparator
-    )
-
-    val cleanedSelect = cleanClosure(patternFlatSelectFunction)
-    val cleanedTimeout = cleanClosure(patternFlatTimeoutFunction)
 
-    implicit val eitherTypeInfo = createTypeInformation[Either[L, R]]
-
-    asScalaStream(patternStream).flatMap[Either[L, R]] {
-      (input: FEither[FTuple2[JMap[String, JList[T]], JLong], JMap[String, JList[T]]],
-        collector: Collector[Either[L, R]]) =>
-
-        if (input.isLeft()) {
-          val timeout = input.left()
+    val outputTag = OutputTag[L]("dummy-timeouted")
+    val mainStream = flatSelect(outputTag, patternFlatTimeoutFunction, patternFlatSelectFunction)
+    mainStream.connect(mainStream.getSideOutput[L](outputTag)).map(r => Right(r), l => Left(l))
+  }
 
-          cleanedTimeout.timeout(timeout.f0, timeout.f1, new Collector[L]() {
-            override def collect(record: L): Unit = collector.collect(Left(record))
+  /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
+    * produce an arbitrary number of resulting elements.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
+    * pattern timeout function can produce an arbitrary number of resulting timeout events.
+    *
+    * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the
+    * [[DataStream]] resulting from the windowed operation with the same [[OutputTag]].
+    *
+    * @param outputTag [[OutputTag]] that identifies side output with timeouted patterns
+    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
+    *                                   partially matched pattern sequence which has timed out.
+    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
+    *                                   detected pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream which contains the resulting elements with the resulting timeout elements
+    *         in a side output.
+    */
+  def flatSelect[L: TypeInformation, R: TypeInformation](
+    outputTag: OutputTag[L],
+    patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
+    patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
+  : DataStream[R] = {
 
-            override def close(): Unit = collector.close()
-          })
-        } else {
-          cleanedSelect.flatSelect(input.right, new Collector[R]() {
-            override def collect(record: R): Unit = collector.collect(Right(record))
+    val cleanedSelect = cleanClosure(patternFlatSelectFunction)
+    val cleanedTimeout = cleanClosure(patternFlatTimeoutFunction)
 
-            override def close(): Unit = collector.close()
-          })
-        }
-    }
+    asScalaStream(
+      jPatternStream.flatSelect(
+        outputTag,
+        cleanedTimeout,
+        implicitly[TypeInformation[R]],
+        cleanedSelect))
   }
 
   /**
@@ -228,9 +254,11 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     *                               pattern sequence.
     * @tparam L Type of the resulting timeout event
     * @tparam R Type of the resulting event
+    * @deprecated Use the version that returns timeouted events as a side-output
     * @return Data stream of either type which contain the resulting events and resulting timeout
     *         events.
     */
+  @deprecated
   def select[L: TypeInformation, R: TypeInformation](
       patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) (
       patternSelectFunction: Map[String, Iterable[T]] => R)
@@ -252,6 +280,48 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   }
 
   /**
+    * Applies a select function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
+    * exactly one resulting element.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
+    * timeout function has to produce exactly one resulting element.
+    *
+    * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the
+    * [[DataStream]] resulting from the windowed operation with the same [[OutputTag]].
+    *
+    * @param outputTag [[OutputTag]] that identifies side output with timeouted patterns
+    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
+    *                               pattern sequence which has timed out.
+    * @param patternSelectFunction  The pattern select function which is called for each detected
+    *                               pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream which contains the resulting elements with the resulting timeout elements
+    *         in a side output.
+    */
+  def select[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
+    patternTimeoutFunction: (Map[String, Iterable[T]], Long) => L) (
+    patternSelectFunction: Map[String, Iterable[T]] => R)
+  : DataStream[R] = {
+
+    val cleanSelectFun = cleanClosure(patternSelectFunction)
+    val cleanTimeoutFun = cleanClosure(patternTimeoutFunction)
+
+    val patternSelectFun = new PatternSelectFunction[T, R] {
+      override def select(pattern: JMap[String, JList[T]]): R =
+        cleanSelectFun(mapToScala(pattern))
+    }
+    val patternTimeoutFun = new PatternTimeoutFunction[T, L] {
+      override def timeout(pattern: JMap[String, JList[T]], timeoutTimestamp: Long): L =
+        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp)
+    }
+
+    select(outputTag, patternTimeoutFun, patternSelectFun)
+  }
+
+  /**
     * Applies a flat select function to the detected pattern sequence. For each pattern sequence
     * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function
     * can produce an arbitrary number of resulting elements.
@@ -292,9 +362,11 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     *                                   detected pattern sequence.
     * @tparam L Type of the resulting timeout event
     * @tparam R Type of the resulting event
+    * @deprecated Use the version that returns timeouted events as a side-output
     * @return Data stream of either type which contains the resulting events and the resulting
     *         timeout events wrapped in a [[Either]] type.
     */
+  @deprecated
   def flatSelect[L: TypeInformation, R: TypeInformation](
       patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
       patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
@@ -319,6 +391,53 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
   }
+
+  /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
+    * produce an arbitrary number of resulting elements.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
+    * pattern timeout function can produce an arbitrary number of resulting timeout events.
+    *
+    * You can get the stream of timeouted matches using [[DataStream.getSideOutput()]] on the
+    * [[DataStream]] resulting from the windowed operation with the same [[OutputTag]].
+    *
+    * @param outputTag [[OutputTag]] that identifies side output with timeouted patterns
+    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
+    *                                   partially matched pattern sequence which has timed out.
+    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
+    *                                   detected pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream which contains the resulting elements with the resulting timeout elements
+    *         in a side output.
+    */
+  def flatSelect[L: TypeInformation, R: TypeInformation](outputTag: OutputTag[L])(
+    patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
+    patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
+  : DataStream[R] = {
+
+    val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
+    val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)
+
+    val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
+      override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
+        cleanSelectFun(mapToScala(pattern), out)
+    }
+
+    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
+      override def timeout(
+        pattern: JMap[String, JList[T]],
+        timeoutTimestamp: Long, out: Collector[L])
+      : Unit = {
+        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
+      }
+    }
+
+    flatSelect(outputTag, patternFlatTimeoutFun, patternFlatSelectFun)
+  }
 }
 
 object PatternStream {
@@ -328,7 +447,7 @@ object PatternStream {
     * @tparam T Type of the events
     * @return A new pattern stream wrapping the pattern stream from Java APU
     */
-  def apply[T](jPatternStream: JPatternStream[T]) = {
+  def apply[T](jPatternStream: JPatternStream[T]): PatternStream[T] = {
     new PatternStream[T](jPatternStream)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
index e2161a0..f3371c8 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -21,15 +21,17 @@ import org.apache.flink.api.common.functions.util.ListCollector
 import org.apache.flink.cep.scala.pattern.Pattern
 import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, TwoInputTransformation}
 import org.apache.flink.util.{Collector, TestLogger}
 import org.apache.flink.types.{Either => FEither}
 import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
-
 import java.lang.{Long => JLong}
 import java.util.{Map => JMap}
 import java.util.{List => JList}
 
+import org.apache.flink.cep.operator.{FlatSelectCepOperator, FlatSelectTimeoutCepOperator, SelectCepOperator}
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+
 import scala.collection.JavaConverters._
 import scala.collection.Map
 import org.junit.Assert._
@@ -51,8 +53,8 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
         assertEquals(param, pattern)
         param.get("begin").get(0)
       })
-    val out = extractUserFunction[StreamMap[JMap[String, JList[(Int, Int)]], (Int, Int)]](result)
-      .getUserFunction.map(param.mapValues(_.asJava).asJava)
+    val out = extractUserFunction[SelectCepOperator[(Int, Int), Byte, (Int, Int)]](result)
+      .getUserFunction.select(param.mapValues(_.asJava).asJava)
     //verifies output parameter forwarding
     assertEquals(param.get("begin").get(0), out)
   }
@@ -77,8 +79,8 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
         out.collect(pattern.get("begin").get.head)
       })
 
-    extractUserFunction[StreamFlatMap[java.util.Map[String, JList[List[Int]]], List[Int]]](result).
-      getUserFunction.flatMap(inParam.mapValues(_.asJava).asJava, outParam)
+    extractUserFunction[FlatSelectCepOperator[List[Int], Byte, List[Int]]](result).
+      getUserFunction.flatSelect(inParam.mapValues(_.asJava).asJava, outParam)
     //verify output parameter forwarding and that flatMap function was actually called
     assertEquals(inList, outList.get(0))
   }
@@ -96,28 +98,26 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
     val expectedOutput = List(Right("match"), Right("barfoo"), Left("timeout"), Left("barfoo"))
       .asJava
 
-    val result: DataStream[Either[String, String]] = pStream.flatSelect {
-        (pattern: Map[String, Iterable[String]], timestamp: Long, out: Collector[String]) =>
-          out.collect("timeout")
-          out.collect(pattern("begin").head)
+    val outputTag = OutputTag[Either[String, String]]("timeouted")
+    val result: DataStream[Either[String, String]] = pStream.flatSelect(outputTag) {
+        (pattern: Map[String, Iterable[String]], timestamp: Long,
+         out: Collector[Either[String, String]]) =>
+          out.collect(Left("timeout"))
+          out.collect(Left(pattern("begin").head))
       } {
-        (pattern: Map[String, Iterable[String]], out: Collector[String]) =>
+        (pattern: Map[String, Iterable[String]], out: Collector[Either[String, String]]) =>
           //verifies input parameter forwarding
           assertEquals(inParam, pattern)
-          out.collect("match")
-          out.collect(pattern("begin").head)
+          out.collect(Right("match"))
+          out.collect(Right(pattern("begin").head))
       }
 
-    val fun = extractUserFunction[
-      StreamFlatMap[
-        FEither[
-          FTuple2[JMap[String, JList[String]], JLong],
-          JMap[String, JList[String]]],
-        Either[String, String]]](result)
+    val fun = extractUserFunction[FlatSelectTimeoutCepOperator[String, Either[String, String],
+      Either[String, String], Byte]](
+      result).getUserFunction
 
-    fun.getUserFunction.flatMap(FEither.Right(inParam.mapValues(_.asJava).asJava), output)
-    fun.getUserFunction.flatMap(FEither.Left(FTuple2.of(inParam.mapValues(_.asJava).asJava, 42L)),
-                                output)
+    fun.getFlatSelectFunction.flatSelect(inParam.mapValues(_.asJava).asJava, output)
+    fun.getFlatTimeoutFunction.timeout(inParam.mapValues(_.asJava).asJava, 42L, output)
 
     assertEquals(expectedOutput, outList)
   }
@@ -129,4 +129,5 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
       .getOperator
       .asInstanceOf[T]
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 555d270..6380375 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -18,21 +18,18 @@
 
 package org.apache.flink.cep;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.operator.CEPOperatorUtils;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.types.Either;
-import org.apache.flink.util.Collector;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Stream abstraction for CEP pattern detection. A pattern stream is a stream 
which emits detected
@@ -109,6 +106,16 @@ public class PatternStream<T> {
        }
 
        /**
+        * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
+        * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
+        *
+        * @return The cleaned Function
+        */
+       private  <F> F clean(F f) {
+               return inputStream.getExecutionEnvironment().clean(f);
+       }
+
+       /**
         * Applies a select function to the detected pattern sequence. For each pattern sequence the
         * provided {@link PatternSelectFunction} is called. The pattern select function can produce
         * exactly one resulting element.
@@ -121,13 +128,94 @@ public class PatternStream<T> {
         *         function.
         */
        public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
-               SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
-                               
CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator);
+               return CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator, clean(patternSelectFunction), outTypeInfo);
+       }
 
-               return patternStream.map(
-                       new PatternSelectMapper<>(
-                               
patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
-                       .returns(outTypeInfo);
+       /**
+        * Applies a select function to the detected pattern sequence. For each 
pattern sequence the
+        * provided {@link PatternSelectFunction} is called. The pattern select 
function can produce
+        * exactly one resulting element.
+        *
+        * <p>Applies a timeout function to a partial pattern sequence which 
has timed out. For each
+        * partial pattern sequence the provided {@link PatternTimeoutFunction} 
is called. The pattern
+        * timeout function can produce exactly one resulting element.
+        *
+        * <p>You can get the stream of timed-out data by calling
+        * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
+        * {@link SingleOutputStreamOperator} returned by the select operation,
+        * using the same {@link OutputTag}.
+        *
+        * @param timeoutOutputTag {@link OutputTag} that identifies the side output with timed-out patterns
+        * @param patternTimeoutFunction The pattern timeout function which is 
called for each partial
+        *                               pattern sequence which has timed out.
+        * @param patternSelectFunction The pattern select function which is 
called for each detected
+        *                              pattern sequence.
+        * @param <L> Type of the resulting timeout elements
+        * @param <R> Type of the resulting elements
+        * @return {@link DataStream} which contains the resulting elements 
with the resulting timeout
+        * elements in a side output.
+        */
+       public <L, R> SingleOutputStreamOperator<R> select(
+               final OutputTag<L> timeoutOutputTag,
+               final PatternTimeoutFunction<T, L> patternTimeoutFunction,
+               final PatternSelectFunction<T, R> patternSelectFunction) {
+
+               TypeInformation<R> rightTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
+                       patternSelectFunction,
+                       PatternSelectFunction.class,
+                       0,
+                       1,
+                       new int[]{0, 1, 0},
+                       new int[]{},
+                       inputStream.getType(),
+                       null,
+                       false);
+
+               return select(
+                       timeoutOutputTag,
+                       patternTimeoutFunction,
+                       rightTypeInfo,
+                       patternSelectFunction);
+       }
+
+       /**
+        * Applies a select function to the detected pattern sequence. For each 
pattern sequence the
+        * provided {@link PatternSelectFunction} is called. The pattern select 
function can produce
+        * exactly one resulting element.
+        *
+        * <p>Applies a timeout function to a partial pattern sequence which 
has timed out. For each
+        * partial pattern sequence the provided {@link PatternTimeoutFunction} 
is called. The pattern
+        * timeout function can produce exactly one resulting element.
+        *
+        * <p>You can get the stream of timed-out data by calling
+        * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
+        * {@link SingleOutputStreamOperator} returned by the select operation,
+        * using the same {@link OutputTag}.
+        *
+        * @param timeoutOutputTag {@link OutputTag} that identifies the side output with timed-out patterns
+        * @param patternTimeoutFunction The pattern timeout function which is 
called for each partial
+        *                               pattern sequence which has timed out.
+        * @param outTypeInfo Explicit specification of output type.
+        * @param patternSelectFunction The pattern select function which is 
called for each detected
+        *                              pattern sequence.
+        * @param <L> Type of the resulting timeout elements
+        * @param <R> Type of the resulting elements
+        * @return {@link DataStream} which contains the resulting elements 
with the resulting timeout
+        * elements in a side output.
+        */
+       public <L, R> SingleOutputStreamOperator<R> select(
+                       final OutputTag<L> timeoutOutputTag,
+                       final PatternTimeoutFunction<T, L> 
patternTimeoutFunction,
+                       final TypeInformation<R> outTypeInfo,
+                       final PatternSelectFunction<T, R> 
patternSelectFunction) {
+               return CEPOperatorUtils.createTimeoutPatternStream(
+                       inputStream,
+                       pattern,
+                       comparator,
+                       clean(patternSelectFunction),
+                       outTypeInfo,
+                       timeoutOutputTag,
+                       clean(patternTimeoutFunction));
        }
 
        /**
@@ -145,19 +233,21 @@ public class PatternStream<T> {
         *                              pattern sequence.
         * @param <L> Type of the resulting timeout elements
         * @param <R> Type of the resulting elements
+        *
+        * @deprecated Use {@link PatternStream#select(OutputTag, PatternTimeoutFunction, PatternSelectFunction)}
+        * that returns timed-out events as a side output.
+        *
         * @return {@link DataStream} which contains the resulting elements or 
the resulting timeout
         * elements wrapped in an {@link Either} type.
         */
+       @Deprecated
        public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
                final PatternTimeoutFunction<T, L> patternTimeoutFunction,
                final PatternSelectFunction<T, R> patternSelectFunction) {
 
-               SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, 
Long>, Map<String, List<T>>>> patternStream =
-                               
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, comparator);
-
-               TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
-                       patternTimeoutFunction,
-                       PatternTimeoutFunction.class,
+               TypeInformation<R> rightTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
+                       patternSelectFunction,
+                       PatternSelectFunction.class,
                        0,
                        1,
                        new int[]{0, 1, 0},
@@ -166,9 +256,9 @@ public class PatternStream<T> {
                        null,
                        false);
 
-               TypeInformation<R> rightTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
-                       patternSelectFunction,
-                       PatternSelectFunction.class,
+               TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
+                       patternTimeoutFunction,
+                       PatternTimeoutFunction.class,
                        0,
                        1,
                        new int[]{0, 1, 0},
@@ -177,14 +267,22 @@ public class PatternStream<T> {
                        null,
                        false);
 
+               final OutputTag<L> outputTag = new 
OutputTag<L>("dummy-timeouted", leftTypeInfo);
+
+               final SingleOutputStreamOperator<R> mainStream = 
CEPOperatorUtils.createTimeoutPatternStream(
+                       inputStream,
+                       pattern,
+                       comparator,
+                       clean(patternSelectFunction),
+                       rightTypeInfo,
+                       outputTag,
+                       clean(patternTimeoutFunction));
+
+               final DataStream<L> timeoutedStream = 
mainStream.getSideOutput(outputTag);
+
                TypeInformation<Either<L, R>> outTypeInfo = new 
EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
 
-               return patternStream.map(
-                       new PatternSelectTimeoutMapper<>(
-                               
patternStream.getExecutionEnvironment().clean(patternSelectFunction),
-                               
patternStream.getExecutionEnvironment().clean(patternTimeoutFunction)
-                       )
-               ).returns(outTypeInfo);
+               return mainStream.connect(timeoutedStream).map(new 
CoMapTimeout<>()).returns(outTypeInfo);
        }
 
        /**
@@ -227,14 +325,99 @@ public class PatternStream<T> {
         * @return {@link DataStream} which contains the resulting elements 
from the pattern flat select
         *         function.
         */
-       public <R> SingleOutputStreamOperator<R> flatSelect(final 
PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> 
outTypeInfo) {
-               SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
-                               
CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator);
-
-               return patternStream.flatMap(
-                       new PatternFlatSelectMapper<>(
-                               
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
-                       )).returns(outTypeInfo);
+       public <R> SingleOutputStreamOperator<R> flatSelect(
+                       final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction,
+                       final TypeInformation<R> outTypeInfo) {
+               return CEPOperatorUtils.createPatternStream(
+                       inputStream,
+                       pattern,
+                       comparator,
+                       clean(patternFlatSelectFunction),
+                       outTypeInfo);
+       }
+
+       /**
+        * Applies a flat select function to the detected pattern sequence. For each pattern sequence the
+        * provided {@link PatternFlatSelectFunction} is called. The pattern flat select function can produce
+        * an arbitrary number of resulting elements.
+        *
+        * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
+        * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The pattern
+        * flat timeout function can produce an arbitrary number of resulting elements.
+        *
+        * <p>You can get the stream of timed-out data by calling
+        * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
+        * {@link SingleOutputStreamOperator} returned by the flat select operation,
+        * using the same {@link OutputTag}.
+        *
+        * @param timeoutOutputTag {@link OutputTag} that identifies the side output with timed-out patterns
+        * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each partial
+        *                               pattern sequence which has timed out.
+        * @param patternFlatSelectFunction The pattern flat select function which is called for each detected
+        *                              pattern sequence.
+        * @param <L> Type of the resulting timeout elements
+        * @param <R> Type of the resulting elements
+        * @return {@link DataStream} which contains the resulting elements with the resulting timeout
+        * elements in a side output.
+        */
+       public <L, R> SingleOutputStreamOperator<R> flatSelect(
+               final OutputTag<L> timeoutOutputTag,
+               final PatternFlatTimeoutFunction<T, L> 
patternFlatTimeoutFunction,
+               final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction) {
+
+               TypeInformation<R> rightTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
+                       patternFlatSelectFunction,
+                       PatternFlatSelectFunction.class,
+                       0,
+                       1,
+                       new int[]{0, 1, 0},
+                       new int[]{1, 0},
+                       inputStream.getType(),
+                       null,
+                       false);
+
+               return flatSelect(timeoutOutputTag, patternFlatTimeoutFunction, 
rightTypeInfo, patternFlatSelectFunction);
+       }
+
+       /**
+        * Applies a flat select function to the detected pattern sequence. For each pattern sequence the
+        * provided {@link PatternFlatSelectFunction} is called. The pattern flat select function can produce
+        * an arbitrary number of resulting elements.
+        *
+        * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
+        * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The pattern
+        * flat timeout function can produce an arbitrary number of resulting elements.
+        *
+        * <p>You can get the stream of timed-out data by calling
+        * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
+        * {@link SingleOutputStreamOperator} returned by the flat select operation,
+        * using the same {@link OutputTag}.
+        *
+        * @param timeoutOutputTag {@link OutputTag} that identifies the side output with timed-out patterns
+        * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each partial
+        *                               pattern sequence which has timed out.
+        * @param patternFlatSelectFunction The pattern flat select function which is called for each detected
+        *                              pattern sequence.
+        * @param outTypeInfo Explicit specification of output type.
+        * @param <L> Type of the resulting timeout elements
+        * @param <R> Type of the resulting elements
+        * @return {@link DataStream} which contains the resulting elements with the resulting timeout
+        * elements in a side output.
+        */
+       public <L, R> SingleOutputStreamOperator<R> flatSelect(
+               final OutputTag<L> timeoutOutputTag,
+               final PatternFlatTimeoutFunction<T, L> 
patternFlatTimeoutFunction,
+               final TypeInformation<R> outTypeInfo,
+               final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction) {
+
+               return CEPOperatorUtils.createTimeoutPatternStream(
+                       inputStream,
+                       pattern,
+                       comparator,
+                       clean(patternFlatSelectFunction),
+                       outTypeInfo,
+                       timeoutOutputTag,
+                       clean(patternFlatTimeoutFunction));
        }
 
        /**
@@ -252,17 +435,19 @@ public class PatternStream<T> {
         *                                  detected pattern sequence.
         * @param <L> Type of the resulting timeout events
         * @param <R> Type of the resulting events
+        *
+        * @deprecated Use {@link PatternStream#flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)}
+        * that returns timed-out events as a side output.
+        *
         * @return {@link DataStream} which contains the resulting events from 
the pattern flat select
         * function or the resulting timeout events from the pattern flat 
timeout function wrapped in an
         * {@link Either} type.
         */
+       @Deprecated
        public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
                final PatternFlatTimeoutFunction<T, L> 
patternFlatTimeoutFunction,
                final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction) {
 
-               SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, 
Long>, Map<String, List<T>>>> patternStream =
-                               
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, comparator);
-
                TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternFlatTimeoutFunction,
                        PatternFlatTimeoutFunction.class,
@@ -285,147 +470,40 @@ public class PatternStream<T> {
                        null,
                        false);
 
-               TypeInformation<Either<L, R>> outTypeInfo = new 
EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
-
-               return patternStream.flatMap(
-                       new PatternFlatSelectTimeoutWrapper<>(
-                               
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction),
-                               
patternStream.getExecutionEnvironment().clean(patternFlatTimeoutFunction)
-                       )
-               ).returns(outTypeInfo);
-       }
-
-       /**
-        * Wrapper for a {@link PatternSelectFunction}.
-        *
-        * @param <T> Type of the input elements
-        * @param <R> Type of the resulting elements
-        */
-       private static class PatternSelectMapper<T, R> implements 
MapFunction<Map<String, List<T>>, R> {
-               private static final long serialVersionUID = 
2273300432692943064L;
-
-               private final PatternSelectFunction<T, R> patternSelectFunction;
-
-               public PatternSelectMapper(PatternSelectFunction<T, R> 
patternSelectFunction) {
-                       this.patternSelectFunction = patternSelectFunction;
-               }
-
-               @Override
-               public R map(Map<String, List<T>> value) throws Exception {
-                       return patternSelectFunction.select(value);
-               }
-       }
-
-       private static class PatternSelectTimeoutMapper<T, L, R> implements 
MapFunction<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>, 
Either<L, R>> {
-
-               private static final long serialVersionUID = 
8259477556738887724L;
-
-               private final PatternSelectFunction<T, R> patternSelectFunction;
-               private final PatternTimeoutFunction<T, L> 
patternTimeoutFunction;
-
-               public PatternSelectTimeoutMapper(
-                       PatternSelectFunction<T, R> patternSelectFunction,
-                       PatternTimeoutFunction<T, L> patternTimeoutFunction) {
-
-                       this.patternSelectFunction = patternSelectFunction;
-                       this.patternTimeoutFunction = patternTimeoutFunction;
-               }
-
-               @Override
-               public Either<L, R> map(Either<Tuple2<Map<String, List<T>>, 
Long>, Map<String, List<T>>> value) throws Exception {
-                       if (value.isLeft()) {
-                               Tuple2<Map<String, List<T>>, Long> timeout = 
value.left();
-
-                               return 
Either.Left(patternTimeoutFunction.timeout(timeout.f0, timeout.f1));
-                       } else {
-                               return 
Either.Right(patternSelectFunction.select(value.right()));
-                       }
-               }
-       }
+               final OutputTag<L> outputTag = new 
OutputTag<L>("dummy-timeouted", leftTypeInfo);
 
-       private static class PatternFlatSelectTimeoutWrapper<T, L, R> 
implements FlatMapFunction<Either<Tuple2<Map<String, List<T>>, Long>, 
Map<String, List<T>>>, Either<L, R>> {
+               final SingleOutputStreamOperator<R> mainStream = 
CEPOperatorUtils.createTimeoutPatternStream(
+                       inputStream,
+                       pattern,
+                       comparator,
+                       clean(patternFlatSelectFunction),
+                       rightTypeInfo,
+                       outputTag,
+                       clean(patternFlatTimeoutFunction));
 
-               private static final long serialVersionUID = 
7483674669662261667L;
+               final DataStream<L> timeoutedStream = 
mainStream.getSideOutput(outputTag);
 
-               private final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction;
-               private final PatternFlatTimeoutFunction<T, L> 
patternFlatTimeoutFunction;
-
-               public PatternFlatSelectTimeoutWrapper(
-                       PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction,
-                       PatternFlatTimeoutFunction<T, L> 
patternFlatTimeoutFunction) {
-                       this.patternFlatSelectFunction = 
patternFlatSelectFunction;
-                       this.patternFlatTimeoutFunction = 
patternFlatTimeoutFunction;
-               }
-
-               @Override
-               public void flatMap(Either<Tuple2<Map<String, List<T>>, Long>, 
Map<String, List<T>>> value, Collector<Either<L, R>> out) throws Exception {
-                       if (value.isLeft()) {
-                               Tuple2<Map<String, List<T>>, Long> timeout = 
value.left();
-
-                               patternFlatTimeoutFunction.timeout(timeout.f0, 
timeout.f1, new LeftCollector<>(out));
-                       } else {
-                               
patternFlatSelectFunction.flatSelect(value.right(), new RightCollector(out));
-                       }
-               }
-
-               private static class LeftCollector<L, R> implements 
Collector<L> {
-
-                       private final Collector<Either<L, R>> out;
-
-                       private LeftCollector(Collector<Either<L, R>> out) {
-                               this.out = out;
-                       }
-
-                       @Override
-                       public void collect(L record) {
-                               out.collect(Either.<L, R>Left(record));
-                       }
-
-                       @Override
-                       public void close() {
-                               out.close();
-                       }
-               }
-
-               private static class RightCollector<L, R> implements 
Collector<R> {
-
-                       private final Collector<Either<L, R>> out;
-
-                       private RightCollector(Collector<Either<L, R>> out) {
-                               this.out = out;
-                       }
-
-                       @Override
-                       public void collect(R record) {
-                               out.collect(Either.<L, R>Right(record));
-                       }
+               TypeInformation<Either<L, R>> outTypeInfo = new 
EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
 
-                       @Override
-                       public void close() {
-                               out.close();
-                       }
-               }
+               return mainStream.connect(timeoutedStream).map(new 
CoMapTimeout<>()).returns(outTypeInfo);
        }
 
        /**
-        * Wrapper for a {@link PatternFlatSelectFunction}.
-        *
-        * @param <T> Type of the input elements
-        * @param <R> Type of the resulting elements
+        * Used for joining results from timeout side-output for API backward 
compatibility.
         */
-       private static class PatternFlatSelectMapper<T, R> implements 
FlatMapFunction<Map<String, List<T>>, R> {
+       @Internal
+       public static class CoMapTimeout<R, L> implements CoMapFunction<R, L, 
Either<L, R>> {
 
-               private static final long serialVersionUID = 
-8610796233077989108L;
+               private static final long serialVersionUID = 
2059391566945212552L;
 
-               private final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction;
-
-               public PatternFlatSelectMapper(PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction) {
-                       this.patternFlatSelectFunction = 
patternFlatSelectFunction;
+               @Override
+               public Either<L, R> map1(R value) throws Exception {
+                       return Either.Right(value);
                }
 
                @Override
-               public void flatMap(Map<String, List<T>> value, Collector<R> 
out) throws Exception {
-                       patternFlatSelectFunction.flatSelect(value, out);
+               public Either<L, R> map2(L value) throws Exception {
+                       return Either.Left(value);
                }
        }
 }
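
Note on the deprecated Either-returning variants above: they now build the main stream plus a timeout side output and then join the two with CoMapTimeout, where map1 wraps a matched result as Right and map2 wraps a timed-out result as Left. The following is only a minimal, Flink-free sketch of that wrapping. EitherSketch, CoMapTimeoutSketch and merge are hypothetical names introduced for illustration (they are not part of this commit), and the simple concatenation in merge stands in for a connected stream, which makes no such ordering guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for org.apache.flink.types.Either, for illustration only. */
final class EitherSketch<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private EitherSketch(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <L, R> EitherSketch<L, R> ofLeft(L value) {
        return new EitherSketch<>(value, null, true);
    }

    static <L, R> EitherSketch<L, R> ofRight(R value) {
        return new EitherSketch<>(null, value, false);
    }

    boolean isLeft() {
        return isLeft;
    }

    L left() {
        return left;
    }

    R right() {
        return right;
    }
}

/** Hypothetical, Flink-free analogue of the CoMapTimeout class in the diff. */
final class CoMapTimeoutSketch {

    /** Like map1: a result from the main (matched) stream becomes a Right. */
    static <L, R> EitherSketch<L, R> map1(R value) {
        return EitherSketch.ofRight(value);
    }

    /** Like map2: a result from the timeout side output becomes a Left. */
    static <L, R> EitherSketch<L, R> map2(L value) {
        return EitherSketch.ofLeft(value);
    }

    /** Assumption: concatenation replaces the interleaving of a real connected stream. */
    static <L, R> List<EitherSketch<L, R>> merge(List<R> matches, List<L> timeouts) {
        List<EitherSketch<L, R>> out = new ArrayList<>();
        for (R match : matches) {
            out.add(CoMapTimeoutSketch.<L, R>map1(match));
        }
        for (L timeout : timeouts) {
            out.add(CoMapTimeoutSketch.<L, R>map2(timeout));
        }
        return out;
    }

    public static void main(String[] args) {
        List<EitherSketch<String, Integer>> merged =
            merge(Arrays.asList(1, 2), Arrays.asList("late"));
        for (EitherSketch<String, Integer> e : merged) {
            System.out.println(e.isLeft() ? "timeout: " + e.left() : "match: " + e.right());
        }
    }
}
```

The point of the design is that callers of the deprecated methods keep receiving Either<L, R>, while the operators themselves only know about a main output and a side output.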

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 23084ee..3a1f621 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
+import org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -71,7 +72,7 @@ import java.util.Stack;
 /**
  * Non-deterministic finite automaton implementation.
  *
- * <p>The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator 
CEP operator}
+ * <p>The {@link AbstractKeyedCEPPatternOperator CEP operator}
  * keeps one NFA per key, for keyed input streams, and a single global NFA for 
non-keyed ones.
  * When an event gets processed, it updates the NFA's internal state machine.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 66663d2..4c67e9d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -44,7 +45,7 @@ import 
org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStr
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -60,6 +61,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -78,9 +80,10 @@ import java.util.stream.StreamSupport;
  * @param <IN> Type of the input elements
  * @param <KEY> Type of the key on which the input stream is keyed
  * @param <OUT> Type of the output elements
+ * @param <F> Type of the user function that can be applied to matching sequences or timed-out sequences
  */
-public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
-       extends AbstractStreamOperator<OUT>
+public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends 
Function>
+       extends AbstractUdfStreamOperator<OUT, F>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, 
VoidNamespace>, CheckpointedRestoringOperator {
 
        private static final long serialVersionUID = -4166778210774160757L;
@@ -126,7 +129,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                        final TypeSerializer<KEY> keySerializer,
                        final NFACompiler.NFAFactory<IN> nfaFactory,
                        final boolean migratingFromOldKeyedOperator,
-                       final EventComparator<IN> comparator) {
+                       final EventComparator<IN> comparator,
+                       final F function) {
+               super(function);
 
                this.inputSerializer = 
Preconditions.checkNotNull(inputSerializer);
                this.isProcessingTime = 
Preconditions.checkNotNull(isProcessingTime);
@@ -348,7 +353,18 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
         * @param event The current event to be processed
         * @param timestamp The timestamp of the event
         */
-       protected abstract void processEvent(NFA<IN> nfa, IN event, long 
timestamp);
+       private void processEvent(NFA<IN> nfa, IN event, long timestamp) {
+               Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns =
+                       nfa.process(event, timestamp);
+
+               try {
+                       processMatchedSequences(patterns.f0, timestamp);
+                       processTimeoutedSequence(patterns.f1, timestamp);
+               } catch (Exception e) {
+                       // Rethrow as a RuntimeException so that processEvent can be used in a Stream.
+                       throw new RuntimeException(e);
+               }
+       }
 
        /**
         * Advances the time for the given NFA to the given timestamp. This can 
lead to pruning and
@@ -357,7 +373,16 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
         * @param nfa to advance the time for
         * @param timestamp to advance the time to
         */
-       protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
+       private void advanceTime(NFA<IN> nfa, long timestamp) throws Exception {
+               processEvent(nfa, null, timestamp);
+       }
+
+       protected abstract void processMatchedSequences(Iterable<Map<String, 
List<IN>>> matchesSequence, long timestamp) throws Exception;
+
+       protected void processTimeoutedSequence(
+                       Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences,
+                       long timestamp) throws Exception {
+               // No-op by default; subclasses may override this to handle timed-out sequences.
+       }
 
        //////////////////////                  Backwards Compatibility         
        //////////////////////
 
@@ -606,10 +631,10 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                        ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
 
                                CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                               
previousElemSerializerAndConfig.f0,
-                                               
UnloadableDummyTypeSerializer.class,
-                                               
previousElemSerializerAndConfig.f1,
-                                               elementSerializer);
+                                       previousElemSerializerAndConfig.f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       previousElemSerializerAndConfig.f1,
+                                       elementSerializer);
 
                                if (!compatResult.isRequiresMigration()) {
                                        return CompatibilityResult.compatible();
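
Note on the operator change above: processEvent is now a concrete method that hands matched sequences to the abstract processMatchedSequences and timed-out sequences to processTimeoutedSequence, which does nothing unless overridden. The following is a minimal, Flink-free sketch of that dispatch, not the operator code itself. All class and method names in it (DispatchSketch, Result, SelectOnlySketch, SelectWithTimeoutSketch) are hypothetical, plain strings stand in for the pattern maps, and the two lists stand in for the main output and the side output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical analogue of the base operator: one dispatch method, two hooks. */
abstract class DispatchSketch {

    /** Hypothetical holder standing in for the Tuple2 returned by the NFA. */
    static final class Result {
        final List<String> matches;
        final List<String> timeouts;

        Result(List<String> matches, List<String> timeouts) {
            this.matches = matches;
            this.timeouts = timeouts;
        }
    }

    /** Dispatches both kinds of sequences; checked exceptions are rethrown unchecked. */
    final void processEvent(Result result, long timestamp) {
        try {
            processMatched(result.matches, timestamp);
            processTimedOut(result.timeouts, timestamp);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Every subclass must say what happens to matched sequences. */
    protected abstract void processMatched(Iterable<String> matches, long timestamp) throws Exception;

    /** No-op by default, so subclasses without timeout handling ignore timeouts. */
    protected void processTimedOut(Iterable<String> timedOut, long timestamp) throws Exception {
    }
}

/** Emits matches only; timed-out sequences are dropped by the inherited no-op. */
final class SelectOnlySketch extends DispatchSketch {
    final List<String> mainOutput = new ArrayList<>();

    @Override
    protected void processMatched(Iterable<String> matches, long timestamp) {
        for (String match : matches) {
            mainOutput.add(match);
        }
    }
}

/** Emits matches to the main output and timed-out sequences to a separate list. */
final class SelectWithTimeoutSketch extends DispatchSketch {
    final List<String> mainOutput = new ArrayList<>();
    final List<String> sideOutput = new ArrayList<>();

    @Override
    protected void processMatched(Iterable<String> matches, long timestamp) {
        for (String match : matches) {
            mainOutput.add(match);
        }
    }

    @Override
    protected void processTimedOut(Iterable<String> timedOut, long timestamp) {
        for (String sequence : timedOut) {
            sideOutput.add(sequence);
        }
    }

    public static void main(String[] args) {
        SelectWithTimeoutSketch op = new SelectWithTimeoutSketch();
        op.processEvent(new Result(Arrays.asList("m1"), Arrays.asList("t1")), 0L);
        System.out.println("main=" + op.mainOutput + " side=" + op.sideOutput);
    }
}
```

With the hooks split this way, a select-only subclass needs to implement a single method, and only the timeout-aware subclasses carry the side-output logic.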

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index de2d8f8..a662faf 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -18,28 +18,25 @@
 
 package org.apache.flink.cep.operator;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.NullByteKeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.EitherTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.PatternFlatTimeoutFunction;
+import org.apache.flink.cep.PatternSelectFunction;
 import org.apache.flink.cep.PatternStream;
+import org.apache.flink.cep.PatternTimeoutFunction;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.types.Either;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Utility methods for creating {@link PatternStream}.
@@ -47,99 +44,245 @@ import java.util.Map;
 public class CEPOperatorUtils {
 
        /**
-        * Creates a data stream containing the fully matching event patterns of the NFA computation.
+        * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns.
         *
-        * @param <K> Type of the key
-        * @return Data stream containing fully matched event sequences stored in a {@link Map}. The
-        * events are indexed by their associated names of the pattern.
+        * @param inputStream stream of input events
+        * @param pattern pattern to be searched for in the stream
+        * @param selectFunction function to be applied to matching event sequences
+        * @param outTypeInfo output TypeInformation of selectFunction
+        * @param <IN> type of input events
+        * @param <OUT> type of output events
+        * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction}
         */
-       public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(
-               DataStream<T> inputStream,
-               Pattern<T, ?> pattern,
-               EventComparator<T> comparator) {
-               final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
-
-               // check whether we use processing time
-               final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-               // compile our pattern into a NFAFactory to instantiate NFAs later on
-               final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);
-
-               final SingleOutputStreamOperator<Map<String, List<T>>> patternStream;
+       public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(
+                       final DataStream<IN> inputStream,
+                       final Pattern<IN, ?> pattern,
+                       final EventComparator<IN> comparator,
+                       final PatternSelectFunction<IN, OUT> selectFunction,
+                       final TypeInformation<OUT> outTypeInfo) {
+               return createPatternStream(inputStream, pattern, outTypeInfo, false, comparator, new OperatorBuilder<IN, OUT>() {
+                       @Override
+                       public <KEY> OneInputStreamOperator<IN, OUT> build(
+                               TypeSerializer<IN> inputSerializer,
+                               boolean isProcessingTime,
+                               TypeSerializer<KEY> keySerializer,
+                               NFACompiler.NFAFactory<IN> nfaFactory,
+                               boolean migratingFromOldKeyedOperator,
+                               EventComparator<IN> comparator) {
+                               return new SelectCepOperator<>(
+                                       inputSerializer,
+                                       isProcessingTime,
+                                       keySerializer,
+                                       nfaFactory,
+                                       migratingFromOldKeyedOperator,
+                                       comparator,
+                                       selectFunction
+                               );
+                       }
 
-               if (inputStream instanceof KeyedStream) {
-                       // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
-                       KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
+                       @Override
+                       public String getKeyedOperatorName() {
+                               return "SelectCepOperator";
+                       }
 
-                       TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
+                       @Override
+                       public String getOperatorName() {
+                               return "SelectCepOperator";
+                       }
+               });
+       }
 
-                       patternStream = keyedStream.transform(
-                               "KeyedCEPPatternOperator",
-                               (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
-                               new KeyedCEPPatternOperator<>(
+       /**
+        * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns.
+        *
+        * @param inputStream stream of input events
+        * @param pattern pattern to be searched for in the stream
+        * @param selectFunction function to be applied to matching event sequences
+        * @param outTypeInfo output TypeInformation of selectFunction
+        * @param <IN> type of input events
+        * @param <OUT> type of output events
+        * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction}
+        */
+       public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(
+                       final DataStream<IN> inputStream,
+                       final Pattern<IN, ?> pattern,
+                       final EventComparator<IN> comparator,
+                       final PatternFlatSelectFunction<IN, OUT> selectFunction,
+                       final TypeInformation<OUT> outTypeInfo) {
+               return createPatternStream(inputStream, pattern, outTypeInfo, false, comparator, new OperatorBuilder<IN, OUT>() {
+                       @Override
+                       public <KEY> OneInputStreamOperator<IN, OUT> build(
+                               TypeSerializer<IN> inputSerializer,
+                               boolean isProcessingTime,
+                               TypeSerializer<KEY> keySerializer,
+                               NFACompiler.NFAFactory<IN> nfaFactory,
+                               boolean migratingFromOldKeyedOperator,
+                               EventComparator<IN> comparator) {
+                               return new FlatSelectCepOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
                                        keySerializer,
                                        nfaFactory,
-                                       true,
-                                       comparator));
-               } else {
+                                       migratingFromOldKeyedOperator,
+                                       comparator,
+                                       selectFunction
+                               );
+                       }
 
-                       KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
-                       TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
+                       @Override
+                       public String getKeyedOperatorName() {
+                               return "FlatSelectCepOperator";
+                       }
 
-                       patternStream = inputStream.keyBy(keySelector).transform(
-                               "CEPPatternOperator",
-                               (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
-                               new KeyedCEPPatternOperator<>(
+                       @Override
+                       public String getOperatorName() {
+                               return "FlatSelectCepOperator";
+                       }
+               });
+       }
+
+       /**
+        * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns and
+        * also timeouted partially matched with applied {@link PatternFlatTimeoutFunction} as a side-output.
+        *
+        * @param inputStream stream of input events
+        * @param pattern pattern to be searched for in the stream
+        * @param selectFunction function to be applied to matching event sequences
+        * @param outTypeInfo output TypeInformation of selectFunction
+        * @param outputTag {@link OutputTag} for a side-output with timeouted matches
+        * @param timeoutFunction function to be applied to timeouted event sequences
+        * @param <IN> type of input events
+        * @param <OUT1> type of fully matched events
+        * @param <OUT2> type of timeouted events
+        * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction} that
+        * contains timeouted patterns with applied {@link PatternFlatTimeoutFunction} as side-output
+        */
+       public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(
+                       final DataStream<IN> inputStream,
+                       final Pattern<IN, ?> pattern,
+                       final EventComparator<IN> comparator,
+                       final PatternFlatSelectFunction<IN, OUT1> selectFunction,
+                       final TypeInformation<OUT1> outTypeInfo,
+                       final OutputTag<OUT2> outputTag,
+                       final PatternFlatTimeoutFunction<IN, OUT2> timeoutFunction) {
+               return createPatternStream(inputStream, pattern, outTypeInfo, true, comparator, new OperatorBuilder<IN, OUT1>() {
+                       @Override
+                       public <KEY> OneInputStreamOperator<IN, OUT1> build(
+                               TypeSerializer<IN> inputSerializer,
+                               boolean isProcessingTime,
+                               TypeSerializer<KEY> keySerializer,
+                               NFACompiler.NFAFactory<IN> nfaFactory,
+                               boolean migratingFromOldKeyedOperator,
+                               EventComparator<IN> comparator) {
+                               return new FlatSelectTimeoutCepOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
                                        keySerializer,
                                        nfaFactory,
-                                       false,
-                                       comparator
-                               )).forceNonParallel();
-               }
+                                       migratingFromOldKeyedOperator,
+                                       comparator,
+                                       selectFunction,
+                                       timeoutFunction,
+                                       outputTag
+                               );
+                       }
 
-               return patternStream;
+                       @Override
+                       public String getKeyedOperatorName() {
+                               return "FlatSelectTimeoutCepOperator";
+                       }
+
+                       @Override
+                       public String getOperatorName() {
+                               return "FlatSelectTimeoutCepOperator";
+                       }
+               });
        }
 
        /**
-        * Creates a data stream containing fully matching event patterns or partially matching event
-        * patterns which have timed out. The former are wrapped in a Either.Right and the latter in a
-        * Either.Left type.
+        * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns and
+        * also timeouted partially matched with applied {@link PatternTimeoutFunction} as a side-output.
         *
-        * @param <K> Type of the key
-        * @return Data stream containing fully matched and partially matched event sequences wrapped in
-        * a {@link Either} instance.
+        * @param inputStream stream of input events
+        * @param pattern pattern to be searched for in the stream
+        * @param selectFunction function to be applied to matching event sequences
+        * @param outTypeInfo output TypeInformation of selectFunction
+        * @param outputTag {@link OutputTag} for a side-output with timeouted matches
+        * @param timeoutFunction function to be applied to timeouted event sequences
+        * @param <IN> type of input events
+        * @param <OUT1> type of fully matched events
+        * @param <OUT2> type of timeouted events
+        * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction} that
+        * contains timeouted patterns with applied {@link PatternTimeoutFunction} as side-output
         */
-       public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> createTimeoutPatternStream(
-                       DataStream<T> inputStream, Pattern<T, ?> pattern, EventComparator<T> comparator) {
+       public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(
+                       final DataStream<IN> inputStream,
+                       final Pattern<IN, ?> pattern,
+                       final EventComparator<IN> comparator,
+                       final PatternSelectFunction<IN, OUT1> selectFunction,
+                       final TypeInformation<OUT1> outTypeInfo,
+                       final OutputTag<OUT2> outputTag,
+                       final PatternTimeoutFunction<IN, OUT2> timeoutFunction) {
+               return createPatternStream(inputStream, pattern, outTypeInfo, true, comparator, new OperatorBuilder<IN, OUT1>() {
+                       @Override
+                       public <KEY> OneInputStreamOperator<IN, OUT1> build(
+                               TypeSerializer<IN> inputSerializer,
+                               boolean isProcessingTime,
+                               TypeSerializer<KEY> keySerializer,
+                               NFACompiler.NFAFactory<IN> nfaFactory,
+                               boolean migratingFromOldKeyedOperator,
+                               EventComparator<IN> comparator) {
+                               return new SelectTimeoutCepOperator<>(
+                                       inputSerializer,
+                                       isProcessingTime,
+                                       keySerializer,
+                                       nfaFactory,
+                                       migratingFromOldKeyedOperator,
+                                       comparator,
+                                       selectFunction,
+                                       timeoutFunction,
+                                       outputTag
+                               );
+                       }
+
+                       @Override
+                       public String getKeyedOperatorName() {
+                               return "SelectTimeoutCepOperator";
+                       }
 
-               final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
+                       @Override
+                       public String getOperatorName() {
+                               return "SelectTimeoutCepOperator";
+                       }
+               });
+       }
+
+       private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
+                       final DataStream<IN> inputStream,
+                       final Pattern<IN, ?> pattern,
+                       final TypeInformation<OUT> outTypeInfo,
+                       final boolean timeoutHandling,
+                       final EventComparator<IN> comparator,
+                       final OperatorBuilder<IN, OUT> operatorBuilder) {
+               final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
                // check whether we use processing time
                final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
 
                // compile our pattern into a NFAFactory to instantiate NFAs later on
-               final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, true);
-
-               final SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream;
+               final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, timeoutHandling);
 
-               final TypeInformation<Map<String, List<T>>> rightTypeInfo = (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>)  TypeExtractor.getForClass(Map.class);
-               final TypeInformation<Tuple2<Map<String, List<T>>, Long>> leftTypeInfo = new TupleTypeInfo<>(rightTypeInfo, BasicTypeInfo.LONG_TYPE_INFO);
-               final TypeInformation<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> eitherTypeInformation = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
+               final SingleOutputStreamOperator<OUT> patternStream;
 
                if (inputStream instanceof KeyedStream) {
-                       // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
-                       KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
+                       KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
 
                        TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
 
                        patternStream = keyedStream.transform(
-                               "TimeoutKeyedCEPPatternOperator",
-                               eitherTypeInformation,
-                               new TimeoutKeyedCEPPatternOperator<>(
+                               operatorBuilder.getKeyedOperatorName(),
+                               outTypeInfo,
+                               operatorBuilder.build(
                                        inputSerializer,
                                        isProcessingTime,
                                        keySerializer,
@@ -147,14 +290,13 @@ public class CEPOperatorUtils {
                                        true,
                                        comparator));
                } else {
-
-                       KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
+                       KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
                        TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
 
                        patternStream = inputStream.keyBy(keySelector).transform(
-                               "TimeoutCEPPatternOperator",
-                               eitherTypeInformation,
-                               new TimeoutKeyedCEPPatternOperator<>(
+                               operatorBuilder.getOperatorName(),
+                               outTypeInfo,
+                               operatorBuilder.build(
                                        inputSerializer,
                                        isProcessingTime,
                                        keySerializer,
@@ -166,4 +308,18 @@ public class CEPOperatorUtils {
 
                return patternStream;
        }
+
+       private interface OperatorBuilder<IN, OUT> {
+               <K> OneInputStreamOperator<IN, OUT> build(
+                       TypeSerializer<IN> inputSerializer,
+                       boolean isProcessingTime,
+                       TypeSerializer<K> keySerializer,
+                       NFACompiler.NFAFactory<IN> nfaFactory,
+                       boolean migratingFromOldKeyedOperator,
+                       EventComparator<IN> comparator);
+
+               String getKeyedOperatorName();
+
+               String getOperatorName();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
new file mode 100644
index 0000000..d44794e
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternFlatSelectFunction} to fully matched event patterns.
+ *
+ * @param <IN> Type of the input elements
+ * @param <KEY> Type of the key on which the input stream is keyed
+ * @param <OUT> Type of the output elements
+ */
+public class FlatSelectCepOperator<IN, KEY, OUT>
+       extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT, PatternFlatSelectFunction<IN, OUT>> {
+       private static final long serialVersionUID = 5845993459551561518L;
+
+       public FlatSelectCepOperator(
+               TypeSerializer<IN> inputSerializer,
+               boolean isProcessingTime,
+               TypeSerializer<KEY> keySerializer,
+               NFACompiler.NFAFactory<IN> nfaFactory,
+               boolean migratingFromOldKeyedOperator,
+               EventComparator<IN> comparator,
+               PatternFlatSelectFunction<IN, OUT> function) {
+               super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator, comparator, function);
+       }
+
+       private transient TimestampedCollector<OUT> collector;
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               collector = new TimestampedCollector<>(output);
+       }
+
+       @Override
+       protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception {
+               for (Map<String, List<IN>> match : matchesSequence) {
+                       collector.setAbsoluteTimestamp(timestamp);
+                       getUserFunction().flatSelect(match, collector);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
new file mode 100644
index 0000000..d46761b
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.PatternFlatTimeoutFunction;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternFlatSelectFunction} to fully
+ * matched event patterns and {@link PatternFlatTimeoutFunction} to timeouted ones. The timeouted elements are returned
+ * as a side-output.
+ *
+ * @param <IN> Type of the input elements
+ * @param <KEY> Type of the key on which the input stream is keyed
+ * @param <OUT1> Type of the output elements
+ * @param <OUT2> Type of the timeouted output elements
+ */
+public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends
+       AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, FlatSelectTimeoutCepOperator.FlatSelectWrapper<IN, OUT1, OUT2>> {
+
+       private transient TimestampedCollector<OUT1> collector;
+
+       private transient TimestampedSideOutputCollector<OUT2> sideOutputCollector;
+
+       private OutputTag<OUT2> timeoutedOutputTag;
+
+       public FlatSelectTimeoutCepOperator(
+               TypeSerializer<IN> inputSerializer,
+               boolean isProcessingTime,
+               TypeSerializer<KEY> keySerializer,
+               NFACompiler.NFAFactory<IN> nfaFactory,
+               boolean migratingFromOldKeyedOperator,
+               EventComparator<IN> comparator,
+               PatternFlatSelectFunction<IN, OUT1> flatSelectFunction,
+               PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction,
+               OutputTag<OUT2> outputTag) {
+               super(
+                       inputSerializer,
+                       isProcessingTime,
+                       keySerializer,
+                       nfaFactory,
+                       migratingFromOldKeyedOperator,
+                       comparator,
+                       new FlatSelectWrapper<>(flatSelectFunction, flatTimeoutFunction));
+               this.timeoutedOutputTag = outputTag;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               collector = new TimestampedCollector<>(output);
+               sideOutputCollector = new TimestampedSideOutputCollector<>(timeoutedOutputTag, output);
+       }
+
+       @Override
+       protected void processMatchedSequences(
+               Iterable<Map<String, List<IN>>> matchesSequence,
+               long timestamp) throws Exception {
+               for (Map<String, List<IN>> match : matchesSequence) {
+                       getUserFunction().getFlatSelectFunction().flatSelect(match, collector);
+               }
+       }
+
+       @Override
+       protected void processTimeoutedSequence(
+               Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
+               for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) {
+                       sideOutputCollector.setAbsoluteTimestamp(timestamp);
+                       getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1, sideOutputCollector);
+               }
+       }
+
+       /**
+        * Wrapper that enables storing {@link PatternFlatSelectFunction} and {@link PatternFlatTimeoutFunction} functions
+        * in one udf.
+        */
+       @Internal
+       public static class FlatSelectWrapper<IN, OUT1, OUT2> implements Function {
+
+               private static final long serialVersionUID = -8320546120157150202L;
+
+               private PatternFlatSelectFunction<IN, OUT1> flatSelectFunction;
+               private PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction;
+
+               @VisibleForTesting
+               public PatternFlatSelectFunction<IN, OUT1> getFlatSelectFunction() {
+                       return flatSelectFunction;
+               }
+
+               @VisibleForTesting
+               public PatternFlatTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() {
+                       return flatTimeoutFunction;
+               }
+
+               public FlatSelectWrapper(
+                       PatternFlatSelectFunction<IN, OUT1> flatSelectFunction,
+                       PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction) {
+                       this.flatSelectFunction = flatSelectFunction;
+                       this.flatTimeoutFunction = flatTimeoutFunction;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
deleted file mode 100644
index 22f9c14..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * CEP pattern operator which only returns fully matched event patterns stored in a {@link Map}. The
- * events are indexed by the event names associated in the pattern specification. The operator works
- * on keyed input data.
- *
- * @param <IN> Type of the input events
- * @param <KEY> Type of the key
- */
-public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, List<IN>>> {
-       private static final long serialVersionUID = 5328573789532074581L;
-
-       public KeyedCEPPatternOperator(
-                       TypeSerializer<IN> inputSerializer,
-                       boolean isProcessingTime,
-                       TypeSerializer<KEY> keySerializer,
-                       NFACompiler.NFAFactory<IN> nfaFactory,
-                       boolean migratingFromOldKeyedOperator,
-                       EventComparator<IN> comparator) {
-
-               super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator, comparator);
-       }
-
-       @Override
-       protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-               Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns =
-                       nfa.process(event, timestamp);
-
-               emitMatchedSequences(patterns.f0, timestamp);
-       }
-
-       @Override
-       protected void advanceTime(NFA<IN> nfa, long timestamp) {
-               Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns =
-                       nfa.process(null, timestamp);
-
-               emitMatchedSequences(patterns.f0, timestamp);
-       }
-
-       private void emitMatchedSequences(Iterable<Map<String, List<IN>>> matchedSequences, long timestamp) {
-               Iterator<Map<String, List<IN>>> iterator = matchedSequences.iterator();
-
-               if (iterator.hasNext()) {
-                       StreamRecord<Map<String, List<IN>>> streamRecord = new StreamRecord<>(null, timestamp);
-
-                       do {
-                               streamRecord.replace(iterator.next());
-                               output.collect(streamRecord);
-                       } while (iterator.hasNext());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ed5815e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
new file mode 100644
index 0000000..d687c67
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternSelectFunction} to fully matched event patterns.
+ *
+ * @param <IN> Type of the input elements
+ * @param <KEY> Type of the key on which the input stream is keyed
+ * @param <OUT> Type of the output elements
+ */
+public class SelectCepOperator<IN, KEY, OUT>
+       extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT, PatternSelectFunction<IN, OUT>> {
+       public SelectCepOperator(
+               TypeSerializer<IN> inputSerializer,
+               boolean isProcessingTime,
+               TypeSerializer<KEY> keySerializer,
+               NFACompiler.NFAFactory<IN> nfaFactory,
+               boolean migratingFromOldKeyedOperator,
+               EventComparator<IN> comparator,
+               PatternSelectFunction<IN, OUT> function) {
+               super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator, comparator, function);
+       }
+
+       @Override
+       protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception {
+               for (Map<String, List<IN>> match : matchesSequence) {
+                       output.collect(new StreamRecord<>(getUserFunction().select(match), timestamp));
+               }
+       }
+}
